From: fhanik Date: Fri, 15 Dec 2006 00:25:17 +0000 (+0000) Subject: Refactor, name change in preparation to swap in Executor, and to make increase thread... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=4e17feefa58d91eadec2b6130f83d846a463a475;p=tomcat7.0 Refactor, name change in preparation to swap in Executor, and to make increase thread fairness when receiving data git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@487407 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index e01b6d920..fdddf9ce5 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -38,7 +38,7 @@ import org.apache.juli.logging.Log; * @author not attributable * @version 1.0 */ -public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, ThreadPool.ThreadCreator { +public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, RxTaskPool.TaskCreator { public static final int OPTION_DIRECT_BUFFER = 0x0004; @@ -53,7 +53,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T private int rxBufSize = 43800; private int txBufSize = 25188; private boolean listen = false; - private ThreadPool pool; + private RxTaskPool pool; private boolean direct = true; private long tcpSelectorTimeout = 5000; //how many times to search for an available socket @@ -270,7 +270,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T return listener; } - public ThreadPool getPool() { + public RxTaskPool getPool() { return pool; } @@ -367,7 +367,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T this.log = log; } - public void setPool(ThreadPool pool) { + public void setPool(RxTaskPool pool) { this.pool = pool; } diff --git a/java/org/apache/catalina/tribes/transport/RxTaskPool.java b/java/org/apache/catalina/tribes/transport/RxTaskPool.java new file mode 100644 index 000000000..43ac09b5d --- /dev/null +++ b/java/org/apache/catalina/tribes/transport/RxTaskPool.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * @author not attributable + * @version 1.0 + */ + +public class RxTaskPool +{ + /** + * A very simple thread pool class. The pool size is set at + * construction time and remains fixed. Threads are cycled + * through a FIFO idle queue. + */ + + List idle = new LinkedList(); + List used = new LinkedList(); + + Object mutex = new Object(); + boolean running = true; + + private static int counter = 1; + private int maxThreads; + private int minThreads; + + private TaskCreator creator = null; + + private static synchronized int inc() { + return counter++; + } + + + public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception { + // fill up the pool with worker threads + this.maxThreads = maxThreads; + this.minThreads = minThreads; + this.creator = creator; + //for (int i = 0; i < minThreads; i++) { + for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem + WorkerThread thread = creator.getWorkerThread(); + setupThread(thread); + idle.add (thread); + } + } + + protected void setupThread(WorkerThread thread) { + synchronized (thread) { + thread.setPool(this); + thread.setName(thread.getClass().getName() + "[" + inc() + "]"); + thread.setDaemon(true); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); + try {thread.wait(500); }catch ( InterruptedException x ) {} + } + } + + /** + * Find an idle worker thread, if any. Could return null. + */ + public WorkerThread getWorker() + { + WorkerThread worker = null; + synchronized (mutex) { + while ( worker == null && running ) { + if (idle.size() > 0) { + try { + worker = (WorkerThread) idle.remove(0); + } catch (java.util.NoSuchElementException x) { + //this means that there are no available workers + worker = null; + } + } else if ( used.size() < this.maxThreads && creator != null) { + worker = creator.getWorkerThread(); + setupThread(worker); + } else { + try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();} + } + }//while + if ( worker != null ) used.add(worker); + } + return (worker); + } + + public int available() { + return idle.size(); + } + + /** + * Called by the worker thread to return itself to the + * idle pool. + */ + public void returnWorker (WorkerThread worker) { + if ( running ) { + synchronized (mutex) { + used.remove(worker); + //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); + if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit + else { + worker.setDoRun(false); + synchronized (worker){worker.notify();} + } + mutex.notify(); + } + }else { + worker.setDoRun(false); + synchronized (worker){worker.notify();} + } + } + + public int getMaxThreads() { + return maxThreads; + } + + public int getMinThreads() { + return minThreads; + } + + public void stop() { + running = false; + synchronized (mutex) { + Iterator i = idle.iterator(); + while ( i.hasNext() ) { + WorkerThread worker = (WorkerThread)i.next(); + returnWorker(worker); + i.remove(); + } + } + } + + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + } + + public void setMinThreads(int minThreads) { + this.minThreads = minThreads; + } + + public TaskCreator getThreadCreator() { + return this.creator; + } + + public static interface TaskCreator { + public WorkerThread getWorkerThread(); + } +} diff --git a/java/org/apache/catalina/tribes/transport/ThreadPool.java b/java/org/apache/catalina/tribes/transport/ThreadPool.java deleted file mode 100644 index e15a0182d..000000000 --- a/java/org/apache/catalina/tribes/transport/ThreadPool.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -/** - * @author not attributable - * @version 1.0 - */ - -public class ThreadPool -{ - /** - * A very simple thread pool class. The pool size is set at - * construction time and remains fixed. Threads are cycled - * through a FIFO idle queue. - */ - - List idle = new LinkedList(); - List used = new LinkedList(); - - Object mutex = new Object(); - boolean running = true; - - private static int counter = 1; - private int maxThreads; - private int minThreads; - - private ThreadCreator creator = null; - - private static synchronized int inc() { - return counter++; - } - - - public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception { - // fill up the pool with worker threads - this.maxThreads = maxThreads; - this.minThreads = minThreads; - this.creator = creator; - //for (int i = 0; i < minThreads; i++) { - for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem - WorkerThread thread = creator.getWorkerThread(); - setupThread(thread); - idle.add (thread); - } - } - - protected void setupThread(WorkerThread thread) { - synchronized (thread) { - thread.setPool(this); - thread.setName(thread.getClass().getName() + "[" + inc() + "]"); - thread.setDaemon(true); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); - try {thread.wait(500); }catch ( InterruptedException x ) {} - } - } - - /** - * Find an idle worker thread, if any. Could return null. - */ - public WorkerThread getWorker() - { - WorkerThread worker = null; - synchronized (mutex) { - while ( worker == null && running ) { - if (idle.size() > 0) { - try { - worker = (WorkerThread) idle.remove(0); - } catch (java.util.NoSuchElementException x) { - //this means that there are no available workers - worker = null; - } - } else if ( used.size() < this.maxThreads && creator != null) { - worker = creator.getWorkerThread(); - setupThread(worker); - } else { - try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();} - } - }//while - if ( worker != null ) used.add(worker); - } - return (worker); - } - - public int available() { - return idle.size(); - } - - /** - * Called by the worker thread to return itself to the - * idle pool. - */ - public void returnWorker (WorkerThread worker) { - if ( running ) { - synchronized (mutex) { - used.remove(worker); - //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); - if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit - else { - worker.setDoRun(false); - synchronized (worker){worker.notify();} - } - mutex.notify(); - } - }else { - worker.setDoRun(false); - synchronized (worker){worker.notify();} - } - } - - public int getMaxThreads() { - return maxThreads; - } - - public int getMinThreads() { - return minThreads; - } - - public void stop() { - running = false; - synchronized (mutex) { - Iterator i = idle.iterator(); - while ( i.hasNext() ) { - WorkerThread worker = (WorkerThread)i.next(); - returnWorker(worker); - i.remove(); - } - } - } - - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; - } - - public void setMinThreads(int minThreads) { - this.minThreads = minThreads; - } - - public ThreadCreator getThreadCreator() { - return this.creator; - } - - public static interface ThreadCreator { - public WorkerThread getWorkerThread(); - } -} diff --git a/java/org/apache/catalina/tribes/transport/WorkerThread.java b/java/org/apache/catalina/tribes/transport/WorkerThread.java index 8c7df8408..88afd87a8 100644 --- a/java/org/apache/catalina/tribes/transport/WorkerThread.java +++ b/java/org/apache/catalina/tribes/transport/WorkerThread.java @@ -27,12 +27,12 @@ import org.apache.catalina.tribes.io.ListenCallback; * @version $Revision$ $Date$ */ public abstract class WorkerThread extends Thread -{ +{ public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; private ListenCallback callback; - private ThreadPool pool; + private RxTaskPool pool; private boolean doRun = true; private int options; protected boolean useBufferPool = true; @@ -41,7 +41,7 @@ public abstract class WorkerThread extends Thread this.callback = callback; } - public void setPool(ThreadPool pool) { + public void setPool(RxTaskPool pool) { this.pool = pool; } @@ -57,7 +57,7 @@ public abstract class WorkerThread extends Thread this.doRun = doRun; } - public ThreadPool getPool() { + public RxTaskPool getPool() { return pool; } diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java index ac083dd7c..6fc75e61a 100644 --- a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java @@ -23,7 +23,7 @@ import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.ReceiverBase; -import org.apache.catalina.tribes.transport.ThreadPool; +import org.apache.catalina.tribes.transport.RxTaskPool; import org.apache.catalina.tribes.transport.WorkerThread; /** @@ -54,7 +54,7 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv */ public void start() throws IOException { try { - setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); + setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { log.fatal("ThreadPool can initilzed. Listener not started", x); if ( x instanceof IOException ) throw (IOException)x; diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index fac742c53..0b0a02243 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -31,7 +31,7 @@ import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.Constants; import org.apache.catalina.tribes.transport.ReceiverBase; -import org.apache.catalina.tribes.transport.ThreadPool; +import org.apache.catalina.tribes.transport.RxTaskPool; import org.apache.catalina.tribes.transport.WorkerThread; import org.apache.catalina.tribes.util.StringManager; import java.util.LinkedList; @@ -90,7 +90,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv public void start() throws IOException { try { // setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); - setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); + setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { log.fatal("ThreadPool can initilzed. Listener not started", x); if ( x instanceof IOException ) throw (IOException)x;