From: fhanik Date: Tue, 25 Nov 2008 22:57:24 +0000 (+0000) Subject: Extend the fair blocking queue to allow asynchronous polling X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=0259fc4519869d8c60db30f09bc72c1412e521ef;p=tomcat7.0 Extend the fair blocking queue to allow asynchronous polling git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@720641 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java b/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java index 4eea64be8..fb024b0ee 100644 --- a/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java +++ b/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/FairBlockingQueue.java @@ -21,7 +21,10 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; /** @@ -47,6 +50,9 @@ public class FairBlockingQueue implements BlockingQueue { //------------------------------------------------------------------ // USED BY CONPOOL IMPLEMENTATION //------------------------------------------------------------------ + /** + * {@inheritDoc} + */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); @@ -65,10 +71,16 @@ public class FairBlockingQueue implements BlockingQueue { return true; } + /** + * {@inheritDoc} + */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offer(e); } + /** + * {@inheritDoc} + */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { E result = null; final ReentrantLock lock = this.lock; @@ -97,7 +109,39 @@ public class FairBlockingQueue implements BlockingQueue { } return result; } - + + /** + * Request an item from the queue asynchronously + * @return - a future pending the result from the queue poll request + */ + public Future pollAsync() { + Future result = null; + final ReentrantLock lock = this.lock; + boolean error = true; + lock.lock(); + try { + E item = items.poll(); + if (item==null) { + ExchangeCountDownLatch c = new ExchangeCountDownLatch(1); + waiters.addLast(c); + lock.unlock(); + result = new ItemFuture(c); + } else { + lock.unlock(); + result = new ItemFuture(item); + } + error = false; + } finally { + if (error && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + return result; + } + + /** + * {@inheritDoc} + */ public boolean remove(Object e) { final ReentrantLock lock = this.lock; lock.lock(); @@ -107,15 +151,24 @@ public class FairBlockingQueue implements BlockingQueue { lock.unlock(); } } - + + /** + * {@inheritDoc} + */ public int size() { return items.size(); } + /** + * {@inheritDoc} + */ public Iterator iterator() { return new FairIterator(); } + /** + * {@inheritDoc} + */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); @@ -126,6 +179,9 @@ public class FairBlockingQueue implements BlockingQueue { } } + /** + * {@inheritDoc} + */ public boolean contains(Object e) { final ReentrantLock lock = this.lock; lock.lock(); @@ -140,31 +196,53 @@ public class FairBlockingQueue implements BlockingQueue { //------------------------------------------------------------------ // NOT USED BY CONPOOL IMPLEMENTATION //------------------------------------------------------------------ - + /** + * {@inheritDoc} + */ public boolean add(E e) { return offer(e); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public int drainTo(Collection c, int maxElements) { throw new UnsupportedOperationException("int drainTo(Collection c, int maxElements)"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public int drainTo(Collection c) { return drainTo(c,Integer.MAX_VALUE); } + /** + * {@inheritDoc} + */ public void put(E e) throws InterruptedException { offer(e); } + /** + * {@inheritDoc} + */ public int remainingCapacity() { return Integer.MAX_VALUE - size(); } + /** + * {@inheritDoc} + */ public E take() throws InterruptedException { return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } + /** + * {@inheritDoc} + */ public boolean addAll(Collection c) { Iterator i = c.iterator(); while (i.hasNext()) { @@ -174,56 +252,146 @@ public class FairBlockingQueue implements BlockingQueue { return true; } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public void clear() { throw new UnsupportedOperationException("void clear()"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public boolean containsAll(Collection c) { throw new UnsupportedOperationException("boolean containsAll(Collection c)"); } + /** + * {@inheritDoc} + */ public boolean isEmpty() { return size() == 0; } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public boolean removeAll(Collection c) { throw new UnsupportedOperationException("boolean removeAll(Collection c)"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public boolean retainAll(Collection c) { throw new UnsupportedOperationException("boolean retainAll(Collection c)"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public Object[] toArray() { throw new UnsupportedOperationException("Object[] toArray()"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public T[] toArray(T[] a) { throw new UnsupportedOperationException(" T[] toArray(T[] a)"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public E element() { throw new UnsupportedOperationException("E element()"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public E peek() { throw new UnsupportedOperationException("E peek()"); } + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ public E remove() { throw new UnsupportedOperationException("E remove()"); } + //------------------------------------------------------------------ + // Future used to check and see if a connection has been made available + //------------------------------------------------------------------ + protected class ItemFuture implements Future { + protected volatile T item = null; + protected volatile ExchangeCountDownLatch latch = null; + protected volatile boolean canceled = false; + + public ItemFuture(T item) { + this.item = item; + } + + public ItemFuture(ExchangeCountDownLatch latch) { + this.latch = latch; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; //don't allow cancel for now + } + public T get() throws InterruptedException, ExecutionException { + if (item!=null) { + return item; + } else if (latch!=null) { + latch.await(); + return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (item!=null) { + return item; + } else if (latch!=null) { + boolean timedout = !latch.await(timeout, unit); + if (timedout) throw new TimeoutException(); + else return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return (item!=null || latch.getItem()!=null); + } + + } //------------------------------------------------------------------ // Count down latch that can be used to exchange information //------------------------------------------------------------------ protected class ExchangeCountDownLatch extends CountDownLatch { - protected T item; + protected volatile T item; public ExchangeCountDownLatch(int i) { super(i); } diff --git a/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java b/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java new file mode 100644 index 000000000..a8ad34cd3 --- /dev/null +++ b/modules/jdbc-pool/test/org/apache/tomcat/jdbc/test/TestAsyncQueue.java @@ -0,0 +1,66 @@ +package org.apache.tomcat.jdbc.test; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.tomcat.jdbc.pool.FairBlockingQueue; + +import junit.framework.TestCase; + +public class TestAsyncQueue extends TestCase { + protected FairBlockingQueue queue = null; + protected void setUp() throws Exception { + super.setUp(); + this.queue = new FairBlockingQueue(); + } + + protected void tearDown() throws Exception { + this.queue = null; + super.tearDown(); + } + + + public void testAsyncPoll1() throws Exception { + Object item = new Object(); + queue.offer(item); + Future future = queue.pollAsync(); + assertEquals(future.get(),item); + } + + public void testAsyncPoll2() throws Exception { + Object item = new Object(); + OfferThread thread = new OfferThread(item,5000); + thread.start(); + Future future = queue.pollAsync(); + try { + future.get(2000, TimeUnit.MILLISECONDS); + this.assertFalse("Request should have timed out",true); + }catch (TimeoutException x) { + this.assertTrue("Request timed out properly",true); + }catch (Exception x) { + this.assertTrue("Request threw an error",false); + x.printStackTrace(); + } + assertEquals(future.get(),item); + } + + protected class OfferThread extends Thread { + Object item = null; + long delay = 5000; + volatile boolean offered = false; + public OfferThread(Object i, long d) { + this.item = i; + this.delay = d; + this.setDaemon(false); + this.setName(TestAsyncQueue.class.getName()+"-OfferThread"); + } + public void run() { + try { + this.sleep(delay); + }catch (Exception ignore){} + offered = true; + TestAsyncQueue.this.queue.offer(item); + } + } +}