From: fhanik Date: Tue, 13 Mar 2007 23:54:03 +0000 (+0000) Subject: Implement the use of a useful executor, this executor will increase threads until... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=1c04bb42f65f101dc8b6b154814f9002c1b190cb;p=tomcat7.0 Implement the use of a useful executor, this executor will increase threads until it reaches max threads, then it starts queueing the connections. This yields in much better fairness. git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@517941 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index 6afddaeca..9b390aa7c 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -256,6 +256,10 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration public void setExecutor(Executor executor) { ep.setExecutor(executor); } + + public void setUseExecutor(boolean useexec) { + ep.setUseExecutor(useexec); + } public int getMaxThreads() { return ep.getMaxThreads(); diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 40457f6fd..990057c46 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -51,6 +51,8 @@ import java.util.Comparator; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; +import java.util.Collection; +import java.util.concurrent.ThreadFactory; /** * NIO tailored thread pool, providing the following services: @@ -268,7 +270,10 @@ public class NioEndpoint { protected Executor executor = null; public void setExecutor(Executor executor) { this.executor = executor; } public Executor getExecutor() { return executor; } - + + protected boolean useExecutor = true; + public void setUseExecutor(boolean useexec) { useExecutor = useexec;} + public boolean getUseExecutor() { return useExecutor;} /** * Maximum amount of worker threads. @@ -639,7 +644,7 @@ public class NioEndpoint { /** - * Start the APR endpoint, creating acceptor, poller threads. + * Start the NIO endpoint, creating acceptor, poller threads. */ public void start() throws Exception { @@ -652,9 +657,15 @@ public class NioEndpoint { paused = false; // Create worker collection - if (executor == null) { - //workers = new WorkerStack(maxThreads); - executor = new ThreadPoolExecutor(getMaxThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); + if (getUseExecutor()) { + if ( executor == null ) { + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-"); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); + } + } else { + workers = new WorkerStack(maxThreads); } // Start acceptor threads @@ -716,6 +727,13 @@ public class NioEndpoint { eventCache.clear(); keyCache.clear(); nioChannels.clear(); + if ( executor!=null ) { + ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor; + tpe.shutdown(); + TaskQueue queue = (TaskQueue)tpe.getQueue(); + queue.setParent(null); + executor = null; + } } @@ -863,6 +881,8 @@ public class NioEndpoint { */ protected boolean isWorkerAvailable() { if ( executor != null ) { +// ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor; +// TaskQueue queue = (TaskQueue)tpe.getQueue(); return true; } else { if (workers.size() > 0) { @@ -953,23 +973,6 @@ public class NioEndpoint { workers.notify(); } } - - - protected boolean processSocket(SocketChannel socket) { - try { - if (executor == null) { - getWorkerThread().assign(socket); - } else { - executor.execute(new SocketOptionsProcessor(socket)); - } - } catch (Throwable t) { - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; - } /** * Process given socket. */ @@ -978,7 +981,7 @@ public class NioEndpoint { if (executor == null) { getWorkerThread().assign(socket); } else { - executor.execute(new SocketProcessor(socket)); + executor.execute(new SocketProcessor(socket,null)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that @@ -998,7 +1001,7 @@ public class NioEndpoint { if (executor == null) { getWorkerThread().assign(socket, status); } else { - executor.execute(new SocketEventProcessor(socket, status)); + executor.execute(new SocketProcessor(socket, status)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that @@ -1298,9 +1301,8 @@ public class NioEndpoint { while (iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); - if ( processKey(sk, attachment) ) { - iterator.remove(); //only remove it if the key was processed. - } + iterator.remove(); + processKey(sk, attachment); }//while //process timeouts @@ -1691,6 +1693,7 @@ public class NioEndpoint { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); + thread.setPriority(getThreadPriority()); thread.start(); } @@ -1827,15 +1830,19 @@ public class NioEndpoint { protected class SocketProcessor implements Runnable { protected NioChannel socket = null; + protected SocketStatus status = null; - public SocketProcessor(NioChannel socket) { + public SocketProcessor(NioChannel socket, SocketStatus status) { this.socket = socket; + this.status = status; } - + public void run() { // Process the request from this socket - if (handler.process(socket) == Handler.SocketState.CLOSED) { + boolean closed = (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) : + (handler.event(socket,status)==Handler.SocketState.CLOSED); + if (closed) { // Close socket and pool try { try {socket.close();}catch (Exception ignore){} @@ -1844,47 +1851,56 @@ public class NioEndpoint { log.error("",x); } socket = null; + status = null; } } } + // ---------------------------------------------- TaskQueue Inner Class + public static class TaskQueue extends LinkedBlockingQueue { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } - - // --------------------------------------- SocketEventProcessor Inner Class - - - /** - * This class is the equivalent of the Worker, but will simply use in an - * external Executor thread pool. - */ - protected class SocketEventProcessor implements Runnable { - - protected NioChannel socket = null; - protected SocketStatus status = null; - - public SocketEventProcessor(NioChannel socket, SocketStatus status) { - this.socket = socket; - this.status = status; + public TaskQueue(int initialCapacity) { + super(initialCapacity); + } + + public TaskQueue(Collection c) { + super(c); } - public void run() { + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean offer(Runnable o) { + if ( parent != null && parent.getPoolSize() + + Set to true to use the NIO thread pool executor. The default value is true. + If set to false, it uses a thread pool based on a stack for its execution. + Generally, using the executor yields a little bit slower performance, but yields a better + fairness for processing connections in a high load environment as the traffic gets queued through a + FIFO queue. If set to true(default) then the max pool size is the maxThreads attribute + and the core pool size is the minSpareThreads. +

The number of threads to be used to accept connections. Increase this value on a multi CPU machine, although you would never really need more than 2. Also, with a lot of non keep alive connections,