First round of refactoring connectors.
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 24 Aug 2009 15:33:48 +0000 (15:33 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 24 Aug 2009 15:33:48 +0000 (15:33 +0000)
Remove the worker based thread pools
Enable local or injected executors
Add in a resizable executors interface to be used in future revisions
start abstracting out and using a base class. There was one, deleted, since its not used anywhere

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@807284 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/tomcat/util/net/AbstractEndpoint.java [new file with mode: 0644]
java/org/apache/tomcat/util/net/AprEndpoint.java
java/org/apache/tomcat/util/net/BaseEndpoint.java [deleted file]
java/org/apache/tomcat/util/net/JIoEndpoint.java
java/org/apache/tomcat/util/net/NioEndpoint.java
java/org/apache/tomcat/util/threads/ResizableExecutor.java [new file with mode: 0644]

diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java
new file mode 100644 (file)
index 0000000..e4a811e
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.tomcat.util.res.StringManager;
+/**
+ * 
+ * @author fhanik
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ */
+public abstract class AbstractEndpoint {
+    
+    // -------------------------------------------------------------- Constants
+    protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res");
+
+    /**
+     * The Request attribute key for the cipher suite.
+     */
+    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
+
+    /**
+     * The Request attribute key for the key size.
+     */
+    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+    /**
+     * The Request attribute key for the client certificate chain.
+     */
+    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
+
+    /**
+     * The Request attribute key for the session id.
+     * This one is a Tomcat extension to the Servlet spec.
+     */
+    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
+
+    /**
+     * The request attribute key for the session manager.
+     * This one is a Tomcat extension to the Servlet spec.
+     */
+    public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr";
+
+}
index bfb5c5c..d3053e3 100644 (file)
@@ -21,6 +21,8 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -37,6 +39,10 @@ import org.apache.tomcat.jni.SSLSocket;
 import org.apache.tomcat.jni.Socket;
 import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 /**
  * APR tailored thread pool, providing the following services:
@@ -53,7 +59,7 @@ import org.apache.tomcat.util.res.StringManager;
  * @author Mladen Turk
  * @author Remy Maucherat
  */
-public class AprEndpoint {
+public class AprEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
@@ -61,8 +67,6 @@ public class AprEndpoint {
 
     protected static Log log = LogFactory.getLog(AprEndpoint.class);
 
-    protected static StringManager sm =
-        StringManager.getManager("org.apache.tomcat.util.net.res");
 
 
     /**
@@ -86,24 +90,11 @@ public class AprEndpoint {
      */
     public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
 
-    /**
-     * The request attribute key for the session manager.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_MGR =
-        "javax.servlet.request.ssl_session_mgr";
-
 
     // ----------------------------------------------------------------- Fields
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -163,6 +154,10 @@ public class AprEndpoint {
     protected long sslContext = 0;
 
     
+    /**
+     * Are we using an internal executor
+     */
+    protected volatile boolean internalExecutor = false;
     // ------------------------------------------------------------- Properties
 
 
@@ -188,10 +183,8 @@ public class AprEndpoint {
     protected int maxThreads = 200;
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
-        if (running) {
-            synchronized(workers) {
-                workers.resize(maxThreads);
-            }
+        if (running && executor instanceof ResizableExecutor) {
+            ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), getMaxThreads());
         }
     }
     public int getMaxThreads() { return maxThreads; }
@@ -545,9 +538,15 @@ public class AprEndpoint {
      */
     public int getCurrentThreadCount() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
+            } else {
+                return -1;
+            }
         } else {
-            return curThreads;
+            return -2;
         }
     }
 
@@ -558,9 +557,15 @@ public class AprEndpoint {
      */
     public int getCurrentThreadsBusy() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
+            } else {
+                return -1;
+            }
         } else {
-            return workers!=null?curThreads - workers.size():0;
+            return -2;
         }
     }
     
@@ -744,7 +749,11 @@ public class AprEndpoint {
 
             // Create worker collection
             if (executor == null) {
-                workers = new WorkerStack(maxThreads);
+                internalExecutor = true;
+                TaskQueue taskqueue = new TaskQueue();
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
+                executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                taskqueue.setParent( (ThreadPoolExecutor) executor);
             }
 
             // Start poller threads
@@ -838,6 +847,16 @@ public class AprEndpoint {
                 sendfiles = null;
             }
         }
+        if ( executor!=null && internalExecutor ) {
+            if ( executor instanceof ThreadPoolExecutor ) {
+                //this is our internal one, so we need to shut it down
+                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+                tpe.shutdownNow();
+                TaskQueue queue = (TaskQueue) tpe.getQueue();
+                queue.setParent(null);
+            }
+            executor = null;
+        }
     }
 
 
@@ -946,86 +965,6 @@ public class AprEndpoint {
     }
 
 
-    /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return (workers.pop());
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                if (curThreadsBusy == maxThreads) {
-                    log.info(sm.getString("endpoint.info.maxThreads",
-                            Integer.toString(maxThreads), address,
-                            Integer.toString(port)));
-                }
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
-        }
-    }
-
     
     /**
      * Allocate a new poller of the specified size.
@@ -1050,11 +989,10 @@ public class AprEndpoint {
      */
     protected boolean processSocketWithOptions(long socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assignWithOptions(socket);
-            } else {
-                executor.execute(new SocketWithOptionsProcessor(socket));
-            }
+            executor.execute(new SocketWithOptionsProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1070,11 +1008,10 @@ public class AprEndpoint {
      */
     protected boolean processSocket(long socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            } else {
-                executor.execute(new SocketProcessor(socket));
-            }
+            executor.execute(new SocketProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1090,11 +1027,10 @@ public class AprEndpoint {
      */
     protected boolean processSocket(long socket, SocketStatus status) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket, status);
-            } else {
-                executor.execute(new SocketEventProcessor(socket, status));
-            }
+            executor.execute(new SocketEventProcessor(socket, status));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1389,178 +1325,6 @@ public class AprEndpoint {
     // ----------------------------------------------------- Worker Inner Class
 
 
-    /**
-     * Server processor class.
-     */
-    protected class Worker implements Runnable {
-
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected long socket = 0;
-        protected SocketStatus status = null;
-        protected boolean options = false;
-
-
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        protected synchronized void assignWithOptions(long socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            status = null;
-            options = true;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        protected synchronized void assign(long socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            status = null;
-            options = false;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        protected synchronized void assign(long socket, SocketStatus status) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            this.status = status;
-            options = false;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        /**
-         * Await a newly assigned Socket from our Connector, or <code>null</code>
-         * if we are supposed to shut down.
-         */
-        protected synchronized long await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            long socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-
-                // Wait for the next socket to be assigned
-                long socket = await();
-                if (socket == 0)
-                    continue;
-
-                if (!deferAccept && options) {
-                    if (setSocketOptions(socket)) {
-                        getPoller().add(socket);
-                    } else {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    }
-                } else {
-
-                    // Process the request from this socket
-                    if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    } else if ((status == null) && ((options && !setSocketOptions(socket)) 
-                            || handler.process(socket) == Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    }
-                }
-
-                // Finish up this request
-                recycleWorkerThread(this);
-
-            }
-
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-
-    }
 
 
     // ----------------------------------------------- SendfileData Inner Class
@@ -1887,83 +1651,6 @@ public class AprEndpoint {
     }
 
 
-    // ------------------------------------------------- WorkerStack Inner Class
-
-
-    public class WorkerStack {
-        
-        protected Worker[] workers = null;
-        protected int end = 0;
-        
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-        
-        /** 
-         * Put the object into the queue. If the queue is full (for example if
-         * the queue has been reduced in size) the object will be dropped.
-         * 
-         * @param   object  the object to be appended to the queue (first
-         *                  element).
-         */
-        public void push(Worker worker) {
-            if (end < workers.length) {
-                workers[end++] = worker;
-            } else {
-                curThreads--;
-            }
-        }
-        
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-        
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-        
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-        
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-        
-        /**
-         * Resize the queue. If there are too many objects in the queue for the
-         * new size, drop the excess.
-         * 
-         * @param newSize
-         */
-        public void resize(int newSize) {
-            Worker[] newWorkers = new Worker[newSize];
-            int len = workers.length;
-            if (newSize < len) {
-                len = newSize;
-            }
-            System.arraycopy(workers, 0, newWorkers, 0, len);
-            workers = newWorkers;
-        }
-    }
-
 
     // ---------------------------------------------- SocketProcessor Inner Class
 
diff --git a/java/org/apache/tomcat/util/net/BaseEndpoint.java b/java/org/apache/tomcat/util/net/BaseEndpoint.java
deleted file mode 100644 (file)
index eb74ff0..0000000
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.tomcat.util.net;
-
-import java.net.InetAddress;
-import java.util.concurrent.Executor;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.res.StringManager;
-
-/**
- * APR tailored thread pool, providing the following services:
- * <ul>
- * <li>Socket acceptor thread</li>
- * <li>Socket poller thread</li>
- * <li>Sendfile thread</li>
- * <li>Worker threads pool</li>
- * </ul>
- *
- * When switching to Java 5, there's an opportunity to use the virtual
- * machine's thread pool.
- *
- * @author Mladen Turk
- * @author Remy Maucherat
- */
-public abstract class BaseEndpoint {
-
-
-    // -------------------------------------------------------------- Constants
-
-
-    protected static Log log = LogFactory.getLog(BaseEndpoint.class);
-
-    protected static StringManager sm =
-        StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
-    /**
-     * The Request attribute key for the cipher suite.
-     */
-    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
-    /**
-     * The Request attribute key for the key size.
-     */
-    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
-    /**
-     * The Request attribute key for the client certificate chain.
-     */
-    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
-    // ----------------------------------------------------------------- Fields
-
-
-    /**
-     * Running state of the endpoint.
-     */
-    protected volatile boolean running = false;
-
-
-    /**
-     * Will be set to true whenever the endpoint is paused.
-     */
-    protected volatile boolean paused = false;
-
-
-    /**
-     * Track the initialization state of the endpoint.
-     */
-    protected boolean initialized = false;
-
-
-    /**
-     * Current worker threads busy count.
-     */
-    protected int curThreadsBusy = 0;
-
-
-    /**
-     * Current worker threads count.
-     */
-    protected int curThreads = 0;
-
-
-    /**
-     * Sequence number used to generate thread names.
-     */
-    protected int sequence = 0;
-
-
-    // ------------------------------------------------------------- Properties
-
-
-    /**
-     * External Executor based thread pool.
-     */
-    protected Executor executor = null;
-    public void setExecutor(Executor executor) { this.executor = executor; }
-    public Executor getExecutor() { return executor; }
-
-
-    /**
-     * Maximum amount of worker threads.
-     */
-    protected int maxThreads = 200;
-    public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
-    public int getMaxThreads() { return maxThreads; }
-
-
-    /**
-     * Priority of the acceptor and poller threads.
-     */
-    protected int threadPriority = Thread.NORM_PRIORITY;
-    public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
-    public int getThreadPriority() { return threadPriority; }
-
-
-    /**
-     * Server socket port.
-     */
-    protected int port;
-    public int getPort() { return port; }
-    public void setPort(int port ) { this.port=port; }
-
-
-    /**
-     * Address for the server socket.
-     */
-    protected InetAddress address;
-    public InetAddress getAddress() { return address; }
-    public void setAddress(InetAddress address) { this.address = address; }
-
-
-    /**
-     * Allows the server developer to specify the backlog that
-     * should be used for server sockets. By default, this value
-     * is 100.
-     */
-    protected int backlog = 100;
-    public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
-    public int getBacklog() { return backlog; }
-
-
-    /**
-     * Socket TCP no delay.
-     */
-    protected boolean tcpNoDelay = false;
-    public boolean getTcpNoDelay() { return tcpNoDelay; }
-    public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
-
-
-    /**
-     * Socket linger.
-     */
-    protected int soLinger = 100;
-    public int getSoLinger() { return soLinger; }
-    public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
-
-
-    /**
-     * Socket timeout.
-     */
-    protected int soTimeout = -1;
-    public int getSoTimeout() { return soTimeout; }
-    public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
-
-
-    /**
-     * The default is true - the created threads will be
-     *  in daemon mode. If set to false, the control thread
-     *  will not be daemon - and will keep the process alive.
-     */
-    protected boolean daemon = true;
-    public void setDaemon(boolean b) { daemon = b; }
-    public boolean getDaemon() { return daemon; }
-
-
-    /**
-     * Name of the thread pool, which will be used for naming child threads.
-     */
-    protected String name = "TP";
-    public void setName(String name) { this.name = name; }
-    public String getName() { return name; }
-
-
-    /**
-     * Dummy maxSpareThreads property.
-     */
-    public int getMaxSpareThreads() { return 0; }
-
-
-    /**
-     * Dummy minSpareThreads property.
-     */
-    public int getMinSpareThreads() { return 0; }
-
-
-    // --------------------------------------------------------- Public Methods
-
-
-    /**
-     * Return the amount of threads that are managed by the pool.
-     *
-     * @return the amount of threads that are managed by the pool
-     */
-    public int getCurrentThreadCount() {
-        return curThreads;
-    }
-
-
-    /**
-     * Return the amount of threads currently busy.
-     *
-     * @return the amount of threads currently busy
-     */
-    public int getCurrentThreadsBusy() {
-        return curThreadsBusy;
-    }
-
-
-    /**
-     * Return the state of the endpoint.
-     *
-     * @return true if the endpoint is running, false otherwise
-     */
-    public boolean isRunning() {
-        return running;
-    }
-
-
-    /**
-     * Return the state of the endpoint.
-     *
-     * @return true if the endpoint is paused, false otherwise
-     */
-    public boolean isPaused() {
-        return paused;
-    }
-
-
-    // ----------------------------------------------- Public Lifecycle Methods
-
-
-    /**
-     * Initialize the endpoint.
-     */
-    public abstract void init()
-        throws Exception;
-
-
-    /**
-     * Start the APR endpoint, creating acceptor, poller and sendfile threads.
-     */
-    public abstract void start()
-        throws Exception;
-
-
-    /**
-     * Pause the endpoint, which will make it stop accepting new sockets.
-     */
-    public void pause() {
-        if (running && !paused) {
-            paused = true;
-            unlockAccept();
-        }
-    }
-
-
-    /**
-     * Resume the endpoint, which will make it start accepting new sockets
-     * again.
-     */
-    public void resume() {
-        if (running) {
-            paused = false;
-        }
-    }
-
-
-    /**
-     * Stop the endpoint. This will cause all processing threads to stop.
-     */
-    public abstract void stop();
-
-
-    /**
-     * Deallocate APR memory pools, and close server socket.
-     */
-    public abstract void destroy() throws Exception;
-
-
-    // ------------------------------------------------------ Protected Methods
-
-
-    /**
-     * Get a sequence number used for thread naming.
-     */
-    protected int getSequence() {
-        return sequence++;
-    }
-
-
-    /**
-     * Unlock the server socket accept using a bugus connection.
-     */
-    protected void unlockAccept() {
-        java.net.Socket s = null;
-        try {
-            // Need to create a connection to unlock the accept();
-            if (address == null) {
-                s = new java.net.Socket("127.0.0.1", port);
-            } else {
-                s = new java.net.Socket(address, port);
-                // setting soLinger to a small value will help shutdown the
-                // connection quicker
-                s.setSoLinger(true, 0);
-            }
-        } catch(Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
-            }
-        } finally {
-            if (s != null) {
-                try {
-                    s.close();
-                } catch (Exception e) {
-                    // Ignore
-                }
-            }
-        }
-    }
-
-
-}
index e8b5ef4..e51de36 100644 (file)
@@ -23,11 +23,17 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 /**
  * Handle incoming TCP connections.
@@ -45,50 +51,17 @@ import org.apache.tomcat.util.res.StringManager;
  * @author Yoav Shapira
  * @author Remy Maucherat
  */
-public class JIoEndpoint {
+public class JIoEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
 
-
     protected static Log log = LogFactory.getLog(JIoEndpoint.class);
 
-    protected StringManager sm = 
-        StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
-    /**
-     * The Request attribute key for the cipher suite.
-     */
-    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
-    /**
-     * The Request attribute key for the key size.
-     */
-    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
-    /**
-     * The Request attribute key for the client certificate chain.
-     */
-    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
     // ----------------------------------------------------------------- Fields
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -134,6 +107,10 @@ public class JIoEndpoint {
      */
     protected SocketProperties socketProperties = new SocketProperties();
 
+    /**
+     * Are we using an internal executor
+     */
+    protected volatile boolean internalExecutor = false;
 
     // ------------------------------------------------------------- Properties
 
@@ -177,13 +154,19 @@ public class JIoEndpoint {
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
         if (running) {
-            synchronized(workers) {
-                workers.resize(maxThreads);
-            }
+            //TODO Dynamic resize
+            log.error("Resizing executor dynamically is not possible at this time.");
         }
     }
     public int getMaxThreads() { return maxThreads; }
 
+    public int minSpareThreads = 10;
+    public int getMinSpareThreads() {
+        return Math.min(minSpareThreads,getMaxThreads());
+    }
+    public void setMinSpareThreads(int minSpareThreads) {
+        this.minSpareThreads = minSpareThreads;
+    }
 
     /**
      * Priority of the acceptor and poller threads.
@@ -304,9 +287,15 @@ public class JIoEndpoint {
      */
     public int getCurrentThreadCount() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
+            } else {
+                return -1;
+            }
         } else {
-            return curThreads;
+            return -2;
         }
     }
 
@@ -317,9 +306,15 @@ public class JIoEndpoint {
      */
     public int getCurrentThreadsBusy() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
+            } else {
+                return -1;
+            }
         } else {
-            return workers!=null?curThreads - workers.size():0;
+            return -2;
         }
     }
     
@@ -426,113 +421,6 @@ public class JIoEndpoint {
     }
     
     
-    // ----------------------------------------------------- Worker Inner Class
-
-
-    protected class Worker implements Runnable {
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected Socket socket = null;
-
-        
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        synchronized void assign(Socket socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            available = true;
-            notifyAll();
-
-        }
-
-        
-        /**
-         * Await a newly assigned Socket from our Connector, or <code>null</code>
-         * if we are supposed to shut down.
-         */
-        private synchronized Socket await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            Socket socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-
-                // Wait for the next socket to be assigned
-                Socket socket = await();
-                if (socket == null)
-                    continue;
-
-                // Process the request from this socket
-                if (!setSocketOptions(socket) || !handler.process(socket)) {
-                    // Close socket
-                    try {
-                        socket.close();
-                    } catch (IOException e) {
-                    }
-                }
-
-                // Finish up this request
-                socket = null;
-                recycleWorkerThread(this);
-
-            }
-
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-
-    }
-
 
     // -------------------- Public methods --------------------
 
@@ -583,7 +471,11 @@ public class JIoEndpoint {
 
             // Create worker collection
             if (executor == null) {
-                workers = new WorkerStack(maxThreads);
+                internalExecutor = true;
+                TaskQueue taskqueue = new TaskQueue();
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
+                executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                taskqueue.setParent( (ThreadPoolExecutor) executor);
             }
 
             // Start acceptor threads
@@ -614,6 +506,16 @@ public class JIoEndpoint {
             running = false;
             unlockAccept();
         }
+        if ( executor!=null && internalExecutor ) {
+            if ( executor instanceof ThreadPoolExecutor ) {
+                //this is our internal one, so we need to shut it down
+                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+                tpe.shutdownNow();
+                TaskQueue queue = (TaskQueue) tpe.getQueue();
+                queue.setParent(null);
+            }
+            executor = null;
+        }
     }
 
     /**
@@ -696,97 +598,16 @@ public class JIoEndpoint {
 
     
     /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return workers.pop();
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                if (curThreadsBusy == maxThreads) {
-                    log.info(sm.getString("endpoint.info.maxThreads",
-                            Integer.toString(maxThreads), address,
-                            Integer.toString(port)));
-                }
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
-        }
-    }
-
-
-    /**
      * Process given socket.
      */
     protected boolean processSocket(Socket socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            } else {
-                executor.execute(new SocketProcessor(socket));
-            }
+            executor.execute(new SocketProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
+            
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
             log.error(sm.getString("endpoint.process.fail"), t);
@@ -796,81 +617,4 @@ public class JIoEndpoint {
     }
     
 
-    // ------------------------------------------------- WorkerStack Inner Class
-
-
-    public class WorkerStack {
-        
-        protected Worker[] workers = null;
-        protected int end = 0;
-        
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-        
-        /** 
-         * Put the object into the queue. If the queue is full (for example if
-         * the queue has been reduced in size) the object will be dropped.
-         * 
-         * @param   object  the object to be appended to the queue (first
-         *                  element).
-         */
-        public void push(Worker worker) {
-            if (end < workers.length) {
-                workers[end++] = worker;
-            } else {
-                curThreads--;
-            }
-        }
-        
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-        
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-        
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-        
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-        
-        /**
-         * Resize the queue. If there are too many objects in the queue for the
-         * new size, drop the excess.
-         * 
-         * @param newSize
-         */
-        public void resize(int newSize) {
-            Worker[] newWorkers = new Worker[newSize];
-            int len = workers.length;
-            if (newSize < len) {
-                len = newSize;
-            }
-            System.arraycopy(workers, 0, newWorkers, 0, len);
-            workers = newWorkers;
-        }
-    }
-
 }
index b4f4362..bfc0f6e 100644 (file)
@@ -58,6 +58,7 @@ import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
 import org.apache.tomcat.util.threads.TaskQueue;
 import org.apache.tomcat.util.threads.TaskThreadFactory;
 import org.apache.tomcat.util.threads.ThreadPoolExecutor;
@@ -77,7 +78,7 @@ import org.apache.tomcat.util.threads.ThreadPoolExecutor;
  * @author Remy Maucherat
  * @author Filip Hanik
  */
-public class NioEndpoint {
+public class NioEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
@@ -85,30 +86,6 @@ public class NioEndpoint {
 
     protected static Log log = LogFactory.getLog(NioEndpoint.class);
 
-    protected static StringManager sm =
-        StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
-    /**
-     * The Request attribute key for the cipher suite.
-     */
-    public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
-    /**
-     * The Request attribute key for the key size.
-     */
-    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
-    /**
-     * The Request attribute key for the client certificate chain.
-     */
-    public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
 
     public static final int OP_REGISTER = 0x100; //register interest op
     public static final int OP_CALLBACK = 0x200; //callback interest op
@@ -333,7 +310,7 @@ public class NioEndpoint {
     /**
      * Are we using an internal executor
      */
-    protected boolean internalExecutor = true;
+    protected volatile boolean internalExecutor = false;
     
     protected boolean useExecutor = true;
     /**
@@ -518,13 +495,16 @@ public class NioEndpoint {
     /**
      * Dummy maxSpareThreads property.
      */
-    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),getMinSpareThreads()); }
 
 
-    /**
-     * Dummy minSpareThreads property.
-     */
-    public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int minSpareThreads = 10;
+    public int getMinSpareThreads() {
+        return Math.min(minSpareThreads,getMaxThreads());
+    }
+    public void setMinSpareThreads(int minSpareThreads) {
+        this.minSpareThreads = minSpareThreads;
+    }
     
     /**
      * Generic properties, introspected
@@ -733,6 +713,8 @@ public class NioEndpoint {
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
             } else {
                 return -1;
             }
@@ -750,6 +732,8 @@ public class NioEndpoint {
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
             } else {
                 return -1;
             }
@@ -1142,9 +1126,7 @@ public class NioEndpoint {
             if ( dispatch && executor!=null ) executor.execute(sc);
             else sc.run();
         } catch (RejectedExecutionException rx) {
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to process socket, executor rejected the task.",rx);
-            }
+            log.warn("Socket processing request was rejected for:"+socket,rx);
             return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
diff --git a/java/org/apache/tomcat/util/threads/ResizableExecutor.java b/java/org/apache/tomcat/util/threads/ResizableExecutor.java
new file mode 100644 (file)
index 0000000..3d137e3
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.Executor;
+
+public interface ResizableExecutor extends Executor {
+    /**
+     * {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     * @return  {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     */
+    public int getPoolSize();
+    
+    /**
+     * {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     * @return {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     */
+    public int getActiveCount();
+    
+    public boolean resizePool(int corePoolSize, int maximumPoolSize);
+    
+    public boolean resizeQueue(int capacity);
+    
+}