import java.util.concurrent.locks.ReentrantLock;
/**
- *
+ *
* A simple implementation of a blocking queue with fairness waiting.
* invocations to method poll(...) will get handed out in the order they were received.
* @author Filip Hanik
- *
+ *
*/
public class FairBlockingQueue<E> implements BlockingQueue<E> {
ReentrantLock lock = new ReentrantLock();
-
+
LinkedList<E> items = null;
-
+
LinkedList<ExchangeCountDownLatch<E>> waiters = null;
-
+
public FairBlockingQueue() {
items = new LinkedList<E>();
waiters = new LinkedList<ExchangeCountDownLatch<E>>();
}
-
- //------------------------------------------------------------------
+
+ //------------------------------------------------------------------
// USED BY CONPOOL IMPLEMENTATION
- //------------------------------------------------------------------
+ //------------------------------------------------------------------
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
-
+
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E result = null;
final ReentrantLock lock = this.lock;
}
return result;
}
-
+
public boolean remove(Object e) {
final ReentrantLock lock = this.lock;
lock.lock();
lock.unlock();
}
}
-
+
public int size() {
return items.size();
}
-
+
public Iterator<E> iterator() {
return new FairIterator();
}
-
+
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
lock.unlock();
}
}
-
+
public boolean contains(Object e) {
final ReentrantLock lock = this.lock;
lock.lock();
lock.unlock();
}
}
-
- //------------------------------------------------------------------
+
+ //------------------------------------------------------------------
// NOT USED BY CONPOOL IMPLEMENTATION
- //------------------------------------------------------------------
-
- @Override
+ //------------------------------------------------------------------
+
public boolean add(E e) {
return offer(e);
}
- @Override
public int drainTo(Collection<? super E> c, int maxElements) {
throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
}
- @Override
public int drainTo(Collection<? super E> c) {
return drainTo(c,Integer.MAX_VALUE);
}
- @Override
public void put(E e) throws InterruptedException {
offer(e);
}
- @Override
public int remainingCapacity() {
return Integer.MAX_VALUE - size();
}
- @Override
public E take() throws InterruptedException {
return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
- @Override
public boolean addAll(Collection<? extends E> c) {
Iterator i = c.iterator();
while (i.hasNext()) {
return true;
}
- @Override
public void clear() {
throw new UnsupportedOperationException("void clear()");
-
+
}
- @Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
}
- @Override
public boolean isEmpty() {
return size() == 0;
}
- @Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
}
- @Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
}
- @Override
public Object[] toArray() {
throw new UnsupportedOperationException("Object[] toArray()");
}
- @Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
}
- @Override
public E element() {
throw new UnsupportedOperationException("E element()");
}
- @Override
public E peek() {
throw new UnsupportedOperationException("E peek()");
}
- @Override
public E remove() {
throw new UnsupportedOperationException("E remove()");
}
- //------------------------------------------------------------------
+ //------------------------------------------------------------------
// Count down latch that can be used to exchange information
- //------------------------------------------------------------------
+ //------------------------------------------------------------------
protected class ExchangeCountDownLatch<T> extends CountDownLatch {
protected T item;
public ExchangeCountDownLatch(int i) {
this.item = item;
}
}
-
- //------------------------------------------------------------------
+
+ //------------------------------------------------------------------
// Iterator safe from concurrent modification exceptions
- //------------------------------------------------------------------
+ //------------------------------------------------------------------
protected class FairIterator implements Iterator<E> {
E[] elements = null;
int index;
E element = null;
-
+
public FairIterator() {
final ReentrantLock lock = FairBlockingQueue.this.lock;
lock.lock();
lock.unlock();
}
}
- @Override
public boolean hasNext() {
return index<elements.length;
}
- @Override
public E next() {
element = elements[index++];
return element;
}
- @Override
public void remove() {
final ReentrantLock lock = FairBlockingQueue.this.lock;
lock.lock();
lock.unlock();
}
}
-
+
}
}