From 1781318137570329220c96e51e7ee0d8a97742de Mon Sep 17 00:00:00 2001 From: markt Date: Thu, 9 Apr 2009 10:37:36 +0000 Subject: [PATCH] Remove old thread pool code git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@763590 13f79535-47bb-0310-9956-ffa450edef68 --- .../util/net/LeaderFollowerWorkerThread.java | 87 --- .../tomcat/util/net/MasterSlaveWorkerThread.java | 151 ---- .../apache/tomcat/util/net/PoolTcpEndpoint.java | 686 ----------------- java/org/apache/tomcat/util/net/TcpConnection.java | 110 --- .../tomcat/util/net/TcpConnectionHandler.java | 50 -- .../org/apache/tomcat/util/threads/ThreadPool.java | 839 --------------------- .../tomcat/util/threads/ThreadPoolRunnable.java | 39 - .../tomcat/util/threads/ThreadWithAttributes.java | 104 --- 8 files changed, 2066 deletions(-) delete mode 100644 java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java delete mode 100644 java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java delete mode 100644 java/org/apache/tomcat/util/net/PoolTcpEndpoint.java delete mode 100644 java/org/apache/tomcat/util/net/TcpConnection.java delete mode 100644 java/org/apache/tomcat/util/net/TcpConnectionHandler.java delete mode 100644 java/org/apache/tomcat/util/threads/ThreadPool.java delete mode 100644 java/org/apache/tomcat/util/threads/ThreadPoolRunnable.java delete mode 100644 java/org/apache/tomcat/util/threads/ThreadWithAttributes.java diff --git a/java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java b/java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java deleted file mode 100644 index 8d2f65bec..000000000 --- a/java/org/apache/tomcat/util/net/LeaderFollowerWorkerThread.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - -import java.net.Socket; -import org.apache.tomcat.util.threads.ThreadPoolRunnable; - -/* - * I switched the threading model here. - * - * We used to have a "listener" thread and a "connection" - * thread, this results in code simplicity but also a needless - * thread switch. - * - * Instead I am now using a pool of threads, all the threads are - * simmetric in their execution and no thread switch is needed. - */ -class LeaderFollowerWorkerThread implements ThreadPoolRunnable { - /* This is not a normal Runnable - it gets attached to an existing - thread, runs and when run() ends - the thread keeps running. - - It's better to keep the name ThreadPoolRunnable - avoid confusion. - We also want to use per/thread data and avoid sync wherever possible. - */ - PoolTcpEndpoint endpoint; - - public LeaderFollowerWorkerThread(PoolTcpEndpoint endpoint) { - this.endpoint = endpoint; - } - - public Object[] getInitData() { - // no synchronization overhead, but 2 array access - Object obj[]=new Object[2]; - obj[1]= endpoint.getConnectionHandler().init(); - obj[0]=new TcpConnection(); - return obj; - } - - public void runIt(Object perThrData[]) { - - // Create per-thread cache - if (endpoint.isRunning()) { - - // Loop if endpoint is paused - while (endpoint.isPaused()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - - // Accept a new connection - Socket s = null; - try { - s = endpoint.acceptSocket(); - } finally { - // Continue accepting on another thread... - if (endpoint.isRunning()) { - endpoint.tp.runIt(this); - } - } - - // Process the connection - if (null != s) { - endpoint.processSocket(s, (TcpConnection) perThrData[0], (Object[]) perThrData[1]); - } - - } - } - -} diff --git a/java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java b/java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java deleted file mode 100644 index c1cfa10d1..000000000 --- a/java/org/apache/tomcat/util/net/MasterSlaveWorkerThread.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - -import java.net.Socket; - -import org.apache.tomcat.util.threads.ThreadWithAttributes; - -/** - * Regular master slave thread pool. Slave threads will wait for work. - */ -class MasterSlaveWorkerThread implements Runnable { - - protected PoolTcpEndpoint endpoint; - protected String threadName; - protected boolean stopped = false; - private Object threadSync = new Object(); - private Thread thread = null; - private boolean available = false; - private Socket socket = null; - private TcpConnection con = new TcpConnection(); - private Object[] threadData = null; - - - public MasterSlaveWorkerThread(PoolTcpEndpoint endpoint, String threadName) { - this.endpoint = endpoint; - this.threadName = threadName; - } - - - /** - * Process an incoming TCP/IP connection on the specified socket. Any - * exception that occurs during processing must be logged and swallowed. - * NOTE: This method is called from our Connector's thread. We - * must assign it to our own thread so that multiple simultaneous - * requests can be handled. - * - * @param socket TCP socket to process - */ - synchronized void assign(Socket socket) { - - // Wait for the Processor to get the previous Socket - while (available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Store the newly available Socket and notify our thread - this.socket = socket; - available = true; - notifyAll(); - - } - - - /** - * Await a newly assigned Socket from our Connector, or null - * if we are supposed to shut down. - */ - private synchronized Socket await() { - - // Wait for the Connector to provide a new Socket - while (!available) { - try { - wait(); - } catch (InterruptedException e) { - } - } - - // Notify the Connector that we have received this Socket - Socket socket = this.socket; - available = false; - notifyAll(); - - return (socket); - - } - - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Process requests until we receive a shutdown signal - while (!stopped) { - - // Wait for the next socket to be assigned - Socket socket = await(); - if (socket == null) - continue; - - // Process the request from this socket - endpoint.processSocket(socket, con, threadData); - - // Finish up this request - endpoint.recycleWorkerThread(this); - - } - - // Tell threadStop() we have shut ourselves down successfully - synchronized (threadSync) { - threadSync.notifyAll(); - } - - } - - - /** - * Start the background processing thread. - */ - public void start() { - threadData = endpoint.getConnectionHandler().init(); - thread = new ThreadWithAttributes(null, this); - thread.setName(threadName); - thread.setDaemon(true); - thread.start(); - } - - - /** - * Stop the background processing thread. - */ - public void stop() { - stopped = true; - assign(null); - thread = null; - threadData = null; - } - - -} diff --git a/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java b/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java deleted file mode 100644 index 0b3379388..000000000 --- a/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java +++ /dev/null @@ -1,686 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.security.AccessControlException; -import java.util.Stack; -import java.util.Vector; - -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.threads.ThreadPool; -import org.apache.tomcat.util.threads.ThreadPoolRunnable; - -/* Similar with MPM module in Apache2.0. Handles all the details related with - "tcp server" functionality - thread management, accept policy, etc. - It should do nothing more - as soon as it get a socket ( and all socket options - are set, etc), it just handle the stream to ConnectionHandler.processConnection. (costin) -*/ - - - -/** - * Handle incoming TCP connections. - * - * This class implement a simple server model: one listener thread accepts on a socket and - * creates a new worker thread for each incoming connection. - * - * More advanced Endpoints will reuse the threads, use queues, etc. - * - * @author James Duncan Davidson [duncan@eng.sun.com] - * @author Jason Hunter [jch@eng.sun.com] - * @author James Todd [gonzo@eng.sun.com] - * @author Costin@eng.sun.com - * @author Gal Shachor [shachor@il.ibm.com] - * @author Yoav Shapira - */ -public class PoolTcpEndpoint implements Runnable { // implements Endpoint { - - static Log log=LogFactory.getLog(PoolTcpEndpoint.class ); - - private StringManager sm = - StringManager.getManager("org.apache.tomcat.util.net.res"); - - private static final int BACKLOG = 100; - private static final int TIMEOUT = 1000; - - private final Object threadSync = new Object(); - - private int backlog = BACKLOG; - private int serverTimeout = TIMEOUT; - - private InetAddress inet; - private int port; - - private ServerSocketFactory factory; - private ServerSocket serverSocket; - - private volatile boolean running = false; - private volatile boolean paused = false; - private boolean initialized = false; - private boolean reinitializing = false; - static final int debug=0; - - protected boolean tcpNoDelay=false; - protected int linger=100; - protected int socketTimeout=-1; - private boolean lf = true; - - - // ------ Leader follower fields - - - TcpConnectionHandler handler; - ThreadPoolRunnable listener; - ThreadPool tp; - - - // ------ Master slave fields - - /* The background thread. */ - private Thread thread = null; - /* Available processors. */ - private Stack workerThreads = - new Stack(); - private int curThreads = 0; - private int maxThreads = 20; - /* All processors which have been created. */ - private Vector created = - new Vector(); - - - public PoolTcpEndpoint() { - tp = new ThreadPool(); - } - - public PoolTcpEndpoint( ThreadPool tp ) { - this.tp=tp; - } - - // -------------------- Configuration -------------------- - - public void setMaxThreads(int maxThreads) { - if( maxThreads > 0) - tp.setMaxThreads(maxThreads); - } - - public int getMaxThreads() { - return tp.getMaxThreads(); - } - - public void setMaxSpareThreads(int maxThreads) { - if(maxThreads > 0) - tp.setMaxSpareThreads(maxThreads); - } - - public int getMaxSpareThreads() { - return tp.getMaxSpareThreads(); - } - - public void setMinSpareThreads(int minThreads) { - if(minThreads > 0) - tp.setMinSpareThreads(minThreads); - } - - public int getMinSpareThreads() { - return tp.getMinSpareThreads(); - } - - public void setThreadPriority(int threadPriority) { - tp.setThreadPriority(threadPriority); - } - - public int getThreadPriority() { - return tp.getThreadPriority(); - } - - public int getPort() { - return port; - } - - public void setPort(int port ) { - this.port=port; - } - - public InetAddress getAddress() { - return inet; - } - - public void setAddress(InetAddress inet) { - this.inet=inet; - } - - public void setServerSocket(ServerSocket ss) { - serverSocket = ss; - } - - public void setServerSocketFactory( ServerSocketFactory factory ) { - this.factory=factory; - } - - ServerSocketFactory getServerSocketFactory() { - return factory; - } - - public void setConnectionHandler( TcpConnectionHandler handler ) { - this.handler=handler; - } - - public TcpConnectionHandler getConnectionHandler() { - return handler; - } - - public boolean isRunning() { - return running; - } - - public boolean isPaused() { - return paused; - } - - /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - public void setBacklog(int backlog) { - if( backlog>0) - this.backlog = backlog; - } - - public int getBacklog() { - return backlog; - } - - /** - * Sets the timeout in ms of the server sockets created by this - * server. This method allows the developer to make servers - * more or less responsive to having their server sockets - * shut down. - * - *

By default this value is 1000ms. - */ - public void setServerTimeout(int timeout) { - this.serverTimeout = timeout; - } - - public boolean getTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay( boolean b ) { - tcpNoDelay=b; - } - - public int getSoLinger() { - return linger; - } - - public void setSoLinger( int i ) { - linger=i; - } - - public int getSoTimeout() { - return socketTimeout; - } - - public void setSoTimeout( int i ) { - socketTimeout=i; - } - - public int getServerSoTimeout() { - return serverTimeout; - } - - public void setServerSoTimeout( int i ) { - serverTimeout=i; - } - - public String getStrategy() { - if (lf) { - return "lf"; - } else { - return "ms"; - } - } - - public void setStrategy(String strategy) { - if ("ms".equals(strategy)) { - lf = false; - } else { - lf = true; - } - } - - public int getCurrentThreadCount() { - return curThreads; - } - - public int getCurrentThreadsBusy() { - return curThreads - workerThreads.size(); - } - - // -------------------- Public methods -------------------- - - public void initEndpoint() throws IOException, InstantiationException { - try { - if(factory==null) - factory=ServerSocketFactory.getDefault(); - if(serverSocket==null) { - try { - if (inet == null) { - serverSocket = factory.createSocket(port, backlog); - } else { - serverSocket = factory.createSocket(port, backlog, inet); - } - } catch ( BindException be ) { - throw new BindException(be.getMessage() + ":" + port); - } - } - if( serverTimeout >= 0 ) - serverSocket.setSoTimeout( serverTimeout ); - } catch( IOException ex ) { - throw ex; - } catch( InstantiationException ex1 ) { - throw ex1; - } - initialized = true; - } - - public void startEndpoint() throws IOException, InstantiationException { - if (!initialized) { - initEndpoint(); - } - if (lf) { - tp.start(); - } - running = true; - paused = false; - if (lf) { - listener = new LeaderFollowerWorkerThread(this); - tp.runIt(listener); - } else { - maxThreads = getMaxThreads(); - threadStart(); - } - } - - public void pauseEndpoint() { - if (running && !paused) { - paused = true; - unlockAccept(); - } - } - - public void resumeEndpoint() { - if (running) { - paused = false; - } - } - - public void stopEndpoint() { - if (running) { - if (lf) { - tp.shutdown(); - } - running = false; - if (serverSocket != null) { - closeServerSocket(); - } - if (!lf) { - threadStop(); - } - initialized=false ; - } - } - - protected void closeServerSocket() { - if (!paused) - unlockAccept(); - try { - if( serverSocket!=null) - serverSocket.close(); - } catch(Exception e) { - log.error(sm.getString("endpoint.err.close"), e); - } - serverSocket = null; - } - - protected void unlockAccept() { - Socket s = null; - try { - // Need to create a connection to unlock the accept(); - if (inet == null) { - s = new Socket("127.0.0.1", port); - } else { - s = new Socket(inet, port); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - } - } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } - - // -------------------- Private methods - - Socket acceptSocket() { - if( !running || serverSocket==null ) return null; - - Socket accepted = null; - - try { - if(factory==null) { - accepted = serverSocket.accept(); - } else { - accepted = factory.acceptSocket(serverSocket); - } - if (null == accepted) { - log.warn(sm.getString("endpoint.warn.nullSocket")); - } else { - if (!running) { - accepted.close(); // rude, but unlikely! - accepted = null; - } else if (factory != null) { - factory.initSocket( accepted ); - } - } - } - catch(InterruptedIOException iioe) { - // normal part -- should happen regularly so - // that the endpoint can release if the server - // is shutdown. - } - catch (AccessControlException ace) { - // When using the Java SecurityManager this exception - // can be thrown if you are restricting access to the - // socket with SocketPermission's. - // Log the unauthorized access and continue - String msg = sm.getString("endpoint.warn.security", - serverSocket, ace); - log.warn(msg); - } - catch (IOException e) { - - String msg = null; - - if (running) { - msg = sm.getString("endpoint.err.nonfatal", - serverSocket, e); - log.error(msg, e); - } - - if (accepted != null) { - try { - accepted.close(); - } catch(Throwable ex) { - msg = sm.getString("endpoint.err.nonfatal", - accepted, ex); - log.warn(msg, ex); - } - accepted = null; - } - - if( ! running ) return null; - reinitializing = true; - // Restart endpoint when getting an IOException during accept - synchronized (threadSync) { - if (reinitializing) { - reinitializing = false; - // 1) Attempt to close server socket - closeServerSocket(); - initialized = false; - // 2) Reinit endpoint (recreate server socket) - try { - msg = sm.getString("endpoint.warn.reinit"); - log.warn(msg); - initEndpoint(); - } catch (Throwable t) { - msg = sm.getString("endpoint.err.nonfatal", - serverSocket, t); - log.error(msg, t); - } - // 3) If failed, attempt to restart endpoint - if (!initialized) { - msg = sm.getString("endpoint.warn.restart"); - log.warn(msg); - try { - stopEndpoint(); - initEndpoint(); - startEndpoint(); - } catch (Throwable t) { - msg = sm.getString("endpoint.err.fatal", - serverSocket, t); - log.error(msg, t); - } - // Current thread is now invalid: kill it - throw new ThreadDeath(); - } - } - } - - } - - return accepted; - } - - void setSocketOptions(Socket socket) - throws SocketException { - if(linger >= 0 ) - socket.setSoLinger( true, linger); - if( tcpNoDelay ) - socket.setTcpNoDelay(tcpNoDelay); - if( socketTimeout > 0 ) - socket.setSoTimeout( socketTimeout ); - } - - - void processSocket(Socket s, TcpConnection con, Object[] threadData) { - // Process the connection - int step = 1; - try { - - // 1: Set socket options: timeout, linger, etc - setSocketOptions(s); - - // 2: SSL handshake - step = 2; - if (getServerSocketFactory() != null) { - getServerSocketFactory().handshake(s); - } - - // 3: Process the connection - step = 3; - con.setEndpoint(this); - con.setSocket(s); - getConnectionHandler().processConnection(con, threadData); - - } catch (SocketException se) { - log.debug(sm.getString("endpoint.err.socket", s.getInetAddress()), - se); - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } catch (Throwable t) { - if (step == 2) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.handshake"), t); - } - } else { - log.error(sm.getString("endpoint.err.unexpected"), t); - } - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } finally { - if (con != null) { - con.recycle(); - } - } - } - - - // -------------------------------------------------- Master Slave Methods - - - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * null instead. - */ - private MasterSlaveWorkerThread createWorkerThread() { - - synchronized (workerThreads) { - if (workerThreads.size() > 0) { - return (workerThreads.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - return (newWorkerThread()); - } else { - return (null); - } - } - } - - } - - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - private MasterSlaveWorkerThread newWorkerThread() { - - MasterSlaveWorkerThread workerThread = - new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads)); - workerThread.start(); - created.addElement(workerThread); - return (workerThread); - - } - - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param processor The processor to be recycled - */ - void recycleWorkerThread(MasterSlaveWorkerThread workerThread) { - workerThreads.push(workerThread); - } - - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - - // Allocate a new worker thread - MasterSlaveWorkerThread workerThread = createWorkerThread(); - if (workerThread == null) { - try { - // Wait a little for load to go down: as a result, - // no accept will be made until the concurrency is - // lower than the specified maxThreads, and current - // connections will wait for a little bit instead of - // failing right away. - Thread.sleep(100); - } catch (InterruptedException e) { - // Ignore - } - continue; - } - - // Accept the next incoming connection from the server socket - Socket socket = acceptSocket(); - - // Hand this socket off to an appropriate processor - workerThread.assign(socket); - - // The processor will recycle itself when it finishes - - } - - // Notify the threadStop() method that we have shut ourselves down - synchronized (threadSync) { - threadSync.notifyAll(); - } - - } - - - /** - * Start the background processing thread. - */ - private void threadStart() { - thread = new Thread(this, tp.getName()); - thread.setPriority(getThreadPriority()); - thread.setDaemon(true); - thread.start(); - } - - - /** - * Stop the background processing thread. - */ - private void threadStop() { - thread = null; - } - - -} diff --git a/java/org/apache/tomcat/util/net/TcpConnection.java b/java/org/apache/tomcat/util/net/TcpConnection.java deleted file mode 100644 index c4b3963d6..000000000 --- a/java/org/apache/tomcat/util/net/TcpConnection.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; - -/** - * - */ -public class TcpConnection { // implements Endpoint { - /** - * Maxium number of times to clear the socket input buffer. - */ - static int MAX_SHUTDOWN_TRIES=20; - - public TcpConnection() { - } - - // -------------------- Properties -------------------- - - PoolTcpEndpoint endpoint; - Socket socket; - - public static void setMaxShutdownTries(int mst) { - MAX_SHUTDOWN_TRIES = mst; - } - public void setEndpoint(PoolTcpEndpoint endpoint) { - this.endpoint = endpoint; - } - - public PoolTcpEndpoint getEndpoint() { - return endpoint; - } - - public void setSocket(Socket socket) { - this.socket=socket; - } - - public Socket getSocket() { - return socket; - } - - public void recycle() { - endpoint = null; - socket = null; - } - - // Another frequent repetition - public static int readLine(InputStream in, byte[] b, int off, int len) - throws IOException - { - if (len <= 0) { - return 0; - } - int count = 0, c; - - while ((c = in.read()) != -1) { - b[off++] = (byte)c; - count++; - if (c == '\n' || count == len) { - break; - } - } - return count > 0 ? count : -1; - } - - - // Usefull stuff - avoid having it replicated everywhere - public static void shutdownInput(Socket socket) - throws IOException - { - try { - InputStream is = socket.getInputStream(); - int available = is.available (); - int count=0; - - // XXX on JDK 1.3 just socket.shutdownInput () which - // was added just to deal with such issues. - - // skip any unread (bogus) bytes - while (available > 0 && count++ < MAX_SHUTDOWN_TRIES) { - is.skip (available); - available = is.available(); - } - }catch(NullPointerException npe) { - // do nothing - we are just cleaning up, this is - // a workaround for Netscape \n\r in POST - it is supposed - // to be ignored - } - } -} - - diff --git a/java/org/apache/tomcat/util/net/TcpConnectionHandler.java b/java/org/apache/tomcat/util/net/TcpConnectionHandler.java deleted file mode 100644 index 686d55f0d..000000000 --- a/java/org/apache/tomcat/util/net/TcpConnectionHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.net; - - -/** - * This interface will be implemented by any object that - * uses TcpConnections. It is supported by the pool tcp - * connection manager and should be supported by future - * managers. - * The goal is to decouple the connection handler from - * the thread, socket and pooling complexity. - */ -public interface TcpConnectionHandler { - - /** Called before the call to processConnection. - * If the thread is reused, init() should be called once per thread. - * - * It may look strange, but it's a _very_ good way to avoid synchronized - * methods and keep per thread data. - * - * Assert: the object returned from init() will be passed to - * all processConnection() methods happening in the same thread. - * - */ - public Object[] init( ); - - /** - * Assert: connection!=null - * Assert: connection.getSocket() != null - * Assert: thData != null and is the result of calling init() - * Assert: thData is preserved per Thread. - */ - public void processConnection(TcpConnection connection, Object thData[]); -} diff --git a/java/org/apache/tomcat/util/threads/ThreadPool.java b/java/org/apache/tomcat/util/threads/ThreadPool.java deleted file mode 100644 index e11810c69..000000000 --- a/java/org/apache/tomcat/util/threads/ThreadPool.java +++ /dev/null @@ -1,839 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.threads; - -import java.util.*; - -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.res.StringManager; - -/** - * A thread pool that is trying to copy the apache process management. - * - * Should we remove this in favor of Doug Lea's thread package? - * - * @author Gal Shachor - * @author Yoav Shapira - */ -public class ThreadPool { - - private static Log log = LogFactory.getLog(ThreadPool.class); - - private static StringManager sm = - StringManager.getManager("org.apache.tomcat.util.threads.res"); - - private static boolean logfull=true; - - /* - * Default values ... - */ - public static final int MAX_THREADS = 200; - public static final int MAX_THREADS_MIN = 10; - public static final int MAX_SPARE_THREADS = 50; - public static final int MIN_SPARE_THREADS = 4; - public static final int WORK_WAIT_TIMEOUT = 60*1000; - - /* - * Where the threads are held. - */ - protected ControlRunnable[] pool = null; - - /* - * A monitor thread that monitors the pool for idel threads. - */ - protected MonitorRunnable monitor; - - - /* - * Max number of threads that you can open in the pool. - */ - protected int maxThreads; - - /* - * Min number of idel threads that you can leave in the pool. - */ - protected int minSpareThreads; - - /* - * Max number of idel threads that you can leave in the pool. - */ - protected int maxSpareThreads; - - /* - * Number of threads in the pool. - */ - protected int currentThreadCount; - - /* - * Number of busy threads in the pool. - */ - protected int currentThreadsBusy; - - /* - * Flag that the pool should terminate all the threads and stop. - */ - protected boolean stopThePool; - - /* Flag to control if the main thread is 'daemon' */ - protected boolean isDaemon=true; - - /** The threads that are part of the pool. - * Key is Thread, value is the ControlRunnable - */ - protected Hashtable threads = - new Hashtable(); - - protected Vector listeners = - new Vector(); - - /** Name of the threadpool - */ - protected String name = "TP"; - - /** - * Sequence. - */ - protected int sequence = 1; - - /** - * Thread priority. - */ - protected int threadPriority = Thread.NORM_PRIORITY; - - - /** - * Constructor. - */ - public ThreadPool() { - maxThreads = MAX_THREADS; - maxSpareThreads = MAX_SPARE_THREADS; - minSpareThreads = MIN_SPARE_THREADS; - currentThreadCount = 0; - currentThreadsBusy = 0; - stopThePool = false; - } - - - /** Create a ThreadPool instance. - * - * @param jmx UNUSED - * @return ThreadPool instance. If JMX support is requested, you need to - * call register() in order to set a name. - */ - public static ThreadPool createThreadPool(boolean jmx) { - return new ThreadPool(); - } - - public synchronized void start() { - stopThePool=false; - currentThreadCount = 0; - currentThreadsBusy = 0; - - adjustLimits(); - - pool = new ControlRunnable[maxThreads]; - - openThreads(minSpareThreads); - if (maxSpareThreads < maxThreads) { - monitor = new MonitorRunnable(this); - } - } - - public MonitorRunnable getMonitor() { - return monitor; - } - - /** - * Sets the thread priority for current - * and future threads in this pool. - * - * @param threadPriority The new priority - * @throws IllegalArgumentException If the specified - * priority is less than Thread.MIN_PRIORITY or - * more than Thread.MAX_PRIORITY - */ - public synchronized void setThreadPriority(int threadPriority) { - if(log.isDebugEnabled()) - log.debug(getClass().getName() + - ": setPriority(" + threadPriority + "): here."); - - if (threadPriority < Thread.MIN_PRIORITY) { - throw new IllegalArgumentException("new priority < MIN_PRIORITY"); - } else if (threadPriority > Thread.MAX_PRIORITY) { - throw new IllegalArgumentException("new priority > MAX_PRIORITY"); - } - - // Set for future threads - this.threadPriority = threadPriority; - - Enumeration currentThreads = getThreads(); - Thread t = null; - while(currentThreads.hasMoreElements()) { - t = currentThreads.nextElement(); - t.setPriority(threadPriority); - } - } - - /** - * Returns the priority level of current and - * future threads in this pool. - * - * @return The priority - */ - public int getThreadPriority() { - return threadPriority; - } - - - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; - } - - public int getMaxThreads() { - return maxThreads; - } - - public void setMinSpareThreads(int minSpareThreads) { - this.minSpareThreads = minSpareThreads; - } - - public int getMinSpareThreads() { - return minSpareThreads; - } - - public void setMaxSpareThreads(int maxSpareThreads) { - this.maxSpareThreads = maxSpareThreads; - } - - public int getMaxSpareThreads() { - return maxSpareThreads; - } - - public int getCurrentThreadCount() { - return currentThreadCount; - } - - public int getCurrentThreadsBusy() { - return currentThreadsBusy; - } - - public boolean isDaemon() { - return isDaemon; - } - - public static int getDebug() { - return 0; - } - - /** The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. - */ - public void setDaemon( boolean b ) { - isDaemon=b; - } - - public boolean getDaemon() { - return isDaemon; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public int getSequence() { - return sequence; - } - - public int incSequence() { - return sequence++; - } - - public void addThread( Thread t, ControlRunnable cr ) { - threads.put( t, cr ); - for( int i=0; i getThreads(){ - return threads.keys(); - } - - public void run(Runnable r) { - ControlRunnable c = findControlRunnable(); - c.runIt(r); - } - - // - // You may wonder what you see here ... basically I am trying - // to maintain a stack of threads. This way locality in time - // is kept and there is a better chance to find residues of the - // thread in memory next time it runs. - // - - /** - * Executes a given Runnable on a thread in the pool, block if needed. - */ - public void runIt(ThreadPoolRunnable r) { - if(null == r) { - throw new NullPointerException(); - } - - ControlRunnable c = findControlRunnable(); - c.runIt(r); - } - - private ControlRunnable findControlRunnable() { - ControlRunnable c=null; - - if ( stopThePool ) { - throw new IllegalStateException(); - } - - // Obtain a free thread from the pool. - synchronized(this) { - - while (currentThreadsBusy == currentThreadCount) { - // All threads are busy - if (currentThreadCount < maxThreads) { - // Not all threads were open, - // Open new threads up to the max number of idel threads - int toOpen = currentThreadCount + minSpareThreads; - openThreads(toOpen); - } else { - logFull(currentThreadCount, maxThreads); - // Wait for a thread to become idel. - try { - this.wait(); - } - // was just catch Throwable -- but no other - // exceptions can be thrown by wait, right? - // So we catch and ignore this one, since - // it'll never actually happen, since nowhere - // do we say pool.interrupt(). - catch(InterruptedException e) { - log.error("Unexpected exception", e); - } - if( log.isDebugEnabled() ) { - log.debug("Finished waiting: CTC="+currentThreadCount + - ", CTB=" + currentThreadsBusy); - } - // Pool was stopped. Get away of the pool. - if( stopThePool) { - break; - } - } - } - // Pool was stopped. Get away of the pool. - if(0 == currentThreadCount || stopThePool) { - throw new IllegalStateException(); - } - - // If we are here it means that there is a free thread. Take it. - int pos = currentThreadCount - currentThreadsBusy - 1; - c = pool[pos]; - pool[pos] = null; - currentThreadsBusy++; - - } - return c; - } - - private static void logFull(int currentThreadCount, int maxThreads) { - if( logfull ) { - log.error(sm.getString("threadpool.busy", - new Integer(currentThreadCount), - new Integer(maxThreads))); - logfull=false; - } else if( log.isDebugEnabled() ) { - log.debug("All threads are busy " + currentThreadCount + " " + - maxThreads ); - } - } - - /** - * Stop the thread pool - */ - public synchronized void shutdown() { - if(!stopThePool) { - stopThePool = true; - if (monitor != null) { - monitor.terminate(); - monitor = null; - } - for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) { - try { - pool[i].terminate(); - } catch(Throwable t) { - /* - * Do nothing... The show must go on, we are shutting - * down the pool and nothing should stop that. - */ - log.error("Ignored exception while shutting down thread pool", t); - } - } - currentThreadsBusy = currentThreadCount = 0; - pool = null; - notifyAll(); - } - } - - /** - * Called by the monitor thread to harvest idle threads. - */ - protected synchronized void checkSpareControllers() { - - if(stopThePool) { - return; - } - - if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) { - int toFree = currentThreadCount - - currentThreadsBusy - - maxSpareThreads; - - for(int i = 0 ; i < toFree ; i++) { - ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1]; - c.terminate(); - pool[currentThreadCount - currentThreadsBusy - 1] = null; - currentThreadCount --; - } - - } - - } - - /** - * Returns the thread to the pool. - * Called by threads as they are becoming idel. - */ - protected synchronized void returnController(ControlRunnable c) { - - if(0 == currentThreadCount || stopThePool) { - c.terminate(); - return; - } - - // atomic - currentThreadsBusy--; - - pool[currentThreadCount - currentThreadsBusy - 1] = c; - notify(); - } - - /** - * Inform the pool that the specific thread finish. - * - * Called by the ControlRunnable.run() when the runnable - * throws an exception. - */ - protected synchronized void notifyThreadEnd(ControlRunnable c) { - currentThreadsBusy--; - currentThreadCount --; - notify(); - } - - - /* - * Checks for problematic configuration and fix it. - * The fix provides reasonable settings for a single CPU - * with medium load. - */ - protected void adjustLimits() { - if(maxThreads <= 0) { - maxThreads = MAX_THREADS; - } else if (maxThreads < MAX_THREADS_MIN) { - log.warn(sm.getString("threadpool.max_threads_too_low", - new Integer(maxThreads), - new Integer(MAX_THREADS_MIN))); - maxThreads = MAX_THREADS_MIN; - } - - if(maxSpareThreads >= maxThreads) { - maxSpareThreads = maxThreads; - } - - if(maxSpareThreads <= 0) { - if(1 == maxThreads) { - maxSpareThreads = 1; - } else { - maxSpareThreads = maxThreads/2; - } - } - - if(minSpareThreads > maxSpareThreads) { - minSpareThreads = maxSpareThreads; - } - - if(minSpareThreads <= 0) { - if(1 == maxSpareThreads) { - minSpareThreads = 1; - } else { - minSpareThreads = maxSpareThreads/2; - } - } - } - - /** Create missing threads. - * - * @param toOpen Total number of threads we'll have open - */ - protected void openThreads(int toOpen) { - - if(toOpen > maxThreads) { - toOpen = maxThreads; - } - - for(int i = currentThreadCount ; i < toOpen ; i++) { - pool[i - currentThreadsBusy] = new ControlRunnable(this); - } - - currentThreadCount = toOpen; - } - - /** - * Periodically execute an action - cleanup in this case - */ - public static class MonitorRunnable implements Runnable { - ThreadPool p; - Thread t; - int interval=WORK_WAIT_TIMEOUT; - boolean shouldTerminate; - - MonitorRunnable(ThreadPool p) { - this.p=p; - this.start(); - } - - public void start() { - shouldTerminate = false; - t = new Thread(this); - t.setDaemon(p.getDaemon() ); - t.setName(p.getName() + "-Monitor"); - t.start(); - } - - public void setInterval(int i ) { - this.interval=i; - } - - public void run() { - while(true) { - try { - - // Sleep for a while. - synchronized(this) { - this.wait(interval); - } - - // Check if should terminate. - // termination happens when the pool is shutting down. - if(shouldTerminate) { - break; - } - - // Harvest idle threads. - p.checkSpareControllers(); - - } catch(Throwable t) { - ThreadPool.log.error("Unexpected exception", t); - } - } - } - - public void stop() { - this.terminate(); - } - - /** Stop the monitor - */ - public synchronized void terminate() { - shouldTerminate = true; - this.notify(); - } - } - - /** - * A Thread object that executes various actions ( ThreadPoolRunnable ) - * under control of ThreadPool - */ - public static class ControlRunnable implements Runnable { - /** - * ThreadPool where this thread will be returned - */ - private ThreadPool p; - - /** - * The thread that executes the actions - */ - private ThreadWithAttributes t; - - /** - * The method that is executed in this thread - */ - - private ThreadPoolRunnable toRun; - private Runnable toRunRunnable; - - /** - * Stop this thread - */ - private boolean shouldTerminate; - - /** - * Activate the execution of the action - */ - private boolean shouldRun; - - /** - * Per thread data - can be used only if all actions are - * of the same type. - * A better mechanism is possible ( that would allow association of - * thread data with action type ), but right now it's enough. - */ - private boolean noThData; - - /** - * Start a new thread, with no method in it - */ - ControlRunnable(ThreadPool p) { - toRun = null; - shouldTerminate = false; - shouldRun = false; - this.p = p; - t = new ThreadWithAttributes(p, this); - t.setDaemon(true); - t.setName(p.getName() + "-Processor" + p.incSequence()); - t.setPriority(p.getThreadPriority()); - p.addThread( t, this ); - noThData=true; - t.start(); - } - - public void run() { - boolean _shouldRun = false; - boolean _shouldTerminate = false; - ThreadPoolRunnable _toRun = null; - try { - while (true) { - try { - /* Wait for work. */ - synchronized (this) { - while (!shouldRun && !shouldTerminate) { - this.wait(); - } - _shouldRun = shouldRun; - _shouldTerminate = shouldTerminate; - _toRun = toRun; - } - - if (_shouldTerminate) { - if (ThreadPool.log.isDebugEnabled()) - ThreadPool.log.debug("Terminate"); - break; - } - - /* Check if should execute a runnable. */ - try { - if (noThData) { - if (_toRun != null) { - Object thData[] = _toRun.getInitData(); - t.setThreadData(p, thData); - if (ThreadPool.log.isDebugEnabled()) - ThreadPool.log.debug( - "Getting new thread data"); - } - noThData = false; - } - - if (_shouldRun) { - if (_toRun != null) { - _toRun.runIt(t.getThreadData(p)); - } else if (toRunRunnable != null) { - toRunRunnable.run(); - } else { - if (ThreadPool.log.isDebugEnabled()) - ThreadPool.log.debug("No toRun ???"); - } - } - } catch (Throwable t) { - ThreadPool.log.error(sm.getString - ("threadpool.thread_error", t, toRun.toString())); - /* - * The runnable throw an exception (can be even a ThreadDeath), - * signalling that the thread die. - * - * The meaning is that we should release the thread from - * the pool. - */ - _shouldTerminate = true; - _shouldRun = false; - p.notifyThreadEnd(this); - } finally { - if (_shouldRun) { - shouldRun = false; - /* - * Notify the pool that the thread is now idle. - */ - p.returnController(this); - } - } - - /* - * Check if should terminate. - * termination happens when the pool is shutting down. - */ - if (_shouldTerminate) { - break; - } - } catch (InterruptedException ie) { /* for the wait operation */ - // can never happen, since we don't call interrupt - ThreadPool.log.error("Unexpected exception", ie); - } - } - } finally { - p.removeThread(Thread.currentThread()); - } - } - /** Run a task - * - * @param toRun - */ - public synchronized void runIt(Runnable toRun) { - this.toRunRunnable = toRun; - // Do not re-init, the whole idea is to run init only once per - // thread - the pool is supposed to run a single task, that is - // initialized once. - // noThData = true; - shouldRun = true; - this.notify(); - } - - /** Run a task - * - * @param toRun - */ - public synchronized void runIt(ThreadPoolRunnable toRun) { - this.toRun = toRun; - // Do not re-init, the whole idea is to run init only once per - // thread - the pool is supposed to run a single task, that is - // initialized once. - // noThData = true; - shouldRun = true; - this.notify(); - } - - public void stop() { - this.terminate(); - } - - public void kill() { - t.stop(); - } - - public synchronized void terminate() { - shouldTerminate = true; - this.notify(); - } - } - - /** - * Debug display of the stage of each thread. The return is html style, - * for display in the console ( it can be easily parsed too ). - * - * @return The thread status display - */ - public String threadStatusString() { - StringBuffer sb=new StringBuffer(); - Iterator it=threads.keySet().iterator(); - sb.append("

    "); - while( it.hasNext()) { - sb.append("
  • "); - ThreadWithAttributes twa=(ThreadWithAttributes) - it.next(); - sb.append(twa.getCurrentStage(this) ).append(" "); - sb.append( twa.getParam(this)); - sb.append( "
  • \n"); - } - sb.append("
"); - return sb.toString(); - } - - /** Return an array with the status of each thread. The status - * indicates the current request processing stage ( for tomcat ) or - * whatever the thread is doing ( if the application using TP provide - * this info ) - * - * @return The status of all threads - */ - public String[] getThreadStatus() { - String status[]=new String[ threads.size()]; - Iterator it=threads.keySet().iterator(); - for( int i=0; ( i it=threads.keySet().iterator(); - for( int i=0; ( i object ), - // expensive. - - /** Called when this object is first loaded in the thread pool. - * Important: all workers in a pool must be of the same type, - * otherwise the mechanism becomes more complex. - */ - public Object[] getInitData(); - - /** This method will be executed in one of the pool's threads. The - * thread will be returned to the pool. - */ - public void runIt(Object thData[]); - -} diff --git a/java/org/apache/tomcat/util/threads/ThreadWithAttributes.java b/java/org/apache/tomcat/util/threads/ThreadWithAttributes.java deleted file mode 100644 index 04781316d..000000000 --- a/java/org/apache/tomcat/util/threads/ThreadWithAttributes.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tomcat.util.threads; - -import java.util.Hashtable; - -/** Special thread that allows storing of attributes and notes. - * A guard is used to prevent untrusted code from accessing the - * attributes. - * - * This avoids hash lookups and provide something very similar - * with ThreadLocal ( but compatible with JDK1.1 and faster on - * JDK < 1.4 ). - * - * The main use is to store 'state' for monitoring ( like "processing - * request 'GET /' "). - */ -public class ThreadWithAttributes extends Thread { - - private Object control; - public static int MAX_NOTES=16; - private Object notes[]=new Object[MAX_NOTES]; - private Hashtable attributes=new Hashtable(); - private String currentStage; - private Object param; - - private Object thData[]; - - public ThreadWithAttributes(Object control, Runnable r) { - super(r); - this.control=control; - } - - public final Object[] getThreadData(Object control ) { - if( this.control != control ) return null; - return thData; - } - - public final void setThreadData(Object control, Object thData[] ) { - if( this.control != control ) return; - this.thData=thData; - } - - /** Notes - for attributes that need fast access ( array ) - * The application is responsible for id management - */ - public final void setNote( Object control, int id, Object value ) { - if( this.control != control ) return; - notes[id]=value; - } - - /** Information about the curent performed operation - */ - public final String getCurrentStage(Object control) { - if( this.control != control ) return null; - return currentStage; - } - - /** Information about the current request ( or the main object - * we are processing ) - */ - public final Object getParam(Object control) { - if( this.control != control ) return null; - return param; - } - - public final void setCurrentStage(Object control, String currentStage) { - if( this.control != control ) return; - this.currentStage = currentStage; - } - - public final void setParam( Object control, Object param ) { - if( this.control != control ) return; - this.param=param; - } - - public final Object getNote(Object control, int id ) { - if( this.control != control ) return null; - return notes[id]; - } - - /** Generic attributes. You'll need a hashtable lookup - - * you can use notes for array access. - */ - public final Hashtable getAttributes(Object control) { - if( this.control != control ) return null; - return attributes; - } -} -- 2.11.0