Refactor, name change in preparation to swap in Executor, and to make increase thread...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 15 Dec 2006 00:25:17 +0000 (00:25 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 15 Dec 2006 00:25:17 +0000 (00:25 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@487407 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/transport/ReceiverBase.java
java/org/apache/catalina/tribes/transport/RxTaskPool.java [new file with mode: 0644]
java/org/apache/catalina/tribes/transport/ThreadPool.java [deleted file]
java/org/apache/catalina/tribes/transport/WorkerThread.java
java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
java/org/apache/catalina/tribes/transport/nio/NioReceiver.java

index e01b6d9..fdddf9c 100644 (file)
@@ -38,7 +38,7 @@ import org.apache.juli.logging.Log;
  * @author not attributable
  * @version 1.0
  */
-public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, ThreadPool.ThreadCreator {
+public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, RxTaskPool.TaskCreator {
 
     public static final int OPTION_DIRECT_BUFFER = 0x0004;
 
@@ -53,7 +53,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
     private boolean listen = false;
-    private ThreadPool pool;
+    private RxTaskPool pool;
     private boolean direct = true;
     private long tcpSelectorTimeout = 5000;
     //how many times to search for an available socket
@@ -270,7 +270,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
         return listener;
     }
 
-    public ThreadPool getPool() {
+    public RxTaskPool getPool() {
         return pool;
     }
     
@@ -367,7 +367,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
         this.log = log;
     }
 
-    public void setPool(ThreadPool pool) {
+    public void setPool(RxTaskPool pool) {
         this.pool = pool;
     }
 
diff --git a/java/org/apache/catalina/tribes/transport/RxTaskPool.java b/java/org/apache/catalina/tribes/transport/RxTaskPool.java
new file mode 100644 (file)
index 0000000..43ac09b
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author not attributable
+ * @version 1.0
+ */
+
+public class RxTaskPool
+{
+    /**
+     * A very simple thread pool class.  The pool size is set at
+     * construction time and remains fixed.  Threads are cycled
+     * through a FIFO idle queue.
+     */
+
+    List idle = new LinkedList();
+    List used = new LinkedList();
+    
+    Object mutex = new Object();
+    boolean running = true;
+    
+    private static int counter = 1;
+    private int maxThreads;
+    private int minThreads;
+    
+    private TaskCreator creator = null;
+
+    private static synchronized int inc() {
+        return counter++;
+    }
+
+    
+    public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception {
+        // fill up the pool with worker threads
+        this.maxThreads = maxThreads;
+        this.minThreads = minThreads;
+        this.creator = creator;
+        //for (int i = 0; i < minThreads; i++) {
+        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
+            WorkerThread thread = creator.getWorkerThread();
+            setupThread(thread);
+            idle.add (thread);
+        }
+    }
+    
+    protected void setupThread(WorkerThread thread) {
+        synchronized (thread) {
+            thread.setPool(this);
+            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
+            thread.setDaemon(true);
+            thread.setPriority(Thread.MAX_PRIORITY);
+            thread.start();
+            try {thread.wait(500); }catch ( InterruptedException x ) {}
+        }
+    }
+
+    /**
+     * Find an idle worker thread, if any.  Could return null.
+     */
+    public WorkerThread getWorker()
+    {
+        WorkerThread worker = null;
+        synchronized (mutex) {
+            while ( worker == null && running ) {
+                if (idle.size() > 0) {
+                    try {
+                        worker = (WorkerThread) idle.remove(0);
+                    } catch (java.util.NoSuchElementException x) {
+                        //this means that there are no available workers
+                        worker = null;
+                    }
+                } else if ( used.size() < this.maxThreads && creator != null) {
+                    worker = creator.getWorkerThread();
+                    setupThread(worker);
+                } else {
+                    try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
+                }
+            }//while
+            if ( worker != null ) used.add(worker);
+        }
+        return (worker);
+    }
+    
+    public int available() {
+        return idle.size();
+    }
+
+    /**
+     * Called by the worker thread to return itself to the
+     * idle pool.
+     */
+    public void returnWorker (WorkerThread worker) {
+        if ( running ) {
+            synchronized (mutex) {
+                used.remove(worker);
+                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
+                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+                else {
+                    worker.setDoRun(false);
+                    synchronized (worker){worker.notify();}
+                }
+                mutex.notify();
+            }
+        }else {
+            worker.setDoRun(false);
+            synchronized (worker){worker.notify();}
+        }
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public int getMinThreads() {
+        return minThreads;
+    }
+
+    public void stop() {
+        running = false;
+        synchronized (mutex) {
+            Iterator i = idle.iterator();
+            while ( i.hasNext() ) {
+                WorkerThread worker = (WorkerThread)i.next();
+                returnWorker(worker);
+                i.remove();
+            }
+        }
+    }
+
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    public void setMinThreads(int minThreads) {
+        this.minThreads = minThreads;
+    }
+
+    public TaskCreator getThreadCreator() {
+        return this.creator;
+    }
+    
+    public static interface TaskCreator {
+        public WorkerThread getWorkerThread();
+    }
+}
diff --git a/java/org/apache/catalina/tribes/transport/ThreadPool.java b/java/org/apache/catalina/tribes/transport/ThreadPool.java
deleted file mode 100644 (file)
index e15a018..0000000
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * @author not attributable
- * @version 1.0
- */
-
-public class ThreadPool
-{
-    /**
-     * A very simple thread pool class.  The pool size is set at
-     * construction time and remains fixed.  Threads are cycled
-     * through a FIFO idle queue.
-     */
-
-    List idle = new LinkedList();
-    List used = new LinkedList();
-    
-    Object mutex = new Object();
-    boolean running = true;
-    
-    private static int counter = 1;
-    private int maxThreads;
-    private int minThreads;
-    
-    private ThreadCreator creator = null;
-
-    private static synchronized int inc() {
-        return counter++;
-    }
-
-    
-    public ThreadPool (int maxThreads, int minThreads, ThreadCreator creator) throws Exception {
-        // fill up the pool with worker threads
-        this.maxThreads = maxThreads;
-        this.minThreads = minThreads;
-        this.creator = creator;
-        //for (int i = 0; i < minThreads; i++) {
-        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
-            WorkerThread thread = creator.getWorkerThread();
-            setupThread(thread);
-            idle.add (thread);
-        }
-    }
-    
-    protected void setupThread(WorkerThread thread) {
-        synchronized (thread) {
-            thread.setPool(this);
-            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
-            thread.setDaemon(true);
-            thread.setPriority(Thread.MAX_PRIORITY);
-            thread.start();
-            try {thread.wait(500); }catch ( InterruptedException x ) {}
-        }
-    }
-
-    /**
-     * Find an idle worker thread, if any.  Could return null.
-     */
-    public WorkerThread getWorker()
-    {
-        WorkerThread worker = null;
-        synchronized (mutex) {
-            while ( worker == null && running ) {
-                if (idle.size() > 0) {
-                    try {
-                        worker = (WorkerThread) idle.remove(0);
-                    } catch (java.util.NoSuchElementException x) {
-                        //this means that there are no available workers
-                        worker = null;
-                    }
-                } else if ( used.size() < this.maxThreads && creator != null) {
-                    worker = creator.getWorkerThread();
-                    setupThread(worker);
-                } else {
-                    try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
-                }
-            }//while
-            if ( worker != null ) used.add(worker);
-        }
-        return (worker);
-    }
-    
-    public int available() {
-        return idle.size();
-    }
-
-    /**
-     * Called by the worker thread to return itself to the
-     * idle pool.
-     */
-    public void returnWorker (WorkerThread worker) {
-        if ( running ) {
-            synchronized (mutex) {
-                used.remove(worker);
-                //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
-                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
-                else {
-                    worker.setDoRun(false);
-                    synchronized (worker){worker.notify();}
-                }
-                mutex.notify();
-            }
-        }else {
-            worker.setDoRun(false);
-            synchronized (worker){worker.notify();}
-        }
-    }
-
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    public int getMinThreads() {
-        return minThreads;
-    }
-
-    public void stop() {
-        running = false;
-        synchronized (mutex) {
-            Iterator i = idle.iterator();
-            while ( i.hasNext() ) {
-                WorkerThread worker = (WorkerThread)i.next();
-                returnWorker(worker);
-                i.remove();
-            }
-        }
-    }
-
-    public void setMaxThreads(int maxThreads) {
-        this.maxThreads = maxThreads;
-    }
-
-    public void setMinThreads(int minThreads) {
-        this.minThreads = minThreads;
-    }
-
-    public ThreadCreator getThreadCreator() {
-        return this.creator;
-    }
-    
-    public static interface ThreadCreator {
-        public WorkerThread getWorkerThread();
-    }
-}
index 8c7df84..88afd87 100644 (file)
@@ -27,12 +27,12 @@ import org.apache.catalina.tribes.io.ListenCallback;
  * @version $Revision$ $Date$
  */
 public abstract class WorkerThread extends Thread 
-{
+{ 
     
     public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
     
     private ListenCallback callback;
-    private ThreadPool pool;
+    private RxTaskPool pool;
     private boolean doRun = true;
     private int options;
     protected boolean useBufferPool = true;
@@ -41,7 +41,7 @@ public abstract class WorkerThread extends Thread
         this.callback = callback;
     }
 
-    public void setPool(ThreadPool pool) {
+    public void setPool(RxTaskPool pool) {
         this.pool = pool;
     }
 
@@ -57,7 +57,7 @@ public abstract class WorkerThread extends Thread
         this.doRun = doRun;
     }
 
-    public ThreadPool getPool() {
+    public RxTaskPool getPool() {
         return pool;
     }
 
index ac083dd..6fc75e6 100644 (file)
@@ -23,7 +23,7 @@ import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.transport.ReceiverBase;
-import org.apache.catalina.tribes.transport.ThreadPool;
+import org.apache.catalina.tribes.transport.RxTaskPool;
 import org.apache.catalina.tribes.transport.WorkerThread;
 
 /**
@@ -54,7 +54,7 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
      */
     public void start() throws IOException {
         try {
-            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
+            setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
             log.fatal("ThreadPool can initilzed. Listener not started", x);
             if ( x instanceof IOException ) throw (IOException)x;
index fac742c..0b0a022 100644 (file)
@@ -31,7 +31,7 @@ import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
 import org.apache.catalina.tribes.transport.Constants;
 import org.apache.catalina.tribes.transport.ReceiverBase;
-import org.apache.catalina.tribes.transport.ThreadPool;
+import org.apache.catalina.tribes.transport.RxTaskPool;
 import org.apache.catalina.tribes.transport.WorkerThread;
 import org.apache.catalina.tribes.util.StringManager;
 import java.util.LinkedList;
@@ -90,7 +90,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
     public void start() throws IOException {
         try {
 //            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
-            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
+            setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
             log.fatal("ThreadPool can initilzed. Listener not started", x);
             if ( x instanceof IOException ) throw (IOException)x;