From b7cdb826020dc3dcb7eb9c979014836876c7dca5 Mon Sep 17 00:00:00 2001 From: fhanik Date: Tue, 14 Jul 2009 17:49:03 +0000 Subject: [PATCH] Add experimental new queue, runs faster but is not yet complete and needs some work around concurrency, iterators and remove operations git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@793991 13f79535-47bb-0310-9956-ffa450edef68 --- .../jdbc/pool/MultiLockFairBlockingQueue.java | 537 +++++++++++++++++++++ 1 file changed, 537 insertions(+) create mode 100644 modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java diff --git a/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java b/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java new file mode 100644 index 000000000..e73443746 --- /dev/null +++ b/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.jdbc.pool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +/** + * EXPERIMENTAL AND NOT YET COMPLETE! + * + * + * An implementation of a blocking queue with fairness waiting and lock dispersal to avoid contention. + * invocations to method poll(...) will get handed out in the order they were received. + * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a + * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state. + *
+ * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented. + * @author Filip Hanik + * + */ + +public class MultiLockFairBlockingQueue implements BlockingQueue { + + final int LOCK_COUNT = Runtime.getRuntime().availableProcessors(); + + final AtomicInteger putQueue = new AtomicInteger(0); + final AtomicInteger pollQueue = new AtomicInteger(0); + + public int getNextPut() { + int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT; + return idx; + } + + public int getNextPoll() { + int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT; + return idx; + } + /** + * Phase one entry lock in order to give out + * per-thread-locks for the waiting phase we have + * a phase one lock during the contention period. + */ + final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT]; + + /** + * All the objects in the pool are stored in a simple linked list + */ + final LinkedList[] items; + + /** + * All threads waiting for an object are stored in a linked list + */ + final LinkedList>[] waiters; + + /** + * Creates a new fair blocking queue. + */ + public MultiLockFairBlockingQueue() { + items = new LinkedList[LOCK_COUNT]; + waiters = new LinkedList[LOCK_COUNT]; + for (int i=0; i(); + waiters[i] = new LinkedList>(); + locks[i] = new ReentrantLock(false); + } + } + + //------------------------------------------------------------------ + // USED BY CONPOOL IMPLEMENTATION + //------------------------------------------------------------------ + /** + * Will always return true, queue is unbounded. + * {@inheritDoc} + */ + public boolean offer(E e) { + int idx = getNextPut(); + //during the offer, we will grab the main lock + final ReentrantLock lock = this.locks[idx]; + lock.lock(); + ExchangeCountDownLatch c = null; + try { + //check to see if threads are waiting for an object + if (waiters[idx].size() > 0) { + //if threads are waiting grab the latch for that thread + c = waiters[idx].poll(); + //give the object to the thread instead of adding it to the pool + c.setItem(e); + } else { + //we always add first, so that the most recently used object will be given out + items[idx].addFirst(e); + } + } finally { + lock.unlock(); + } + //if we exchanged an object with another thread, wake it up. + if (c!=null) c.countDown(); + //we have an unbounded queue, so always return true + return true; + } + + /** + * Will never timeout, as it invokes the {@link #offer(Object)} method. + * Once a lock has been acquired, the + * {@inheritDoc} + */ + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e); + } + + /** + * Fair retrieval of an object in the queue. + * Objects are returned in the order the threads requested them. + * {@inheritDoc} + */ + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + int idx = getNextPoll(); + E result = null; + final ReentrantLock lock = this.locks[idx]; + boolean error = true; + //acquire the global lock until we know what to do + lock.lock(); + try { + //check to see if we have objects + result = items[idx].poll(); + if (result==null && timeout>0) { + //the queue is empty we will wait for an object + ExchangeCountDownLatch c = new ExchangeCountDownLatch(1); + //add to the bottom of the wait list + waiters[idx].addLast(c); + //unlock the global lock + lock.unlock(); + //wait for the specified timeout + if (!c.await(timeout, unit)) { + //if we timed out, remove ourselves from the waitlist + lock.lock(); + waiters[idx].remove(c); + lock.unlock(); + } + //return the item we received, can be null if we timed out + result = c.getItem(); + } else { + //we have an object, release + lock.unlock(); + } + error = false; + } finally { + if (error && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + return result; + } + + /** + * Request an item from the queue asynchronously + * @return - a future pending the result from the queue poll request + */ + public Future pollAsync() { + int idx = getNextPoll(); + Future result = null; + final ReentrantLock lock = this.locks[idx]; + boolean error = true; + //grab the global lock + lock.lock(); + try { + //check to see if we have objects in the queue + E item = items[idx].poll(); + if (item==null) { + //queue is empty, add ourselves as waiters + ExchangeCountDownLatch c = new ExchangeCountDownLatch(1); + waiters[idx].addLast(c); + lock.unlock(); + //return a future that will wait for the object + result = new ItemFuture(c); + } else { + lock.unlock(); + //return a future with the item + result = new ItemFuture(item); + } + error = false; + } finally { + if (error && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + return result; + } + + /** + * {@inheritDoc} + */ + public boolean remove(Object e) { + for (int idx=0; idx iterator() { + return new FairIterator(); + } + + /** + * {@inheritDoc} + */ + public E poll() { + int idx = getNextPoll(); + final ReentrantLock lock = this.locks[idx]; + lock.lock(); + try { + return items[idx].poll(); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + public boolean contains(Object e) { + for (int idx=0; idx c, int maxElements) { + throw new UnsupportedOperationException("int drainTo(Collection c, int maxElements)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public int drainTo(Collection c) { + return drainTo(c,Integer.MAX_VALUE); + } + + /** + * {@inheritDoc} + */ + public void put(E e) throws InterruptedException { + offer(e); + } + + /** + * {@inheritDoc} + */ + public int remainingCapacity() { + return Integer.MAX_VALUE - size(); + } + + /** + * {@inheritDoc} + */ + public E take() throws InterruptedException { + return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + /** + * {@inheritDoc} + */ + public boolean addAll(Collection c) { + Iterator i = c.iterator(); + while (i.hasNext()) { + E e = (E)i.next(); + offer(e); + } + return true; + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public void clear() { + throw new UnsupportedOperationException("void clear()"); + + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException("boolean containsAll(Collection c)"); + } + + /** + * {@inheritDoc} + */ + public boolean isEmpty() { + return size() == 0; + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException("boolean removeAll(Collection c)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException("boolean retainAll(Collection c)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public Object[] toArray() { + throw new UnsupportedOperationException("Object[] toArray()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(" T[] toArray(T[] a)"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public E element() { + throw new UnsupportedOperationException("E element()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public E peek() { + throw new UnsupportedOperationException("E peek()"); + } + + /** + * {@inheritDoc} + * @throws UnsupportedOperation - this operation is not supported + */ + public E remove() { + throw new UnsupportedOperationException("E remove()"); + } + + + + //------------------------------------------------------------------ + // Non cancellable Future used to check and see if a connection has been made available + //------------------------------------------------------------------ + protected class ItemFuture implements Future { + protected volatile T item = null; + protected volatile ExchangeCountDownLatch latch = null; + protected volatile boolean canceled = false; + + public ItemFuture(T item) { + this.item = item; + } + + public ItemFuture(ExchangeCountDownLatch latch) { + this.latch = latch; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; //don't allow cancel for now + } + + public T get() throws InterruptedException, ExecutionException { + if (item!=null) { + return item; + } else if (latch!=null) { + latch.await(); + return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (item!=null) { + return item; + } else if (latch!=null) { + boolean timedout = !latch.await(timeout, unit); + if (timedout) throw new TimeoutException(); + else return latch.getItem(); + } else { + throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception()); + } + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return (item!=null || latch.getItem()!=null); + } + + } + + //------------------------------------------------------------------ + // Count down latch that can be used to exchange information + //------------------------------------------------------------------ + protected class ExchangeCountDownLatch extends CountDownLatch { + protected volatile T item; + public ExchangeCountDownLatch(int i) { + super(i); + } + public T getItem() { + return item; + } + public void setItem(T item) { + this.item = item; + } + } + + //------------------------------------------------------------------ + // Iterator safe from concurrent modification exceptions + //------------------------------------------------------------------ + protected class FairIterator implements Iterator { + E[] elements = null; + int index; + E element = null; + + public FairIterator() { + ArrayList list = new ArrayList(MultiLockFairBlockingQueue.this.size()); + for (int idx=0; idx