From fa410a34ffc13e9d4723415ffcc80ee99a1a7d8d Mon Sep 17 00:00:00 2001 From: fhanik Date: Wed, 22 Oct 2008 20:00:58 +0000 Subject: [PATCH] fix thread boundaries by adding a queue to the pool git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@707181 13f79535-47bb-0310-9956-ffa450edef68 --- .../catalina/tribes/transport/ReceiverBase.java | 92 +++++++++++++++++++++- 1 file changed, 89 insertions(+), 3 deletions(-) diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index 50c689920..bf24f8e25 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -21,10 +21,14 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; @@ -65,7 +69,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private long tcpSelectorTimeout = 5000; //how many times to search for an available socket private int autoBind = 100; - private int maxThreads = Integer.MAX_VALUE; + private int maxThreads = 15; private int minThreads = 6; private int maxTasks = 100; private int minTasks = 10; @@ -78,7 +82,9 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 3000; //3 seconds private boolean useBufferPool = true; - + private boolean daemon = true; + private long maxIdleTime = 60000; + private ExecutorService executor; @@ -87,7 +93,11 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public void start() throws IOException { if ( executor == null ) { - executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue()); + //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue()); + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-"); + executor = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); + taskqueue.setParent((ThreadPoolExecutor)executor); } } @@ -539,4 +549,80 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R this.udpTxBufSize = udpTxBufSize; } + // ---------------------------------------------- TaskQueue Inner Class + class TaskQueue extends LinkedBlockingQueue { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int initialCapacity) { + super(initialCapacity); + } + + public TaskQueue(Collection c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + //this is an approximation, so it could use some tuning + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize()