--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.tomcat.util.res.StringManager;
+/**
+ *
+ * @author fhanik
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ */
+public abstract class AbstractEndpoint {
+
+ // -------------------------------------------------------------- Constants
+ protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res");
+
+ /**
+ * The Request attribute key for the cipher suite.
+ */
+ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
+
+ /**
+ * The Request attribute key for the key size.
+ */
+ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+ /**
+ * The Request attribute key for the client certificate chain.
+ */
+ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
+
+ /**
+ * The Request attribute key for the session id.
+ * This one is a Tomcat extension to the Servlet spec.
+ */
+ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
+
+ /**
+ * The request attribute key for the session manager.
+ * This one is a Tomcat extension to the Servlet spec.
+ */
+ public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr";
+
+}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
/**
* APR tailored thread pool, providing the following services:
* @author Mladen Turk
* @author Remy Maucherat
*/
-public class AprEndpoint {
+public class AprEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
protected static Log log = LogFactory.getLog(AprEndpoint.class);
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
/**
*/
public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
- /**
- * The request attribute key for the session manager.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_MGR =
- "javax.servlet.request.ssl_session_mgr";
-
// ----------------------------------------------------------------- Fields
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
protected long sslContext = 0;
+ /**
+ * Are we using an internal executor
+ */
+ protected volatile boolean internalExecutor = false;
// ------------------------------------------------------------- Properties
protected int maxThreads = 200;
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
- if (running) {
- synchronized(workers) {
- workers.resize(maxThreads);
- }
+ if (running && executor instanceof ResizableExecutor) {
+ ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), getMaxThreads());
}
}
public int getMaxThreads() { return maxThreads; }
*/
public int getCurrentThreadCount() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
+ } else {
+ return -1;
+ }
} else {
- return curThreads;
+ return -2;
}
}
*/
public int getCurrentThreadsBusy() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
+ } else {
+ return -1;
+ }
} else {
- return workers!=null?curThreads - workers.size():0;
+ return -2;
}
}
// Create worker collection
if (executor == null) {
- workers = new WorkerStack(maxThreads);
+ internalExecutor = true;
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
+ executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// Start poller threads
sendfiles = null;
}
}
+ if ( executor!=null && internalExecutor ) {
+ if ( executor instanceof ThreadPoolExecutor ) {
+ //this is our internal one, so we need to shut it down
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+ tpe.shutdownNow();
+ TaskQueue queue = (TaskQueue) tpe.getQueue();
+ queue.setParent(null);
+ }
+ executor = null;
+ }
}
}
- /**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return (workers.pop());
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- if (curThreadsBusy == maxThreads) {
- log.info(sm.getString("endpoint.info.maxThreads",
- Integer.toString(maxThreads), address,
- Integer.toString(port)));
- }
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
/**
* Allocate a new poller of the specified size.
*/
protected boolean processSocketWithOptions(long socket) {
try {
- if (executor == null) {
- getWorkerThread().assignWithOptions(socket);
- } else {
- executor.execute(new SocketWithOptionsProcessor(socket));
- }
+ executor.execute(new SocketWithOptionsProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
*/
protected boolean processSocket(long socket) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
+ executor.execute(new SocketProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
*/
protected boolean processSocket(long socket, SocketStatus status) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket, status);
- } else {
- executor.execute(new SocketEventProcessor(socket, status));
- }
+ executor.execute(new SocketEventProcessor(socket, status));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
// ----------------------------------------------------- Worker Inner Class
- /**
- * Server processor class.
- */
- protected class Worker implements Runnable {
-
-
- protected Thread thread = null;
- protected boolean available = false;
- protected long socket = 0;
- protected SocketStatus status = null;
- protected boolean options = false;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assignWithOptions(long socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- status = null;
- options = true;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assign(long socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- status = null;
- options = false;
- available = true;
- notifyAll();
-
- }
-
-
- protected synchronized void assign(long socket, SocketStatus status) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- this.status = status;
- options = false;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or <code>null</code>
- * if we are supposed to shut down.
- */
- protected synchronized long await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- long socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- long socket = await();
- if (socket == 0)
- continue;
-
- if (!deferAccept && options) {
- if (setSocketOptions(socket)) {
- getPoller().add(socket);
- } else {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- }
- } else {
-
- // Process the request from this socket
- if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- } else if ((status == null) && ((options && !setSocketOptions(socket))
- || handler.process(socket) == Handler.SocketState.CLOSED)) {
- // Close socket and pool
- Socket.destroy(socket);
- socket = 0;
- }
- }
-
- // Finish up this request
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
// ----------------------------------------------- SendfileData Inner Class
}
- // ------------------------------------------------- WorkerStack Inner Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue. If the queue is full (for example if
- * the queue has been reduced in size) the object will be dropped.
- *
- * @param object the object to be appended to the queue (first
- * element).
- */
- public void push(Worker worker) {
- if (end < workers.length) {
- workers[end++] = worker;
- } else {
- curThreads--;
- }
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
-
- /**
- * Resize the queue. If there are too many objects in the queue for the
- * new size, drop the excess.
- *
- * @param newSize
- */
- public void resize(int newSize) {
- Worker[] newWorkers = new Worker[newSize];
- int len = workers.length;
- if (newSize < len) {
- len = newSize;
- }
- System.arraycopy(workers, 0, newWorkers, 0, len);
- workers = newWorkers;
- }
- }
-
// ---------------------------------------------- SocketProcessor Inner Class
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tomcat.util.net;
-
-import java.net.InetAddress;
-import java.util.concurrent.Executor;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.res.StringManager;
-
-/**
- * APR tailored thread pool, providing the following services:
- * <ul>
- * <li>Socket acceptor thread</li>
- * <li>Socket poller thread</li>
- * <li>Sendfile thread</li>
- * <li>Worker threads pool</li>
- * </ul>
- *
- * When switching to Java 5, there's an opportunity to use the virtual
- * machine's thread pool.
- *
- * @author Mladen Turk
- * @author Remy Maucherat
- */
-public abstract class BaseEndpoint {
-
-
- // -------------------------------------------------------------- Constants
-
-
- protected static Log log = LogFactory.getLog(BaseEndpoint.class);
-
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
- // ----------------------------------------------------------------- Fields
-
-
- /**
- * Running state of the endpoint.
- */
- protected volatile boolean running = false;
-
-
- /**
- * Will be set to true whenever the endpoint is paused.
- */
- protected volatile boolean paused = false;
-
-
- /**
- * Track the initialization state of the endpoint.
- */
- protected boolean initialized = false;
-
-
- /**
- * Current worker threads busy count.
- */
- protected int curThreadsBusy = 0;
-
-
- /**
- * Current worker threads count.
- */
- protected int curThreads = 0;
-
-
- /**
- * Sequence number used to generate thread names.
- */
- protected int sequence = 0;
-
-
- // ------------------------------------------------------------- Properties
-
-
- /**
- * External Executor based thread pool.
- */
- protected Executor executor = null;
- public void setExecutor(Executor executor) { this.executor = executor; }
- public Executor getExecutor() { return executor; }
-
-
- /**
- * Maximum amount of worker threads.
- */
- protected int maxThreads = 200;
- public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
- public int getMaxThreads() { return maxThreads; }
-
-
- /**
- * Priority of the acceptor and poller threads.
- */
- protected int threadPriority = Thread.NORM_PRIORITY;
- public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
- public int getThreadPriority() { return threadPriority; }
-
-
- /**
- * Server socket port.
- */
- protected int port;
- public int getPort() { return port; }
- public void setPort(int port ) { this.port=port; }
-
-
- /**
- * Address for the server socket.
- */
- protected InetAddress address;
- public InetAddress getAddress() { return address; }
- public void setAddress(InetAddress address) { this.address = address; }
-
-
- /**
- * Allows the server developer to specify the backlog that
- * should be used for server sockets. By default, this value
- * is 100.
- */
- protected int backlog = 100;
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
- public int getBacklog() { return backlog; }
-
-
- /**
- * Socket TCP no delay.
- */
- protected boolean tcpNoDelay = false;
- public boolean getTcpNoDelay() { return tcpNoDelay; }
- public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
-
-
- /**
- * Socket linger.
- */
- protected int soLinger = 100;
- public int getSoLinger() { return soLinger; }
- public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
-
-
- /**
- * Socket timeout.
- */
- protected int soTimeout = -1;
- public int getSoTimeout() { return soTimeout; }
- public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
-
-
- /**
- * The default is true - the created threads will be
- * in daemon mode. If set to false, the control thread
- * will not be daemon - and will keep the process alive.
- */
- protected boolean daemon = true;
- public void setDaemon(boolean b) { daemon = b; }
- public boolean getDaemon() { return daemon; }
-
-
- /**
- * Name of the thread pool, which will be used for naming child threads.
- */
- protected String name = "TP";
- public void setName(String name) { this.name = name; }
- public String getName() { return name; }
-
-
- /**
- * Dummy maxSpareThreads property.
- */
- public int getMaxSpareThreads() { return 0; }
-
-
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return 0; }
-
-
- // --------------------------------------------------------- Public Methods
-
-
- /**
- * Return the amount of threads that are managed by the pool.
- *
- * @return the amount of threads that are managed by the pool
- */
- public int getCurrentThreadCount() {
- return curThreads;
- }
-
-
- /**
- * Return the amount of threads currently busy.
- *
- * @return the amount of threads currently busy
- */
- public int getCurrentThreadsBusy() {
- return curThreadsBusy;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is running, false otherwise
- */
- public boolean isRunning() {
- return running;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is paused, false otherwise
- */
- public boolean isPaused() {
- return paused;
- }
-
-
- // ----------------------------------------------- Public Lifecycle Methods
-
-
- /**
- * Initialize the endpoint.
- */
- public abstract void init()
- throws Exception;
-
-
- /**
- * Start the APR endpoint, creating acceptor, poller and sendfile threads.
- */
- public abstract void start()
- throws Exception;
-
-
- /**
- * Pause the endpoint, which will make it stop accepting new sockets.
- */
- public void pause() {
- if (running && !paused) {
- paused = true;
- unlockAccept();
- }
- }
-
-
- /**
- * Resume the endpoint, which will make it start accepting new sockets
- * again.
- */
- public void resume() {
- if (running) {
- paused = false;
- }
- }
-
-
- /**
- * Stop the endpoint. This will cause all processing threads to stop.
- */
- public abstract void stop();
-
-
- /**
- * Deallocate APR memory pools, and close server socket.
- */
- public abstract void destroy() throws Exception;
-
-
- // ------------------------------------------------------ Protected Methods
-
-
- /**
- * Get a sequence number used for thread naming.
- */
- protected int getSequence() {
- return sequence++;
- }
-
-
- /**
- * Unlock the server socket accept using a bugus connection.
- */
- protected void unlockAccept() {
- java.net.Socket s = null;
- try {
- // Need to create a connection to unlock the accept();
- if (address == null) {
- s = new java.net.Socket("127.0.0.1", port);
- } else {
- s = new java.net.Socket(address, port);
- // setting soLinger to a small value will help shutdown the
- // connection quicker
- s.setSoLinger(true, 0);
- }
- } catch(Exception e) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
- }
- } finally {
- if (s != null) {
- try {
- s.close();
- } catch (Exception e) {
- // Ignore
- }
- }
- }
- }
-
-
-}
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
/**
* Handle incoming TCP connections.
* @author Yoav Shapira
* @author Remy Maucherat
*/
-public class JIoEndpoint {
+public class JIoEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
-
protected static Log log = LogFactory.getLog(JIoEndpoint.class);
- protected StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
// ----------------------------------------------------------------- Fields
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
*/
protected SocketProperties socketProperties = new SocketProperties();
+ /**
+ * Are we using an internal executor
+ */
+ protected volatile boolean internalExecutor = false;
// ------------------------------------------------------------- Properties
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
if (running) {
- synchronized(workers) {
- workers.resize(maxThreads);
- }
+ //TODO Dynamic resize
+ log.error("Resizing executor dynamically is not possible at this time.");
}
}
public int getMaxThreads() { return maxThreads; }
+ public int minSpareThreads = 10;
+ public int getMinSpareThreads() {
+ return Math.min(minSpareThreads,getMaxThreads());
+ }
+ public void setMinSpareThreads(int minSpareThreads) {
+ this.minSpareThreads = minSpareThreads;
+ }
/**
* Priority of the acceptor and poller threads.
*/
public int getCurrentThreadCount() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
+ } else {
+ return -1;
+ }
} else {
- return curThreads;
+ return -2;
}
}
*/
public int getCurrentThreadsBusy() {
if (executor!=null) {
- return -1;
+ if (executor instanceof ThreadPoolExecutor) {
+ return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
+ } else {
+ return -1;
+ }
} else {
- return workers!=null?curThreads - workers.size():0;
+ return -2;
}
}
}
- // ----------------------------------------------------- Worker Inner Class
-
-
- protected class Worker implements Runnable {
-
- protected Thread thread = null;
- protected boolean available = false;
- protected Socket socket = null;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- synchronized void assign(Socket socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or <code>null</code>
- * if we are supposed to shut down.
- */
- private synchronized Socket await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- Socket socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- Socket socket = await();
- if (socket == null)
- continue;
-
- // Process the request from this socket
- if (!setSocketOptions(socket) || !handler.process(socket)) {
- // Close socket
- try {
- socket.close();
- } catch (IOException e) {
- }
- }
-
- // Finish up this request
- socket = null;
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
-
// -------------------- Public methods --------------------
// Create worker collection
if (executor == null) {
- workers = new WorkerStack(maxThreads);
+ internalExecutor = true;
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
+ executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// Start acceptor threads
running = false;
unlockAccept();
}
+ if ( executor!=null && internalExecutor ) {
+ if ( executor instanceof ThreadPoolExecutor ) {
+ //this is our internal one, so we need to shut it down
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+ tpe.shutdownNow();
+ TaskQueue queue = (TaskQueue) tpe.getQueue();
+ queue.setParent(null);
+ }
+ executor = null;
+ }
}
/**
/**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return workers.pop();
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- if (curThreadsBusy == maxThreads) {
- log.info(sm.getString("endpoint.info.maxThreads",
- Integer.toString(maxThreads), address,
- Integer.toString(port)));
- }
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
-
- /**
* Process given socket.
*/
protected boolean processSocket(Socket socket) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
+ executor.execute(new SocketProcessor(socket));
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
} catch (Throwable t) {
+
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
}
- // ------------------------------------------------- WorkerStack Inner Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue. If the queue is full (for example if
- * the queue has been reduced in size) the object will be dropped.
- *
- * @param object the object to be appended to the queue (first
- * element).
- */
- public void push(Worker worker) {
- if (end < workers.length) {
- workers[end++] = worker;
- } else {
- curThreads--;
- }
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
-
- /**
- * Resize the queue. If there are too many objects in the queue for the
- * new size, drop the excess.
- *
- * @param newSize
- */
- public void resize(int newSize) {
- Worker[] newWorkers = new Worker[newSize];
- int len = workers.length;
- if (newSize < len) {
- len = newSize;
- }
- System.arraycopy(workers, 0, newWorkers, 0, len);
- workers = newWorkers;
- }
- }
-
}
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
* @author Remy Maucherat
* @author Filip Hanik
*/
-public class NioEndpoint {
+public class NioEndpoint extends AbstractEndpoint {
// -------------------------------------------------------------- Constants
protected static Log log = LogFactory.getLog(NioEndpoint.class);
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
public static final int OP_REGISTER = 0x100; //register interest op
public static final int OP_CALLBACK = 0x200; //callback interest op
/**
* Are we using an internal executor
*/
- protected boolean internalExecutor = true;
+ protected volatile boolean internalExecutor = false;
protected boolean useExecutor = true;
/**
/**
* Dummy maxSpareThreads property.
*/
- public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
+ public int getMaxSpareThreads() { return Math.min(getMaxThreads(),getMinSpareThreads()); }
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
+ public int minSpareThreads = 10;
+ public int getMinSpareThreads() {
+ return Math.min(minSpareThreads,getMaxThreads());
+ }
+ public void setMinSpareThreads(int minSpareThreads) {
+ this.minSpareThreads = minSpareThreads;
+ }
/**
* Generic properties, introspected
if (executor!=null) {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)executor).getPoolSize();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getPoolSize();
} else {
return -1;
}
if (executor!=null) {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor)executor).getActiveCount();
+ } else if (executor instanceof ResizableExecutor) {
+ return ((ResizableExecutor)executor).getActiveCount();
} else {
return -1;
}
if ( dispatch && executor!=null ) executor.execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
- if (log.isDebugEnabled()) {
- log.debug("Unable to process socket, executor rejected the task.",rx);
- }
+ log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.Executor;
+
+public interface ResizableExecutor extends Executor {
+ /**
+ * {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+ * @return {@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+ */
+ public int getPoolSize();
+
+ /**
+ * {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+ * @return {@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+ */
+ public int getActiveCount();
+
+ public boolean resizePool(int corePoolSize, int maximumPoolSize);
+
+ public boolean resizeQueue(int capacity);
+
+}