/*\r
- * Copyright 2005 The Apache Software Foundation\r
+ * Copyright 2005-2006 The Apache Software Foundation\r
*\r
* Licensed under the Apache License, Version 2.0 (the "License");\r
* you may not use this file except in compliance with the License.\r
import java.net.InetAddress;\r
import java.util.ArrayList;\r
import java.util.HashMap;\r
+import java.util.concurrent.Executor;\r
\r
import org.apache.commons.logging.Log;\r
import org.apache.commons.logging.LogFactory;\r
-import org.apache.tomcat.jni.OS;\r
import org.apache.tomcat.jni.Address;\r
import org.apache.tomcat.jni.Error;\r
import org.apache.tomcat.jni.File;\r
import org.apache.tomcat.jni.Library;\r
+import org.apache.tomcat.jni.OS;\r
import org.apache.tomcat.jni.Poll;\r
import org.apache.tomcat.jni.Pool;\r
-import org.apache.tomcat.jni.Socket;\r
-import org.apache.tomcat.jni.Status;\r
import org.apache.tomcat.jni.SSL;\r
import org.apache.tomcat.jni.SSLContext;\r
import org.apache.tomcat.jni.SSLSocket;\r
+import org.apache.tomcat.jni.Socket;\r
+import org.apache.tomcat.jni.Status;\r
import org.apache.tomcat.util.res.StringManager;\r
import org.apache.tomcat.util.threads.ThreadWithAttributes;\r
\r
\r
\r
/**\r
+ * External Executor based thread pool.\r
+ */\r
+ protected Executor executor = null;\r
+ public void setExecutor(Executor executor) { this.executor = executor; }\r
+ public Executor getExecutor() { return executor; }\r
+\r
+\r
+ /**\r
* Maximum amount of worker threads.\r
*/\r
protected int maxThreads = 40;\r
paused = false;\r
\r
// Create worker collection\r
- workers = new WorkerStack(maxThreads);\r
+ if (executor == null) {\r
+ workers = new WorkerStack(maxThreads);\r
+ }\r
\r
// Start acceptor thread\r
for (int i = 0; i < acceptorThreadCount; i++) {\r
}\r
}\r
}\r
+\r
\r
+ /**\r
+ * Process given socket.\r
+ */\r
+ protected boolean processSocket(long socket) {\r
+ try {\r
+ if (executor == null) {\r
+ getWorkerThread().assign(socket);\r
+ } else {\r
+ executor.execute(new SocketProcessor(socket));\r
+ }\r
+ } catch (Throwable t) {\r
+ // This means we got an OOM or similar creating a thread, or that\r
+ // the pool and its queue are full\r
+ log.error(sm.getString("endpoint.process.fail"), t);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
\r
\r
// --------------------------------------------------- Acceptor Inner Class\r
}\r
\r
try {\r
- // Allocate a new worker thread\r
- Worker workerThread = getWorkerThread();\r
// Accept the next incoming connection from the server socket\r
long socket = Socket.accept(serverSock);\r
// Hand this socket off to an appropriate processor\r
- if (setSocketOptions(socket)) {\r
- workerThread.assign(socket);\r
- } else {\r
+ if (!setSocketOptions(socket) || !processSocket(socket)) {\r
// Close socket and pool right away\r
Socket.destroy(socket);\r
}\r
if (rv > 0) {\r
keepAliveCount -= rv;\r
for (int n = 0; n < rv; n++) {\r
- // Check for failed sockets\r
+ // Check for failed sockets and hand this socket off to a worker\r
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
- || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {\r
+ || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)\r
+ || (!processSocket(desc[n*2+1]))) {\r
// Close socket and clear pool\r
Socket.destroy(desc[n*2+1]);\r
continue;\r
}\r
- // Hand this socket off to a worker\r
- getWorkerThread().assign(desc[n*2+1]);\r
}\r
} else if (rv < 0) {\r
int errn = -rv;\r
Socket.timeoutSet(state.socket, soTimeout * 1000);\r
// If all done hand this socket off to a worker for\r
// processing of further requests\r
- getWorkerThread().assign(state.socket);\r
+ if (!processSocket(state.socket)) {\r
+ Socket.destroy(state.socket);\r
+ }\r
} else {\r
// Close the socket since this is\r
// the end of not keep-alive request.\r
}\r
}\r
\r
+\r
+ // ---------------------------------------------- SocketProcessor Inner Class\r
+\r
+\r
+ /**\r
+ * This class is the equivalent of the Worker, but will simply use in an\r
+ * external Executor thread pool.\r
+ */\r
+ protected class SocketProcessor implements Runnable {\r
+ \r
+ protected long socket = 0;\r
+ \r
+ public SocketProcessor(long socket) {\r
+ this.socket = socket;\r
+ }\r
+\r
+ public void run() {\r
+\r
+ // Process the request from this socket\r
+ if (!handler.process(socket)) {\r
+ // Close socket and pool\r
+ Socket.destroy(socket);\r
+ socket = 0;\r
+ }\r
+\r
+ }\r
+ \r
+ }\r
+ \r
+ \r
}\r
/*
- * Copyright 1999-2004 The Apache Software Foundation
+ * Copyright 1999-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// -------------------------------------------------------------- Constants
- protected static Log log=LogFactory.getLog(JIoEndpoint.class );
+ protected static Log log = LogFactory.getLog(JIoEndpoint.class);
protected StringManager sm =
StringManager.getManager("org.apache.tomcat.util.net.res");
/**
+ * External Executor based thread pool.
+ */
+ protected Executor executor = null;
+ public void setExecutor(Executor executor) { this.executor = executor; }
+ public Executor getExecutor() { return executor; }
+
+
+ /**
* Maximum amount of worker threads.
*/
protected int maxThreads = 40;
}
}
- // Allocate a new worker thread
- Worker workerThread = getWorkerThread();
-
// Accept the next incoming connection from the server socket
try {
Socket socket = serverSocketFactory.acceptSocket(serverSocket);
serverSocketFactory.initSocket(socket);
// Hand this socket off to an appropriate processor
- if (setSocketOptions(socket)) {
- workerThread.assign(socket);
- } else {
+ if (!setSocketOptions(socket) || !processSocket(socket)) {
// Close socket right away
try {
socket.close();
}
+ // ------------------------------------------- SocketProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool.
+ */
+ protected class SocketProcessor implements Runnable {
+
+ protected Socket socket = null;
+
+ public SocketProcessor(Socket socket) {
+ this.socket = socket;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (!handler.process(socket)) {
+ // Close socket
+ try {
+ socket.close();
+ } catch (IOException e) {
+ }
+ }
+
+ // Finish up this request
+ socket = null;
+
+ }
+
+ }
+
+
// ----------------------------------------------------- Worker Inner Class
running = true;
paused = false;
+ // Create worker collection
+ if (executor == null) {
+ workers = new WorkerStack(maxThreads);
+ }
+
// Start acceptor thread
acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor");
acceptorThread.setPriority(threadPriority);
}
if (serverSocket != null) {
try {
- if (serverSocket!=null)
+ if (serverSocket != null)
serverSocket.close();
} catch (Exception e) {
log.error(sm.getString("endpoint.err.close"), e);
}
+ /**
+ * Process given socket.
+ */
+ protected boolean processSocket(Socket socket) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket);
+ } else {
+ executor.execute(new SocketProcessor(socket));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
// ------------------------------------------------- WorkerStack Inner Class