From: fhanik Date: Fri, 5 Dec 2008 22:02:15 +0000 (+0000) Subject: Add the ability to configure a job queue size, and a timeout for how long we want... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=9240e7ab92a24b72efb96c1d610f31c3e44a77b5;p=tomcat7.0 Add the ability to configure a job queue size, and a timeout for how long we want to try to add something to the queue. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@723889 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/Executor.java b/java/org/apache/catalina/Executor.java index 79d036040..0bf065be0 100644 --- a/java/org/apache/catalina/Executor.java +++ b/java/org/apache/catalina/Executor.java @@ -16,8 +16,26 @@ */ package org.apache.catalina; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + public interface Executor extends java.util.concurrent.Executor, Lifecycle { public String getName(); + + /** + * Executes the given command at some time in the future. The command + * may execute in a new thread, in a pooled thread, or in the calling + * thread, at the discretion of the Executor implementation. + * If no threads are available, it will be added to the work queue. + * If the work queue is full, the system will wait for the specified + * time until it throws a RejectedExecutionException + * + * @param command the runnable task + * @throws RejectedExecutionException if this task cannot be + * accepted for execution - the queue is full + * @throws NullPointerException if command or unit is null + */ + void execute(Runnable command, long timeout, TimeUnit unit); } \ No newline at end of file diff --git a/java/org/apache/catalina/core/StandardThreadExecutor.java b/java/org/apache/catalina/core/StandardThreadExecutor.java index 3093fff55..49d0ab224 100644 --- a/java/org/apache/catalina/core/StandardThreadExecutor.java +++ b/java/org/apache/catalina/core/StandardThreadExecutor.java @@ -33,22 +33,51 @@ import java.util.concurrent.RejectedExecutionException; public class StandardThreadExecutor implements Executor { // ---------------------------------------------- Properties + /** + * Default thread priority + */ protected int threadPriority = Thread.NORM_PRIORITY; + /** + * Run threads in daemon or non-daemon state + */ protected boolean daemon = true; + /** + * Default name prefix for the thread name + */ protected String namePrefix = "tomcat-exec-"; + /** + * max number of threads + */ protected int maxThreads = 200; + /** + * min number of threads + */ protected int minSpareThreads = 25; + /** + * idle time in milliseconds + */ protected int maxIdleTime = 60000; + /** + * The executor we use for this component + */ protected ThreadPoolExecutor executor = null; + /** + * the name of this thread pool + */ protected String name; + /** + * The maximum number of elements that can queue up before we reject them + */ + protected int maxQueueSize = Integer.MAX_VALUE; + private LifecycleSupport lifecycle = new LifecycleSupport(this); // ---------------------------------------------- Constructors public StandardThreadExecutor() { @@ -60,7 +89,7 @@ public class StandardThreadExecutor implements Executor { // ---------------------------------------------- Public Methods public void start() throws LifecycleException { lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null); - TaskQueue taskqueue = new TaskQueue(); + TaskQueue taskqueue = new TaskQueue(maxQueueSize); TaskThreadFactory tf = new TaskThreadFactory(namePrefix); lifecycle.fireLifecycleEvent(START_EVENT, null); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); @@ -76,13 +105,29 @@ public class StandardThreadExecutor implements Executor { lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null); } + public void execute(Runnable command, long timeout, TimeUnit unit) { + if ( executor != null ) { + try { + executor.execute(command); + } catch (RejectedExecutionException rx) { + //there could have been contention around the queue + try { + if ( !( (TaskQueue) executor.getQueue()).force(command,timeout,unit) ) throw new RejectedExecutionException("Work queue full."); + }catch (InterruptedException x) { + throw new RejectedExecutionException("Interrupted.",x); + } + } + } else throw new IllegalStateException("StandardThreadPool not started."); + } + + public void execute(Runnable command) { if ( executor != null ) { try { executor.execute(command); } catch (RejectedExecutionException rx) { //there could have been contention around the queue - if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException(); + if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full."); } } else throw new IllegalStateException("StandardThreadPool not started."); } @@ -153,6 +198,14 @@ public class StandardThreadExecutor implements Executor { this.name = name; } + public void setMaxQueueSize(int size) { + this.maxQueueSize = size; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + /** * Add a LifecycleEvent listener to this component. * @@ -214,8 +267,8 @@ public class StandardThreadExecutor implements Executor { super(); } - public TaskQueue(int initialCapacity) { - super(initialCapacity); + public TaskQueue(int capacity) { + super(capacity); } public TaskQueue(Collection c) { @@ -231,6 +284,11 @@ public class StandardThreadExecutor implements Executor { return super.offer(o); //forces the item onto the queue, to be used if the task is rejected } + public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected + } + public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o);