implement cancellation ability for future
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 2 Dec 2008 16:02:16 +0000 (16:02 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 2 Dec 2008 16:02:16 +0000 (16:02 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@722506 13f79535-47bb-0310-9956-ffa450edef68

modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/ConnectionPool.java
modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java

index 751a12a..b984dae 100644 (file)
@@ -31,6 +31,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -100,6 +102,11 @@ public class ConnectionPool {
      */
     protected Constructor proxyClassConstructor;
 
+    /**
+     * Executor service used to cancel Futures
+     */
+    protected ThreadPoolExecutor cancellator = new ThreadPoolExecutor(0,1,1000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
+
 
     //===============================================================================
     //         PUBLIC METHODS
@@ -741,22 +748,27 @@ public class ConnectionPool {
      * This one retrieves the pooled connection object
      * and performs the initialization according to 
      * interceptors and validation rules.
-     * This class is thread safe.
+     * This class is thread safe and is cancellable
      * @author fhanik
      *
      */
-    protected class ConnectionFuture implements Future<Connection> {
+    protected class ConnectionFuture implements Future<Connection>, Runnable {
         Future<PooledConnection> pcFuture = null;
         AtomicBoolean configured = new AtomicBoolean(false);
         CountDownLatch latch = new CountDownLatch(1);
         Connection result = null;
         SQLException cause = null;
+        AtomicBoolean cancelled = new AtomicBoolean(false);
         public ConnectionFuture(Future<PooledConnection> pcf) {
             this.pcFuture = pcf;
         }
         
         public boolean cancel(boolean mayInterruptIfRunning) {
-            return pcFuture.cancel(mayInterruptIfRunning);
+            if ((!cancelled.get()) && cancelled.compareAndSet(false, true)) {
+                //cancel by retrieving the connection and returning it to the pool
+                ConnectionPool.this.cancellator.execute(this);
+            }
+            return true;
         }
 
         public Connection get() throws InterruptedException, ExecutionException {
@@ -792,13 +804,24 @@ public class ConnectionPool {
         }
 
         public boolean isCancelled() {
-            return pcFuture.isCancelled();
+            return pcFuture.isCancelled() || cancelled.get();
         }
 
         public boolean isDone() {
             return pcFuture.isDone();
         }
         
+        public void run() {
+            try {
+                Connection con = get(); //complete this future
+                con.close(); //return to the pool
+            }catch (ExecutionException ex) {
+                //we can ignore this
+            }catch (Exception x) {
+                ConnectionPool.log.error("Unable to cancel ConnectionFuture.",x);
+            }
+        }
+        
     }
 
     protected class PoolCleaner extends Thread {
index bf2d5ce..25e0bec 100644 (file)
@@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
@@ -41,7 +42,7 @@ public class FairBlockingQueue<E> implements BlockingQueue<E> {
     LinkedList<E> items = null;
 
     LinkedList<ExchangeCountDownLatch<E>> waiters = null;
-
+    
     public FairBlockingQueue() {
         items = new LinkedList<E>();
         waiters = new LinkedList<ExchangeCountDownLatch<E>>();