From: fhanik Date: Tue, 14 Jul 2009 14:46:34 +0000 (+0000) Subject: Patch by arielandres@hotmail.com X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=9d51aa91d0f120ae239cdafae25dbbd1fb6f4f34;p=tomcat7.0 Patch by arielandres@hotmail.com Fix for https://issues.apache.org/bugzilla/show_bug.cgi?id=47524 git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@793913 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java index f31d28ea2..a6826a254 100644 --- a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java +++ b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java @@ -27,8 +27,6 @@ import java.net.MulticastSocket; import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.catalina.tribes.Channel; @@ -37,6 +35,7 @@ import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.util.ExecutorFactory; /** * A membership implementation using simple multicast. @@ -145,7 +144,7 @@ public class McastServiceImpl /** * Dont interrupt the sender/receiver thread, but pass off to an executor */ - protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + protected ExecutorService executor = ExecutorFactory.newThreadPool(0, 2, 2, TimeUnit.SECONDS); /** * disable/enable local loopback message diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index da1c32f11..2772fb368 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -21,12 +21,8 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.util.Collection; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -34,6 +30,7 @@ import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.util.ExecutorFactory; import org.apache.juli.logging.Log; /** @@ -94,10 +91,8 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public void start() throws IOException { if ( executor == null ) { //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue()); - TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-"); - executor = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); - taskqueue.setParent((ThreadPoolExecutor)executor); + executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS, tf); } } @@ -549,46 +544,6 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R this.udpTxBufSize = udpTxBufSize; } - // ---------------------------------------------- TaskQueue Inner Class - class TaskQueue extends LinkedBlockingQueue { - ThreadPoolExecutor parent = null; - - public TaskQueue() { - super(); - } - - public TaskQueue(int initialCapacity) { - super(initialCapacity); - } - - public TaskQueue(Collection c) { - super(c); - } - - public void setParent(ThreadPoolExecutor tp) { - parent = tp; - } - - public boolean force(Runnable o) { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize() { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int initialCapacity) { + super(initialCapacity); + } + + public TaskQueue(Collection c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + //this is an approximation, so it could use some tuning + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize()