- Add support for using an Executor (the idea in that case is to define one executor...
authorremm <remm@13f79535-47bb-0310-9956-ffa450edef68>
Sun, 23 Apr 2006 00:18:56 +0000 (00:18 +0000)
committerremm <remm@13f79535-47bb-0310-9956-ffa450edef68>
Sun, 23 Apr 2006 00:18:56 +0000 (00:18 +0000)
  server, with an appropriate queue, etc). By default, I think it is good to continue using the
  dumb stack, though.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@396185 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/tomcat/util/net/AprEndpoint.java
java/org/apache/tomcat/util/net/JIoEndpoint.java
java/org/apache/tomcat/util/net/res/LocalStrings.properties

index 24db674..ed26b78 100644 (file)
@@ -1,5 +1,5 @@
 /*\r
- *  Copyright 2005 The Apache Software Foundation\r
+ *  Copyright 2005-2006 The Apache Software Foundation\r
  *\r
  *  Licensed under the Apache License, Version 2.0 (the "License");\r
  *  you may not use this file except in compliance with the License.\r
@@ -19,21 +19,22 @@ package org.apache.tomcat.util.net;
 import java.net.InetAddress;\r
 import java.util.ArrayList;\r
 import java.util.HashMap;\r
+import java.util.concurrent.Executor;\r
 \r
 import org.apache.commons.logging.Log;\r
 import org.apache.commons.logging.LogFactory;\r
-import org.apache.tomcat.jni.OS;\r
 import org.apache.tomcat.jni.Address;\r
 import org.apache.tomcat.jni.Error;\r
 import org.apache.tomcat.jni.File;\r
 import org.apache.tomcat.jni.Library;\r
+import org.apache.tomcat.jni.OS;\r
 import org.apache.tomcat.jni.Poll;\r
 import org.apache.tomcat.jni.Pool;\r
-import org.apache.tomcat.jni.Socket;\r
-import org.apache.tomcat.jni.Status;\r
 import org.apache.tomcat.jni.SSL;\r
 import org.apache.tomcat.jni.SSLContext;\r
 import org.apache.tomcat.jni.SSLSocket;\r
+import org.apache.tomcat.jni.Socket;\r
+import org.apache.tomcat.jni.Status;\r
 import org.apache.tomcat.util.res.StringManager;\r
 import org.apache.tomcat.util.threads.ThreadWithAttributes;\r
 \r
@@ -159,6 +160,14 @@ public class AprEndpoint {
 \r
 \r
     /**\r
+     * External Executor based thread pool.\r
+     */\r
+    protected Executor executor = null;\r
+    public void setExecutor(Executor executor) { this.executor = executor; }\r
+    public Executor getExecutor() { return executor; }\r
+\r
+\r
+    /**\r
      * Maximum amount of worker threads.\r
      */\r
     protected int maxThreads = 40;\r
@@ -685,7 +694,9 @@ public class AprEndpoint {
             paused = false;\r
 \r
             // Create worker collection\r
-            workers = new WorkerStack(maxThreads);\r
+            if (executor == null) {\r
+                workers = new WorkerStack(maxThreads);\r
+            }\r
 \r
             // Start acceptor thread\r
             for (int i = 0; i < acceptorThreadCount; i++) {\r
@@ -962,7 +973,26 @@ public class AprEndpoint {
             }\r
         }\r
     }\r
+\r
     \r
+    /**\r
+     * Process given socket.\r
+     */\r
+    protected boolean processSocket(long socket) {\r
+        try {\r
+            if (executor == null) {\r
+                getWorkerThread().assign(socket);\r
+            } else {\r
+                executor.execute(new SocketProcessor(socket));\r
+            }\r
+        } catch (Throwable t) {\r
+            // This means we got an OOM or similar creating a thread, or that\r
+            // the pool and its queue are full\r
+            log.error(sm.getString("endpoint.process.fail"), t);\r
+            return false;\r
+        }\r
+        return true;\r
+    }\r
     \r
 \r
     // --------------------------------------------------- Acceptor Inner Class\r
@@ -993,14 +1023,10 @@ public class AprEndpoint {
                 }\r
 \r
                 try {\r
-                    // Allocate a new worker thread\r
-                    Worker workerThread = getWorkerThread();\r
                     // Accept the next incoming connection from the server socket\r
                     long socket = Socket.accept(serverSock);\r
                     // Hand this socket off to an appropriate processor\r
-                    if (setSocketOptions(socket)) {\r
-                        workerThread.assign(socket);\r
-                    } else {\r
+                    if (!setSocketOptions(socket) || !processSocket(socket)) {\r
                         // Close socket and pool right away\r
                         Socket.destroy(socket);\r
                     }\r
@@ -1154,15 +1180,14 @@ public class AprEndpoint {
                     if (rv > 0) {\r
                         keepAliveCount -= rv;\r
                         for (int n = 0; n < rv; n++) {\r
-                            // Check for failed sockets\r
+                            // Check for failed sockets and hand this socket off to a worker\r
                             if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
-                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {\r
+                                    || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)\r
+                                    || (!processSocket(desc[n*2+1]))) {\r
                                 // Close socket and clear pool\r
                                 Socket.destroy(desc[n*2+1]);\r
                                 continue;\r
                             }\r
-                            // Hand this socket off to a worker\r
-                            getWorkerThread().assign(desc[n*2+1]);\r
                         }\r
                     } else if (rv < 0) {\r
                         int errn = -rv;\r
@@ -1548,7 +1573,9 @@ public class AprEndpoint {
                                     Socket.timeoutSet(state.socket, soTimeout * 1000);\r
                                     // If all done hand this socket off to a worker for\r
                                     // processing of further requests\r
-                                    getWorkerThread().assign(state.socket);\r
+                                    if (!processSocket(state.socket)) {\r
+                                        Socket.destroy(state.socket);\r
+                                    }\r
                                 } else {\r
                                     // Close the socket since this is\r
                                     // the end of not keep-alive request.\r
@@ -1651,4 +1678,34 @@ public class AprEndpoint {
         }\r
     }\r
 \r
+\r
+    // ---------------------------------------------- SocketProcessor Inner Class\r
+\r
+\r
+    /**\r
+     * This class is the equivalent of the Worker, but will simply use in an\r
+     * external Executor thread pool.\r
+     */\r
+    protected class SocketProcessor implements Runnable {\r
+        \r
+        protected long socket = 0;\r
+        \r
+        public SocketProcessor(long socket) {\r
+            this.socket = socket;\r
+        }\r
+\r
+        public void run() {\r
+\r
+            // Process the request from this socket\r
+            if (!handler.process(socket)) {\r
+                // Close socket and pool\r
+                Socket.destroy(socket);\r
+                socket = 0;\r
+            }\r
+\r
+        }\r
+        \r
+    }\r
+    \r
+    \r
 }\r
index 908743b..f783d5d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  Copyright 1999-2004 The Apache Software Foundation
+ *  Copyright 1999-2006 The Apache Software Foundation
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.net.BindException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.Executor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +50,7 @@ public class JIoEndpoint {
     // -------------------------------------------------------------- Constants
 
 
-    protected static Log log=LogFactory.getLog(JIoEndpoint.class );
+    protected static Log log = LogFactory.getLog(JIoEndpoint.class);
 
     protected StringManager sm = 
         StringManager.getManager("org.apache.tomcat.util.net.res");
@@ -116,6 +117,14 @@ public class JIoEndpoint {
 
 
     /**
+     * External Executor based thread pool.
+     */
+    protected Executor executor = null;
+    public void setExecutor(Executor executor) { this.executor = executor; }
+    public Executor getExecutor() { return executor; }
+
+
+    /**
      * Maximum amount of worker threads.
      */
     protected int maxThreads = 40;
@@ -272,17 +281,12 @@ public class JIoEndpoint {
                     }
                 }
 
-                // Allocate a new worker thread
-                Worker workerThread = getWorkerThread();
-
                 // Accept the next incoming connection from the server socket
                 try {
                     Socket socket = serverSocketFactory.acceptSocket(serverSocket);
                     serverSocketFactory.initSocket(socket);
                     // Hand this socket off to an appropriate processor
-                    if (setSocketOptions(socket)) {
-                        workerThread.assign(socket);
-                    } else {
+                    if (!setSocketOptions(socket) || !processSocket(socket)) {
                         // Close socket right away
                         try {
                             socket.close();
@@ -302,6 +306,40 @@ public class JIoEndpoint {
     }
 
 
+    // ------------------------------------------- SocketProcessor Inner Class
+
+
+    /**
+     * This class is the equivalent of the Worker, but will simply use in an
+     * external Executor thread pool.
+     */
+    protected class SocketProcessor implements Runnable {
+        
+        protected Socket socket = null;
+        
+        public SocketProcessor(Socket socket) {
+            this.socket = socket;
+        }
+
+        public void run() {
+
+            // Process the request from this socket
+            if (!handler.process(socket)) {
+                // Close socket
+                try {
+                    socket.close();
+                } catch (IOException e) {
+                }
+            }
+
+            // Finish up this request
+            socket = null;
+
+        }
+        
+    }
+    
+    
     // ----------------------------------------------------- Worker Inner Class
 
 
@@ -442,6 +480,11 @@ public class JIoEndpoint {
             running = true;
             paused = false;
 
+            // Create worker collection
+            if (executor == null) {
+                workers = new WorkerStack(maxThreads);
+            }
+
             // Start acceptor thread
             acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor");
             acceptorThread.setPriority(threadPriority);
@@ -480,7 +523,7 @@ public class JIoEndpoint {
         }
         if (serverSocket != null) {
             try {
-                if (serverSocket!=null)
+                if (serverSocket != null)
                     serverSocket.close();
             } catch (Exception e) {
                 log.error(sm.getString("endpoint.err.close"), e);
@@ -636,6 +679,26 @@ public class JIoEndpoint {
     }
 
 
+    /**
+     * Process given socket.
+     */
+    protected boolean processSocket(Socket socket) {
+        try {
+            if (executor == null) {
+                getWorkerThread().assign(socket);
+            } else {
+                executor.execute(new SocketProcessor(socket));
+            }
+        } catch (Throwable t) {
+            // This means we got an OOM or similar creating a thread, or that
+            // the pool and its queue are full
+            log.error(sm.getString("endpoint.process.fail"), t);
+            return false;
+        }
+        return true;
+    }
+    
+
     // ------------------------------------------------- WorkerStack Inner Class
 
 
index 9bc59a1..13f3f42 100644 (file)
@@ -19,6 +19,7 @@ endpoint.poll.limitedpollsize=Failed to create poller with specified size of {0}
 endpoint.poll.initfail=Poller creation failed\r
 endpoint.poll.fail=Critical poller failure (restarting poller): [{0}] {1}\r
 endpoint.poll.error=Unexpected poller error\r
+endpoint.process.fail=Error allocating socket processor\r
 endpoint.sendfile.error=Unexpected sendfile error\r
 endpoint.sendfile.addfail=Sednfile failure: [{0}] {1}\r
 endpoint.sendfile.nosupport=Disabling sendfile, since either the APR version or the system doesn't support it\r