import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
/**
* As task queue specifically designed to run with a thread pool executor.
* The task queue is optimised to properly utilize threads within
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
- if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
+ if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
import java.util.concurrent.atomic.AtomicInteger;
/**
* Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
- * getActiveCount method, to be used to properly handle the work queue
+ * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
* If a RejectedExecutionHandler is not specified a default one will be configured
* and that one will always throw a RejectedExecutionException
* @author fhanik
*/
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
- private final AtomicInteger activeCount = new AtomicInteger(0);
+ /**
+ * The number of tasks submitted but not yet finished. This includes tasks
+ * in the queue and tasks that have been handed to a worker thread but the
+ * latter did not start executing the task yet.
+ * This number is always greater or equal to {@link #getActiveCount()}.
+ */
+ private final AtomicInteger submittedCount = new AtomicInteger(0);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
@Override
protected void afterExecute(Runnable r, Throwable t) {
- activeCount.decrementAndGet();
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- activeCount.incrementAndGet();
+ submittedCount.decrementAndGet();
}
- @Override
- public int getActiveCount() {
- return activeCount.get();
+ public int getSubmittedCount() {
+ return submittedCount.get();
}
/**
* @throws NullPointerException if command or unit is null
*/
public void execute(Runnable command, long timeout, TimeUnit unit) {
+ submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
+ submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
+ submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
+ submittedCount.decrementAndGet();
throw rx;
}