import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
private long tcpSelectorTimeout = 5000;
//how many times to search for an available socket
private int autoBind = 100;
- private int maxThreads = Integer.MAX_VALUE;
+ private int maxThreads = 15;
private int minThreads = 6;
private int maxTasks = 100;
private int minTasks = 10;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private int timeout = 3000; //3 seconds
private boolean useBufferPool = true;
-
+ private boolean daemon = true;
+ private long maxIdleTime = 60000;
+
private ExecutorService executor;
public void start() throws IOException {
if ( executor == null ) {
- executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
+ //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-");
+ executor = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
+ taskqueue.setParent((ThreadPoolExecutor)executor);
}
}
this.udpTxBufSize = udpTxBufSize;
}
+ // ---------------------------------------------- TaskQueue Inner Class
+ class TaskQueue extends LinkedBlockingQueue<Runnable> {
+ ThreadPoolExecutor parent = null;
+
+ public TaskQueue() {
+ super();
+ }
+
+ public TaskQueue(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ public TaskQueue(Collection<? extends Runnable> c) {
+ super(c);
+ }
+
+ public void setParent(ThreadPoolExecutor tp) {
+ parent = tp;
+ }
+
+ public boolean force(Runnable o) {
+ if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
+ return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
+ }
+
+ public boolean offer(Runnable o) {
+ //we can't do any checks
+ if (parent==null) return super.offer(o);
+ //we are maxed out on threads, simply queue the object
+ if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
+ //we have idle threads, just add it to the queue
+ //this is an approximation, so it could use some tuning
+ if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o);
+ //if we have less threads than maximum force creation of a new thread
+ if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
+ //if we reached here, we need to add it to the queue
+ return super.offer(o);
+ }
+ }
+
+ // ---------------------------------------------- ThreadFactory Inner Class
+ class TaskThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+
+ TaskThreadFactory(String namePrefix) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ this.namePrefix = namePrefix;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
+ t.setDaemon(daemon);
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
+ public boolean isDaemon() {
+ return daemon;
+ }
+
+ public long getMaxIdleTime() {
+ return maxIdleTime;
+ }
+
+ public void setDaemon(boolean daemon) {
+ this.daemon = daemon;
+ }
+
+ public void setMaxIdleTime(long maxIdleTime) {
+ this.maxIdleTime = maxIdleTime;
+ }
+
}
\ No newline at end of file