From 9330879c84eb542fa6e3a043669f7a99dab2d350 Mon Sep 17 00:00:00 2001 From: fhanik Date: Mon, 8 Dec 2008 03:32:07 +0000 Subject: [PATCH] Remove the synchronize/notifyAll based thread pool, and use only the built in pool git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@724239 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/tomcat/util/net/NioEndpoint.java | 373 ++--------------------- 1 file changed, 17 insertions(+), 356 deletions(-) diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 7595f6f23..dd3eea8c9 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -116,12 +116,6 @@ public class NioEndpoint { /** - * Available workers. - */ - protected WorkerStack workers = null; - - - /** * Running state of the endpoint. */ protected volatile boolean running = false; @@ -349,7 +343,11 @@ public class NioEndpoint { public Executor getExecutor() { return executor; } protected boolean useExecutor = true; - public void setUseExecutor(boolean useexec) { useExecutor = useexec;} + /** + * @deprecated Executor is always used + * @param useexec + */ + public void setUseExecutor(boolean useexec) { log.info("Setting useExecutor is deprecated. Executors are always used.");} public boolean getUseExecutor() { return useExecutor || (executor!=null);} /** @@ -359,14 +357,10 @@ public class NioEndpoint { public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; if (running) { - if (getUseExecutor() && executor!=null) { + if (executor!=null) { if (executor instanceof ThreadPoolExecutor) { ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads); } - }else if (workers!=null){ - synchronized(workers) { - workers.resize(maxThreads); - } } } } @@ -857,15 +851,11 @@ public class NioEndpoint { paused = false; // Create worker collection - if (getUseExecutor()) { - if ( executor == null ) { - TaskQueue taskqueue = new TaskQueue(); - TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-"); - executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor, this); - } - } else if ( executor == null ) {//avoid two thread pools being created - workers = new WorkerStack(maxThreads); + if ( executor == null ) { + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-"); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor, this); } // Start poller threads @@ -1104,99 +1094,8 @@ public class NioEndpoint { protected boolean isWorkerAvailable() { if ( executor != null ) { return true; - } else { - if (workers.size() > 0) { - return true; - } - if ( (maxThreads > 0) && (curThreads < maxThreads)) { - return true; - } else { - if (maxThreads < 0) { - return true; - } else { - return false; - } - } - } - } - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - protected Worker createWorkerThread() { - - synchronized (workers) { - if (workers.size() > 0) { - curThreadsBusy++; - return (workers.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - curThreadsBusy++; - if (curThreadsBusy == maxThreads) { - log.info(sm.getString("endpoint.info.maxThreads", - Integer.toString(maxThreads), address, - Integer.toString(port))); - } - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - curThreadsBusy++; - return (newWorkerThread()); - } else { - return (null); - } - } - } - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - protected Worker newWorkerThread() { - - Worker workerThread = new Worker(); - workerThread.start(); - return (workerThread); - - } - - - /** - * Return a new worker thread, and block while to worker is available. - */ - protected Worker getWorkerThread() { - // Allocate a new worker thread - Worker workerThread = createWorkerThread(); - while (workerThread == null) { - try { - synchronized (workers) { - workerThread = createWorkerThread(); - if ( workerThread == null ) workers.wait(); - } - } catch (InterruptedException e) { - // Ignore - } - if ( workerThread == null ) workerThread = createWorkerThread(); - } - return workerThread; - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param workerThread The processor to be recycled - */ - protected void recycleWorkerThread(Worker workerThread) { - synchronized (workers) { - workers.push(workerThread); - curThreadsBusy--; - workers.notify(); } + return false; } /** * Process given socket. @@ -1217,15 +1116,11 @@ public class NioEndpoint { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); attachment.setCometNotify(false); //will get reset upon next reg - if (executor == null) { - getWorkerThread().assign(socket, status); - } else { - SocketProcessor sc = processorCache.poll(); - if ( sc == null ) sc = new SocketProcessor(socket,status); - else sc.reset(socket,status); - if ( dispatch ) executor.execute(sc); - else sc.run(); - } + SocketProcessor sc = processorCache.poll(); + if ( sc == null ) sc = new SocketProcessor(socket,status); + else sc.reset(socket,status); + if ( dispatch && executor!=null ) executor.execute(sc); + else sc.run(); } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full @@ -1888,162 +1783,6 @@ public class NioEndpoint { protected long lastRegistered = 0; protected SendfileData sendfileData = null; } - // ----------------------------------------------------- Worker Inner Class - - - /** - * Server processor class. - */ - protected class Worker implements Runnable { - - - protected Thread thread = null; - protected boolean available = false; - protected Object socket = null; - protected SocketStatus status = null; - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - protected synchronized void assign(Object socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - // Store the newly available Socket and notify our thread - this.socket = socket; - status = null; - available = true; - notifyAll(); - - } - - - protected synchronized void assign(Object socket, SocketStatus status) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - this.status = status; - available = true; - notifyAll(); - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - protected synchronized Object await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - Object socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Process requests until we receive a shutdown signal - while (running) { - NioChannel socket = null; - SelectionKey key = null; - try { - // Wait for the next socket to be assigned - Object channel = await(); - if (channel == null) - continue; - - if ( channel instanceof SocketChannel) { - SocketChannel sc = (SocketChannel)channel; - if ( !setSocketOptions(sc) ) { - try { - sc.socket().close(); - sc.close(); - }catch ( IOException ix ) { - if ( log.isDebugEnabled() ) log.debug("",ix); - } - } else { - //now we have it registered, remove it from the cache - - } - } else { - socket = (NioChannel)channel; - SocketProcessor sc = processorCache.poll(); - if ( sc == null ) sc = new SocketProcessor(socket,status); - else sc.reset(socket,status); - sc.run(); - } - }catch(CancelledKeyException cx) { - if (socket!=null && key!=null) socket.getPoller().cancelledKey(key,null,false); - } catch (OutOfMemoryError oom) { - try { - oomParachuteData = null; - releaseCaches(); - log.error("", oom); - }catch ( Throwable oomt ) { - try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - }catch (Throwable letsHopeWeDontGetHere){} - } - } finally { - //dereference socket to let GC do its job - socket = null; - // Finish up this request - recycleWorkerThread(this); - } - } - } - - - /** - * Start the background processing thread. - */ - public void start() { - thread = new Thread(this); - thread.setName(getName() + "-" + (++curThreads)); - thread.setDaemon(true); - thread.setPriority(getThreadPriority()); - thread.start(); - } - - - } // ------------------------------------------------ Application Buffer Handler public class NioBufferHandler implements ApplicationBufferHandler { @@ -2085,84 +1824,6 @@ public class NioEndpoint { } - // ------------------------------------------------- WorkerStack Inner Class - - - public class WorkerStack { - - protected Worker[] workers = null; - protected int end = 0; - - public WorkerStack(int size) { - workers = new Worker[size]; - } - - /** - * Put the object into the queue. If the queue is full (for example if - * the queue has been reduced in size) the object will be dropped. - * - * @param object the object to be appended to the queue (first - * element). - */ - public void push(Worker worker) { - if (end < workers.length) { - workers[end++] = worker; - } else { - curThreads--; - } - } - - /** - * Get the first object out of the queue. Return null if the queue - * is empty. - */ - public Worker pop() { - if (end > 0) { - return workers[--end]; - } - return null; - } - - /** - * Get the first object out of the queue, Return null if the queue - * is empty. - */ - public Worker peek() { - return workers[end]; - } - - /** - * Is the queue empty? - */ - public boolean isEmpty() { - return (end == 0); - } - - /** - * How many elements are there in this queue? - */ - public int size() { - return (end); - } - - /** - * Resize the queue. If there are too many objects in the queue for the - * new size, drop the excess. - * - * @param newSize - */ - public void resize(int newSize) { - Worker[] newWorkers = new Worker[newSize]; - int len = workers.length; - if (newSize < len) { - len = newSize; - } - System.arraycopy(workers, 0, newWorkers, 0, len); - workers = newWorkers; - } - } - - // ---------------------------------------------- SocketProcessor Inner Class -- 2.11.0