Patch by arielandres@hotmail.com
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 14 Jul 2009 14:46:34 +0000 (14:46 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 14 Jul 2009 14:46:34 +0000 (14:46 +0000)
Fix for https://issues.apache.org/bugzilla/show_bug.cgi?id=47524

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@793913 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/membership/McastServiceImpl.java
java/org/apache/catalina/tribes/transport/ReceiverBase.java
java/org/apache/catalina/tribes/util/ExecutorFactory.java [new file with mode: 0644]

index f31d28e..a6826a2 100644 (file)
@@ -27,8 +27,6 @@ import java.net.MulticastSocket;
 import java.net.SocketTimeoutException;
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.catalina.tribes.Channel;
@@ -37,6 +35,7 @@ import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.MessageListener;
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.ExecutorFactory;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -145,7 +144,7 @@ public class McastServiceImpl
     /**
      * Dont interrupt the sender/receiver thread, but pass off to an executor
      */
-    protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    protected ExecutorService executor = ExecutorFactory.newThreadPool(0, 2, 2, TimeUnit.SECONDS);
     
     /**
      * disable/enable local loopback message
index da1c32f..2772fb3 100644 (file)
@@ -21,12 +21,8 @@ import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.util.Collection;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +30,7 @@ import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.MessageListener;
 import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.util.ExecutorFactory;
 import org.apache.juli.logging.Log;
 
 /**
@@ -94,10 +91,8 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R
     public void start() throws IOException {
         if ( executor == null ) {
             //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
-            TaskQueue taskqueue = new TaskQueue();
             TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-");
-            executor = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
-            taskqueue.setParent((ThreadPoolExecutor)executor);
+            executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS, tf);
         }
     }
 
@@ -549,46 +544,6 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R
         this.udpTxBufSize = udpTxBufSize;
     }
 
- // ---------------------------------------------- TaskQueue Inner Class
-    class TaskQueue extends LinkedBlockingQueue<Runnable> {
-        ThreadPoolExecutor parent = null;
-
-        public TaskQueue() {
-            super();
-        }
-
-        public TaskQueue(int initialCapacity) {
-            super(initialCapacity);
-        }
-
-        public TaskQueue(Collection<? extends Runnable> c) {
-            super(c);
-        }
-
-        public void setParent(ThreadPoolExecutor tp) {
-            parent = tp;
-        }
-        
-        public boolean force(Runnable o) {
-            if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
-            return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
-        }
-
-        public boolean offer(Runnable o) {
-            //we can't do any checks
-            if (parent==null) return super.offer(o);
-            //we are maxed out on threads, simply queue the object
-            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
-            //we have idle threads, just add it to the queue
-            //this is an approximation, so it could use some tuning
-            if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
-            //if we have less threads than maximum force creation of a new thread
-            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
-            //if we reached here, we need to add it to the queue
-            return super.offer(o);
-        }
-    }
-
     // ---------------------------------------------- ThreadFactory Inner Class
     class TaskThreadFactory implements ThreadFactory {
         final ThreadGroup group;
diff --git a/java/org/apache/catalina/tribes/util/ExecutorFactory.java b/java/org/apache/catalina/tribes/util/ExecutorFactory.java
new file mode 100644 (file)
index 0000000..c42b9d9
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.util;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorFactory {
+
+       public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) {
+               TaskQueue taskqueue = new TaskQueue();
+               ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue);
+               taskqueue.setParent(service);
+               return service;         
+       }
+
+       public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) {
+               TaskQueue taskqueue = new TaskQueue();
+               ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
+               taskqueue.setParent(service);
+               return service;
+       }
+       
+        // ---------------------------------------------- TaskQueue Inner Class
+    private static class TaskQueue extends LinkedBlockingQueue<Runnable> {
+        ThreadPoolExecutor parent = null;
+
+        public TaskQueue() {
+            super();
+        }
+
+        public TaskQueue(int initialCapacity) {
+            super(initialCapacity);
+        }
+
+        public TaskQueue(Collection<? extends Runnable> c) {
+            super(c);
+        }
+
+        public void setParent(ThreadPoolExecutor tp) {
+            parent = tp;
+        }
+        
+        public boolean force(Runnable o) {
+            if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
+            return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
+        }
+
+        public boolean offer(Runnable o) {
+            //we can't do any checks
+            if (parent==null) return super.offer(o);
+            //we are maxed out on threads, simply queue the object
+            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
+            //we have idle threads, just add it to the queue
+            //this is an approximation, so it could use some tuning
+            if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
+            //if we have less threads than maximum force creation of a new thread
+            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
+            //if we reached here, we need to add it to the queue
+            return super.offer(o);
+        }
+    }
+}