From: fhanik Date: Mon, 24 Aug 2009 15:33:48 +0000 (+0000) Subject: First round of refactoring connectors. X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=03f54a9ba62536d2a9b29fa3beb3bbb1ed94758e;p=tomcat7.0 First round of refactoring connectors. Remove the worker based thread pools Enable local or injected executors Add in a resizable executors interface to be used in future revisions start abstracting out and using a base class. There was one, deleted, since its not used anywhere git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@807284 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java new file mode 100644 index 000000000..e4a811e50 --- /dev/null +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import org.apache.tomcat.util.res.StringManager; +/** + * + * @author fhanik + * @author Mladen Turk + * @author Remy Maucherat + */ +public abstract class AbstractEndpoint { + + // -------------------------------------------------------------- Constants + protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); + + /** + * The Request attribute key for the cipher suite. + */ + public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; + + /** + * The Request attribute key for the key size. + */ + public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; + + /** + * The Request attribute key for the client certificate chain. + */ + public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; + + /** + * The Request attribute key for the session id. + * This one is a Tomcat extension to the Servlet spec. + */ + public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; + + /** + * The request attribute key for the session manager. + * This one is a Tomcat extension to the Servlet spec. + */ + public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr"; + +} diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index bfb5c5c14..d3053e3cd 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -21,6 +21,8 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -37,6 +39,10 @@ import org.apache.tomcat.jni.SSLSocket; import org.apache.tomcat.jni.Socket; import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** * APR tailored thread pool, providing the following services: @@ -53,7 +59,7 @@ import org.apache.tomcat.util.res.StringManager; * @author Mladen Turk * @author Remy Maucherat */ -public class AprEndpoint { +public class AprEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants @@ -61,8 +67,6 @@ public class AprEndpoint { protected static Log log = LogFactory.getLog(AprEndpoint.class); - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); /** @@ -86,24 +90,11 @@ public class AprEndpoint { */ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - /** - * The request attribute key for the session manager. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_MGR = - "javax.servlet.request.ssl_session_mgr"; - // ----------------------------------------------------------------- Fields /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** * Running state of the endpoint. */ protected volatile boolean running = false; @@ -163,6 +154,10 @@ public class AprEndpoint { protected long sslContext = 0; + /** + * Are we using an internal executor + */ + protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -188,10 +183,8 @@ public class AprEndpoint { protected int maxThreads = 200; public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; - if (running) { - synchronized(workers) { - workers.resize(maxThreads); - } + if (running && executor instanceof ResizableExecutor) { + ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), getMaxThreads()); } } public int getMaxThreads() { return maxThreads; } @@ -545,9 +538,15 @@ public class AprEndpoint { */ public int getCurrentThreadCount() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); + } else { + return -1; + } } else { - return curThreads; + return -2; } } @@ -558,9 +557,15 @@ public class AprEndpoint { */ public int getCurrentThreadsBusy() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); + } else { + return -1; + } } else { - return workers!=null?curThreads - workers.size():0; + return -2; } } @@ -744,7 +749,11 @@ public class AprEndpoint { // Create worker collection if (executor == null) { - workers = new WorkerStack(maxThreads); + internalExecutor = true; + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); } // Start poller threads @@ -838,6 +847,16 @@ public class AprEndpoint { sendfiles = null; } } + if ( executor!=null && internalExecutor ) { + if ( executor instanceof ThreadPoolExecutor ) { + //this is our internal one, so we need to shut it down + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdownNow(); + TaskQueue queue = (TaskQueue) tpe.getQueue(); + queue.setParent(null); + } + executor = null; + } } @@ -946,86 +965,6 @@ public class AprEndpoint { } - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return (workers.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - if (curThreadsBusy == maxThreads) { - log.info(sm.getString("endpoint.info.maxThreads", - Integer.toString(maxThreads), address, - Integer.toString(port))); - } - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. - */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); - } - } - /** * Allocate a new poller of the specified size. @@ -1050,11 +989,10 @@ public class AprEndpoint { */ protected boolean processSocketWithOptions(long socket) { try { - if (executor == null) { - getWorkerThread().assignWithOptions(socket); - } else { - executor.execute(new SocketWithOptionsProcessor(socket)); - } + executor.execute(new SocketWithOptionsProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1070,11 +1008,10 @@ public class AprEndpoint { */ protected boolean processSocket(long socket) { try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketProcessor(socket)); - } + executor.execute(new SocketProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1090,11 +1027,10 @@ public class AprEndpoint { */ protected boolean processSocket(long socket, SocketStatus status) { try { - if (executor == null) { - getWorkerThread().assign(socket, status); - } else { - executor.execute(new SocketEventProcessor(socket, status)); - } + executor.execute(new SocketEventProcessor(socket, status)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1389,178 +1325,6 @@ public class AprEndpoint { // ----------------------------------------------------- Worker Inner Class - /** - * Server processor class. - */ - protected class Worker implements Runnable { - - - protected Thread thread = null; - protected boolean available = false; - protected long socket = 0; - protected SocketStatus status = null; - protected boolean options = false; - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - protected synchronized void assignWithOptions(long socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - status = null; - options = true; - available = true; - notifyAll(); - - } - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - protected synchronized void assign(long socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - status = null; - options = false; - available = true; - notifyAll(); - - } - - - protected synchronized void assign(long socket, SocketStatus status) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - this.status = status; - options = false; - available = true; - notifyAll(); - - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - protected synchronized long await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - long socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - - // Wait for the next socket to be assigned - long socket = await(); - if (socket == 0) - continue; - - if (!deferAccept && options) { - if (setSocketOptions(socket)) { - getPoller().add(socket); - } else { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } - } else { - - // Process the request from this socket - if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } else if ((status == null) && ((options && !setSocketOptions(socket)) - || handler.process(socket) == Handler.SocketState.CLOSED)) { - // Close socket and pool - Socket.destroy(socket); - socket = 0; - } - } - - // Finish up this request - recycleWorkerThread(this); - - } - - } - - - /** - * Start the background processing thread. - */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.start(); - } - - - } // ----------------------------------------------- SendfileData Inner Class @@ -1887,83 +1651,6 @@ public class AprEndpoint { } - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. If the queue is full (for example if - * the queue has been reduced in size) the object will be dropped. - * - * @param object the object to be appended to the queue (first - * element). - */ - public void push(Worker worker) { - if (end < workers.length) { - workers[end++] = worker; - } else { - curThreads--; - } - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. - */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - - /** - * Resize the queue. If there are too many objects in the queue for the - * new size, drop the excess. - * - * @param newSize - */ - public void resize(int newSize) { - Worker[] newWorkers = new Worker[newSize]; - int len = workers.length; - if (newSize < len) { - len = newSize; - } - System.arraycopy(workers, 0, newWorkers, 0, len); - workers = newWorkers; - } - } - // ---------------------------------------------- SocketProcessor Inner Class diff --git a/java/org/apache/tomcat/util/net/BaseEndpoint.java b/java/org/apache/tomcat/util/net/BaseEndpoint.java deleted file mode 100644 index eb74ff0ae..000000000 --- a/java/org/apache/tomcat/util/net/BaseEndpoint.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - -import java.net.InetAddress; -import java.util.concurrent.Executor; - -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.res.StringManager; - -/** - * APR tailored thread pool, providing the following services: - * - * - * When switching to Java 5, there's an opportunity to use the virtual - * machine's thread pool. - * - * @author Mladen Turk - * @author Remy Maucherat - */ -public abstract class BaseEndpoint { - - - // -------------------------------------------------------------- Constants - - - protected static Log log = LogFactory.getLog(BaseEndpoint.class); - - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - - - // ----------------------------------------------------------------- Fields - - - /** - * Running state of the endpoint. - */ - protected volatile boolean running = false; - - - /** - * Will be set to true whenever the endpoint is paused. - */ - protected volatile boolean paused = false; - - - /** - * Track the initialization state of the endpoint. - */ - protected boolean initialized = false; - - - /** - * Current worker threads busy count. - */ - protected int curThreadsBusy = 0; - - - /** - * Current worker threads count. - */ - protected int curThreads = 0; - - - /** - * Sequence number used to generate thread names. - */ - protected int sequence = 0; - - - // ------------------------------------------------------------- Properties - - - /** - * External Executor based thread pool. - */ - protected Executor executor = null; - public void setExecutor(Executor executor) { this.executor = executor; } - public Executor getExecutor() { return executor; } - - - /** - * Maximum amount of worker threads. - */ - protected int maxThreads = 200; - public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } - public int getMaxThreads() { return maxThreads; } - - - /** - * Priority of the acceptor and poller threads. - */ - protected int threadPriority = Thread.NORM_PRIORITY; - public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } - public int getThreadPriority() { return threadPriority; } - - - /** - * Server socket port. - */ - protected int port; - public int getPort() { return port; } - public void setPort(int port ) { this.port=port; } - - - /** - * Address for the server socket. - */ - protected InetAddress address; - public InetAddress getAddress() { return address; } - public void setAddress(InetAddress address) { this.address = address; } - - - /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - protected int backlog = 100; - public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } - public int getBacklog() { return backlog; } - - - /** - * Socket TCP no delay. - */ - protected boolean tcpNoDelay = false; - public boolean getTcpNoDelay() { return tcpNoDelay; } - public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } - - - /** - * Socket linger. - */ - protected int soLinger = 100; - public int getSoLinger() { return soLinger; } - public void setSoLinger(int soLinger) { this.soLinger = soLinger; } - - - /** - * Socket timeout. - */ - protected int soTimeout = -1; - public int getSoTimeout() { return soTimeout; } - public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } - - - /** - * The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. - */ - protected boolean daemon = true; - public void setDaemon(boolean b) { daemon = b; } - public boolean getDaemon() { return daemon; } - - - /** - * Name of the thread pool, which will be used for naming child threads. - */ - protected String name = "TP"; - public void setName(String name) { this.name = name; } - public String getName() { return name; } - - - /** - * Dummy maxSpareThreads property. - */ - public int getMaxSpareThreads() { return 0; } - - - /** - * Dummy minSpareThreads property. - */ - public int getMinSpareThreads() { return 0; } - - - // --------------------------------------------------------- Public Methods - - - /** - * Return the amount of threads that are managed by the pool. - * - * @return the amount of threads that are managed by the pool - */ - public int getCurrentThreadCount() { - return curThreads; - } - - - /** - * Return the amount of threads currently busy. - * - * @return the amount of threads currently busy - */ - public int getCurrentThreadsBusy() { - return curThreadsBusy; - } - - - /** - * Return the state of the endpoint. - * - * @return true if the endpoint is running, false otherwise - */ - public boolean isRunning() { - return running; - } - - - /** - * Return the state of the endpoint. - * - * @return true if the endpoint is paused, false otherwise - */ - public boolean isPaused() { - return paused; - } - - - // ----------------------------------------------- Public Lifecycle Methods - - - /** - * Initialize the endpoint. - */ - public abstract void init() - throws Exception; - - - /** - * Start the APR endpoint, creating acceptor, poller and sendfile threads. - */ - public abstract void start() - throws Exception; - - - /** - * Pause the endpoint, which will make it stop accepting new sockets. - */ - public void pause() { - if (running && !paused) { - paused = true; - unlockAccept(); - } - } - - - /** - * Resume the endpoint, which will make it start accepting new sockets - * again. - */ - public void resume() { - if (running) { - paused = false; - } - } - - - /** - * Stop the endpoint. This will cause all processing threads to stop. - */ - public abstract void stop(); - - - /** - * Deallocate APR memory pools, and close server socket. - */ - public abstract void destroy() throws Exception; - - - // ------------------------------------------------------ Protected Methods - - - /** - * Get a sequence number used for thread naming. - */ - protected int getSequence() { - return sequence++; - } - - - /** - * Unlock the server socket accept using a bugus connection. - */ - protected void unlockAccept() { - java.net.Socket s = null; - try { - // Need to create a connection to unlock the accept(); - if (address == null) { - s = new java.net.Socket("127.0.0.1", port); - } else { - s = new java.net.Socket(address, port); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - } - } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } - - -} diff --git a/java/org/apache/tomcat/util/net/JIoEndpoint.java b/java/org/apache/tomcat/util/net/JIoEndpoint.java index e8b5ef447..e51de3683 100644 --- a/java/org/apache/tomcat/util/net/JIoEndpoint.java +++ b/java/org/apache/tomcat/util/net/JIoEndpoint.java @@ -23,11 +23,17 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** * Handle incoming TCP connections. @@ -45,50 +51,17 @@ import org.apache.tomcat.util.res.StringManager; * @author Yoav Shapira * @author Remy Maucherat */ -public class JIoEndpoint { +public class JIoEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants - protected static Log log = LogFactory.getLog(JIoEndpoint.class); - protected StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - - // ----------------------------------------------------------------- Fields /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** * Running state of the endpoint. */ protected volatile boolean running = false; @@ -134,6 +107,10 @@ public class JIoEndpoint { */ protected SocketProperties socketProperties = new SocketProperties(); + /** + * Are we using an internal executor + */ + protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -177,13 +154,19 @@ public class JIoEndpoint { public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; if (running) { - synchronized(workers) { - workers.resize(maxThreads); - } + //TODO Dynamic resize + log.error("Resizing executor dynamically is not possible at this time."); } } public int getMaxThreads() { return maxThreads; } + public int minSpareThreads = 10; + public int getMinSpareThreads() { + return Math.min(minSpareThreads,getMaxThreads()); + } + public void setMinSpareThreads(int minSpareThreads) { + this.minSpareThreads = minSpareThreads; + } /** * Priority of the acceptor and poller threads. @@ -304,9 +287,15 @@ public class JIoEndpoint { */ public int getCurrentThreadCount() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); + } else { + return -1; + } } else { - return curThreads; + return -2; } } @@ -317,9 +306,15 @@ public class JIoEndpoint { */ public int getCurrentThreadsBusy() { if (executor!=null) { - return -1; + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); + } else { + return -1; + } } else { - return workers!=null?curThreads - workers.size():0; + return -2; } } @@ -426,113 +421,6 @@ public class JIoEndpoint { } - // ----------------------------------------------------- Worker Inner Class - - - protected class Worker implements Runnable { - - protected Thread thread = null; - protected boolean available = false; - protected Socket socket = null; - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - synchronized void assign(Socket socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - available = true; - notifyAll(); - - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - private synchronized Socket await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - Socket socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - - // Wait for the next socket to be assigned - Socket socket = await(); - if (socket == null) - continue; - - // Process the request from this socket - if (!setSocketOptions(socket) || !handler.process(socket)) { - // Close socket - try { - socket.close(); - } catch (IOException e) { - } - } - - // Finish up this request - socket = null; - recycleWorkerThread(this); - - } - - } - - - /** - * Start the background processing thread. - */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.start(); - } - - - } - // -------------------- Public methods -------------------- @@ -583,7 +471,11 @@ public class JIoEndpoint { // Create worker collection if (executor == null) { - workers = new WorkerStack(maxThreads); + internalExecutor = true; + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); } // Start acceptor threads @@ -614,6 +506,16 @@ public class JIoEndpoint { running = false; unlockAccept(); } + if ( executor!=null && internalExecutor ) { + if ( executor instanceof ThreadPoolExecutor ) { + //this is our internal one, so we need to shut it down + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdownNow(); + TaskQueue queue = (TaskQueue) tpe.getQueue(); + queue.setParent(null); + } + executor = null; + } } /** @@ -696,97 +598,16 @@ public class JIoEndpoint { /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return workers.pop(); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - if (curThreadsBusy == maxThreads) { - log.info(sm.getString("endpoint.info.maxThreads", - Integer.toString(maxThreads), address, - Integer.toString(port))); - } - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. - */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); - } - } - - - /** * Process given socket. */ protected boolean processSocket(Socket socket) { try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketProcessor(socket)); - } + executor.execute(new SocketProcessor(socket)); + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); @@ -796,81 +617,4 @@ public class JIoEndpoint { } - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. If the queue is full (for example if - * the queue has been reduced in size) the object will be dropped. - * - * @param object the object to be appended to the queue (first - * element). - */ - public void push(Worker worker) { - if (end < workers.length) { - workers[end++] = worker; - } else { - curThreads--; - } - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. - */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - - /** - * Resize the queue. If there are too many objects in the queue for the - * new size, drop the excess. - * - * @param newSize - */ - public void resize(int newSize) { - Worker[] newWorkers = new Worker[newSize]; - int len = workers.length; - if (newSize < len) { - len = newSize; - } - System.arraycopy(workers, 0, newWorkers, 0, len); - workers = newWorkers; - } - } - } diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index b4f4362d2..bfc0f6e63 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -58,6 +58,7 @@ import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import org.apache.tomcat.util.net.jsse.NioX509KeyManager; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ResizableExecutor; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.TaskThreadFactory; import org.apache.tomcat.util.threads.ThreadPoolExecutor; @@ -77,7 +78,7 @@ import org.apache.tomcat.util.threads.ThreadPoolExecutor; * @author Remy Maucherat * @author Filip Hanik */ -public class NioEndpoint { +public class NioEndpoint extends AbstractEndpoint { // -------------------------------------------------------------- Constants @@ -85,30 +86,6 @@ public class NioEndpoint { protected static Log log = LogFactory.getLog(NioEndpoint.class); - protected static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; public static final int OP_REGISTER = 0x100; //register interest op public static final int OP_CALLBACK = 0x200; //callback interest op @@ -333,7 +310,7 @@ public class NioEndpoint { /** * Are we using an internal executor */ - protected boolean internalExecutor = true; + protected volatile boolean internalExecutor = false; protected boolean useExecutor = true; /** @@ -518,13 +495,16 @@ public class NioEndpoint { /** * Dummy maxSpareThreads property. */ - public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); } + public int getMaxSpareThreads() { return Math.min(getMaxThreads(),getMinSpareThreads()); } - /** - * Dummy minSpareThreads property. - */ - public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); } + public int minSpareThreads = 10; + public int getMinSpareThreads() { + return Math.min(minSpareThreads,getMaxThreads()); + } + public void setMinSpareThreads(int minSpareThreads) { + this.minSpareThreads = minSpareThreads; + } /** * Generic properties, introspected @@ -733,6 +713,8 @@ public class NioEndpoint { if (executor!=null) { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); } else { return -1; } @@ -750,6 +732,8 @@ public class NioEndpoint { if (executor!=null) { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); } else { return -1; } @@ -1142,9 +1126,7 @@ public class NioEndpoint { if ( dispatch && executor!=null ) executor.execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { - if (log.isDebugEnabled()) { - log.debug("Unable to process socket, executor rejected the task.",rx); - } + log.warn("Socket processing request was rejected for:"+socket,rx); return false; } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that diff --git a/java/org/apache/tomcat/util/threads/ResizableExecutor.java b/java/org/apache/tomcat/util/threads/ResizableExecutor.java new file mode 100644 index 000000000..3d137e39b --- /dev/null +++ b/java/org/apache/tomcat/util/threads/ResizableExecutor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.concurrent.Executor; + +public interface ResizableExecutor extends Executor { + /** + * {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()} + * @return {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()} + */ + public int getPoolSize(); + + /** + * {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()} + * @return {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()} + */ + public int getActiveCount(); + + public boolean resizePool(int corePoolSize, int maximumPoolSize); + + public boolean resizeQueue(int capacity); + +}