From 8a46b4eb3287c2f8ffac60911714778b3e8e53ec Mon Sep 17 00:00:00 2001 From: fhanik Date: Tue, 9 Dec 2008 20:56:59 +0000 Subject: [PATCH] Refactored the thread pooling when using an executor, this gets rid of duplicate code in the NIO connector as well as in the org.apache.catalina.core.StandardThreadExecutor package. I provided a ThreadPoolExecutor with a small extension to the java.util.concurrent The connector method setExecutor still take a java.util.concurrent.Executor as an argument to provide the most flexibility git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@724886 13f79535-47bb-0310-9956-ffa450edef68 --- .../catalina/core/StandardThreadExecutor.java | 94 +++------------------- java/org/apache/tomcat/util/net/NioEndpoint.java | 89 ++++---------------- java/org/apache/tomcat/util/threads/TaskQueue.java | 73 +++++++++++++++++ .../tomcat/util/threads/TaskThreadFactory.java | 47 +++++++++++ .../tomcat/util/threads/ThreadPoolExecutor.java | 74 +++++++++++++++++ 5 files changed, 220 insertions(+), 157 deletions(-) create mode 100644 java/org/apache/tomcat/util/threads/TaskQueue.java create mode 100644 java/org/apache/tomcat/util/threads/TaskThreadFactory.java create mode 100644 java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java diff --git a/java/org/apache/catalina/core/StandardThreadExecutor.java b/java/org/apache/catalina/core/StandardThreadExecutor.java index 49d0ab224..6f853df08 100644 --- a/java/org/apache/catalina/core/StandardThreadExecutor.java +++ b/java/org/apache/catalina/core/StandardThreadExecutor.java @@ -17,18 +17,16 @@ package org.apache.catalina.core; -import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.Executor; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleListener; import org.apache.catalina.util.LifecycleSupport; -import java.util.concurrent.RejectedExecutionException; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; public class StandardThreadExecutor implements Executor { @@ -90,7 +88,7 @@ public class StandardThreadExecutor implements Executor { public void start() throws LifecycleException { lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null); TaskQueue taskqueue = new TaskQueue(maxQueueSize); - TaskThreadFactory tf = new TaskThreadFactory(namePrefix); + TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority()); lifecycle.fireLifecycleEvent(START_EVENT, null); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); @@ -107,17 +105,10 @@ public class StandardThreadExecutor implements Executor { public void execute(Runnable command, long timeout, TimeUnit unit) { if ( executor != null ) { - try { - executor.execute(command); - } catch (RejectedExecutionException rx) { - //there could have been contention around the queue - try { - if ( !( (TaskQueue) executor.getQueue()).force(command,timeout,unit) ) throw new RejectedExecutionException("Work queue full."); - }catch (InterruptedException x) { - throw new RejectedExecutionException("Interrupted.",x); - } - } - } else throw new IllegalStateException("StandardThreadPool not started."); + executor.execute(command,timeout,unit); + } else { + throw new IllegalStateException("StandardThreadExecutor not started."); + } } @@ -258,71 +249,4 @@ public class StandardThreadExecutor implements Executor { public int getQueueSize() { return (executor != null) ? executor.getQueue().size() : -1; } - - // ---------------------------------------------- TaskQueue Inner Class - class TaskQueue extends LinkedBlockingQueue { - ThreadPoolExecutor parent = null; - - public TaskQueue() { - super(); - } - - public TaskQueue(int capacity) { - super(capacity); - } - - public TaskQueue(Collection c) { - super(c); - } - - public void setParent(ThreadPoolExecutor tp) { - parent = tp; - } - - public boolean force(Runnable o) { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize() { - ThreadPoolExecutor parent = null; - NioEndpoint endpoint = null; - - public TaskQueue() { - super(); - } - public TaskQueue(int initialCapacity) { - super(initialCapacity); - } - - public TaskQueue(Collection c) { - super(c); - } - - - public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) { - parent = tp; - this.endpoint = ep; - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize() { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int capacity) { + super(capacity); + } + + public TaskQueue(Collection c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize() workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + activeCount.decrementAndGet(); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + activeCount.incrementAndGet(); + } + + @Override + public int getActiveCount() { + return activeCount.get(); + } + + public void execute(Runnable command, long timeout, TimeUnit unit) { + + } + + + + + +} -- 2.11.0