Add the ability to configure a job queue size, and a timeout for how long we want...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 5 Dec 2008 22:02:15 +0000 (22:02 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 5 Dec 2008 22:02:15 +0000 (22:02 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@723889 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/Executor.java
java/org/apache/catalina/core/StandardThreadExecutor.java

index 79d0360..0bf065b 100644 (file)
  */
 package org.apache.catalina;
 
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
 
 
 public interface Executor extends java.util.concurrent.Executor, Lifecycle {
     public String getName();
+    
+    /**
+     * Executes the given command at some time in the future.  The command
+     * may execute in a new thread, in a pooled thread, or in the calling
+     * thread, at the discretion of the <tt>Executor</tt> implementation.
+     * If no threads are available, it will be added to the work queue.
+     * If the work queue is full, the system will wait for the specified 
+     * time until it throws a RejectedExecutionException
+     *
+     * @param command the runnable task
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution - the queue is full
+     * @throws NullPointerException if command or unit is null
+     */
+    void execute(Runnable command, long timeout, TimeUnit unit);
 }
\ No newline at end of file
index 3093fff..49d0ab2 100644 (file)
@@ -33,22 +33,51 @@ import java.util.concurrent.RejectedExecutionException;
 public class StandardThreadExecutor implements Executor {
     
     // ---------------------------------------------- Properties
+    /**
+     * Default thread priority
+     */
     protected int threadPriority = Thread.NORM_PRIORITY;
 
+    /**
+     * Run threads in daemon or non-daemon state
+     */
     protected boolean daemon = true;
     
+    /**
+     * Default name prefix for the thread name
+     */
     protected String namePrefix = "tomcat-exec-";
     
+    /**
+     * max number of threads
+     */
     protected int maxThreads = 200;
     
+    /**
+     * min number of threads
+     */
     protected int minSpareThreads = 25;
     
+    /**
+     * idle time in milliseconds
+     */
     protected int maxIdleTime = 60000;
     
+    /**
+     * The executor we use for this component
+     */
     protected ThreadPoolExecutor executor = null;
     
+    /**
+     * the name of this thread pool
+     */
     protected String name;
     
+    /**
+     * The maximum number of elements that can queue up before we reject them
+     */
+    protected int maxQueueSize = Integer.MAX_VALUE;
+    
     private LifecycleSupport lifecycle = new LifecycleSupport(this);
     // ---------------------------------------------- Constructors
     public StandardThreadExecutor() {
@@ -60,7 +89,7 @@ public class StandardThreadExecutor implements Executor {
     // ---------------------------------------------- Public Methods
     public void start() throws LifecycleException {
         lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
-        TaskQueue taskqueue = new TaskQueue();
+        TaskQueue taskqueue = new TaskQueue(maxQueueSize);
         TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
         lifecycle.fireLifecycleEvent(START_EVENT, null);
         executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
@@ -76,13 +105,29 @@ public class StandardThreadExecutor implements Executor {
         lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null);
     }
     
+    public void execute(Runnable command, long timeout, TimeUnit unit) {
+        if ( executor != null ) {
+            try {
+                executor.execute(command);
+            } catch (RejectedExecutionException rx) {
+                //there could have been contention around the queue
+                try {
+                    if ( !( (TaskQueue) executor.getQueue()).force(command,timeout,unit) ) throw new RejectedExecutionException("Work queue full.");
+                }catch (InterruptedException x) {
+                    throw new RejectedExecutionException("Interrupted.",x);
+                }
+            }
+        } else throw new IllegalStateException("StandardThreadPool not started.");
+    }
+    
+    
     public void execute(Runnable command) {
         if ( executor != null ) {
             try {
                 executor.execute(command);
             } catch (RejectedExecutionException rx) {
                 //there could have been contention around the queue
-                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException();
+                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
             }
         } else throw new IllegalStateException("StandardThreadPool not started.");
     }
@@ -153,6 +198,14 @@ public class StandardThreadExecutor implements Executor {
         this.name = name;
     }
     
+    public void setMaxQueueSize(int size) {
+        this.maxQueueSize = size;
+    }
+    
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+    
     /**
      * Add a LifecycleEvent listener to this component.
      *
@@ -214,8 +267,8 @@ public class StandardThreadExecutor implements Executor {
             super();
         }
 
-        public TaskQueue(int initialCapacity) {
-            super(initialCapacity);
+        public TaskQueue(int capacity) {
+            super(capacity);
         }
 
         public TaskQueue(Collection<? extends Runnable> c) {
@@ -231,6 +284,11 @@ public class StandardThreadExecutor implements Executor {
             return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
         }
 
+        public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
+            if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
+            return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
+        }
+
         public boolean offer(Runnable o) {
             //we can't do any checks
             if (parent==null) return super.offer(o);