import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
*/
protected Constructor proxyClassConstructor;
+ /**
+ * Executor service used to cancel Futures
+ */
+ protected ThreadPoolExecutor cancellator = new ThreadPoolExecutor(0,1,1000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
+
//===============================================================================
// PUBLIC METHODS
* This one retrieves the pooled connection object
* and performs the initialization according to
* interceptors and validation rules.
- * This class is thread safe.
+ * This class is thread safe and is cancellable
* @author fhanik
*
*/
- protected class ConnectionFuture implements Future<Connection> {
+ protected class ConnectionFuture implements Future<Connection>, Runnable {
Future<PooledConnection> pcFuture = null;
AtomicBoolean configured = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
Connection result = null;
SQLException cause = null;
+ AtomicBoolean cancelled = new AtomicBoolean(false);
public ConnectionFuture(Future<PooledConnection> pcf) {
this.pcFuture = pcf;
}
public boolean cancel(boolean mayInterruptIfRunning) {
- return pcFuture.cancel(mayInterruptIfRunning);
+ if ((!cancelled.get()) && cancelled.compareAndSet(false, true)) {
+ //cancel by retrieving the connection and returning it to the pool
+ ConnectionPool.this.cancellator.execute(this);
+ }
+ return true;
}
public Connection get() throws InterruptedException, ExecutionException {
}
public boolean isCancelled() {
- return pcFuture.isCancelled();
+ return pcFuture.isCancelled() || cancelled.get();
}
public boolean isDone() {
return pcFuture.isDone();
}
+ public void run() {
+ try {
+ Connection con = get(); //complete this future
+ con.close(); //return to the pool
+ }catch (ExecutionException ex) {
+ //we can ignore this
+ }catch (Exception x) {
+ ConnectionPool.log.error("Unable to cancel ConnectionFuture.",x);
+ }
+ }
+
}
protected class PoolCleaner extends Thread {
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
LinkedList<E> items = null;
LinkedList<ExchangeCountDownLatch<E>> waiters = null;
-
+
public FairBlockingQueue() {
items = new LinkedList<E>();
waiters = new LinkedList<ExchangeCountDownLatch<E>>();