- Start work on comet support. Note: it doesn't work yet, I think (I didn't test...
authorremm <remm@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 17 May 2006 12:55:39 +0000 (12:55 +0000)
committerremm <remm@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 17 May 2006 12:55:39 +0000 (12:55 +0000)
  is very preliminary. It is relatively straightforward, though.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@407241 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/CometProcessor.java [new file with mode: 0644]
java/org/apache/catalina/connector/CoyoteAdapter.java
java/org/apache/catalina/connector/Request.java
java/org/apache/catalina/servlets/CometServlet.java [new file with mode: 0644]
java/org/apache/coyote/ajp/AjpAprProtocol.java
java/org/apache/coyote/http11/Http11AprProcessor.java
java/org/apache/coyote/http11/Http11AprProtocol.java
java/org/apache/coyote/http11/InternalAprInputBuffer.java
java/org/apache/tomcat/util/net/AprEndpoint.java

diff --git a/java/org/apache/catalina/CometProcessor.java b/java/org/apache/catalina/CometProcessor.java
new file mode 100644 (file)
index 0000000..1c219dd
--- /dev/null
@@ -0,0 +1,21 @@
+package org.apache.catalina;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public interface CometProcessor {
+
+    public void begin(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+    public void end(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+
+    public void error(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+    public void read(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+
+}
index a10a4d5..ace2985 100644 (file)
@@ -19,6 +19,7 @@ package org.apache.catalina.connector;
 \r
 import java.io.IOException;\r
 \r
+import org.apache.catalina.CometProcessor;\r
 import org.apache.catalina.Context;\r
 import org.apache.catalina.Globals;\r
 import org.apache.catalina.Wrapper;\r
@@ -135,10 +136,35 @@ public class CoyoteAdapter
 \r
         }\r
 \r
+        // Comet processing\r
+        if (request.getWrapper() != null \r
+                && request.getWrapper() instanceof CometProcessor) {\r
+            try {\r
+                if (request.getAttribute("org.apache.tomcat.comet.error") != null) {\r
+                    ((CometProcessor) request.getWrapper()).error(request.getRequest(), response.getResponse());\r
+                } else {\r
+                    ((CometProcessor) request.getWrapper()).read(request.getRequest(), response.getResponse());\r
+                }\r
+            } catch (IOException e) {\r
+                ;\r
+            } catch (Throwable t) {\r
+                log.error(sm.getString("coyoteAdapter.service"), t);\r
+            } finally {\r
+                // Recycle the wrapper request and response\r
+                if (request.getAttribute("org.apache.tomcat.comet") == null) {\r
+                    request.recycle();\r
+                    response.recycle();\r
+                }\r
+            }\r
+            return;\r
+        }\r
+        \r
         if (connector.getXpoweredBy()) {\r
             response.addHeader("X-Powered-By", "Servlet/2.5");\r
         }\r
 \r
+        boolean comet = false;\r
+        \r
         try {\r
 \r
             // Parse and set Catalina and configuration specific \r
@@ -148,8 +174,16 @@ public class CoyoteAdapter
                 connector.getContainer().getPipeline().getFirst().invoke(request, response);\r
             }\r
 \r
-            response.finishResponse();\r
-            req.action( ActionCode.ACTION_POST_REQUEST , null);\r
+            if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE \r
+                    && request.getWrapper() instanceof CometProcessor) {\r
+                request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE);\r
+                comet = true;\r
+            }\r
+\r
+            if (!comet) {\r
+                response.finishResponse();\r
+                req.action( ActionCode.ACTION_POST_REQUEST , null);\r
+            }\r
 \r
         } catch (IOException e) {\r
             ;\r
@@ -157,8 +191,10 @@ public class CoyoteAdapter
             log.error(sm.getString("coyoteAdapter.service"), t);\r
         } finally {\r
             // Recycle the wrapper request and response\r
-            request.recycle();\r
-            response.recycle();\r
+            if (!comet) {\r
+                request.recycle();\r
+                response.recycle();\r
+            }\r
         }\r
 \r
     }\r
index 94140c4..d46e66e 100644 (file)
@@ -1294,6 +1294,12 @@ public class Request
         if (readOnlyAttributes.containsKey(name)) {\r
             return;\r
         }\r
+\r
+        // Pass special attributes to the native layer\r
+        if (name.startsWith("org.apache.tomcat.")) {\r
+            coyoteRequest.getAttributes().remove(name);\r
+        }\r
+\r
         found = attributes.containsKey(name);\r
         if (found) {\r
             value = attributes.get(name);\r
@@ -1301,7 +1307,7 @@ public class Request
         } else {\r
             return;\r
         }\r
-\r
+        \r
         // Notify interested application event listeners\r
         Object listeners[] = context.getApplicationEventListeners();\r
         if ((listeners == null) || (listeners.length == 0))\r
diff --git a/java/org/apache/catalina/servlets/CometServlet.java b/java/org/apache/catalina/servlets/CometServlet.java
new file mode 100644 (file)
index 0000000..066eb4a
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.servlets;
+
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometProcessor;
+
+
+/**
+ * Helper class to implement Comet functionality.
+ */
+public abstract class CometServlet
+    extends HttpServlet implements CometProcessor {
+
+    public void begin(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        
+    }
+    
+    public void end(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        request.removeAttribute("org.apache.tomcat.comet");
+    }
+    
+    public void error(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        end(request, response);
+    }
+    
+    public abstract void read(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException;
+
+    protected void service(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+        
+        if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE) {
+            begin(request, response);
+        } else {
+            // FIXME: Implement without comet support
+            begin(request, response);
+            
+            // Loop reading data
+            
+            end(request, response);
+        }
+        
+    }
+
+}
index 862ef4a..5670ecc 100644 (file)
@@ -35,6 +35,7 @@ import org.apache.coyote.RequestInfo;
 import org.apache.tomcat.util.modeler.Registry;\r
 import org.apache.tomcat.util.net.AprEndpoint;\r
 import org.apache.tomcat.util.net.AprEndpoint.Handler;\r
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;\r
 import org.apache.tomcat.util.res.StringManager;\r
 \r
 \r
@@ -429,7 +430,12 @@ public class AjpAprProtocol
             this.proto = proto;\r
         }\r
 \r
-        public boolean process(long socket) {\r
+        // FIXME: Support for this could be added in AJP as well\r
+        public SocketState event(long socket, boolean error) {\r
+            return SocketState.CLOSED;\r
+        }\r
+        \r
+        public SocketState process(long socket) {\r
             AjpAprProcessor processor = null;\r
             try {\r
                 processor = (AjpAprProcessor) localProcessor.get();\r
@@ -460,7 +466,11 @@ public class AjpAprProtocol
                     ((ActionHook) processor).action(ActionCode.ACTION_START, null);\r
                 }\r
 \r
-                return processor.process(socket);\r
+                if (processor.process(socket)) {\r
+                    return SocketState.OPEN;\r
+                } else {\r
+                    return SocketState.CLOSED;\r
+                }\r
 \r
             } catch(java.net.SocketException e) {\r
                 // SocketExceptions are normal\r
@@ -487,7 +497,7 @@ public class AjpAprProtocol
                     ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);\r
                 }\r
             }\r
-            return false;\r
+            return SocketState.CLOSED;\r
         }\r
     }\r
 \r
index 47267de..b6d5194 100644 (file)
@@ -52,6 +52,7 @@ import org.apache.tomcat.util.buf.MessageBytes;
 import org.apache.tomcat.util.http.FastHttpDateFormat;\r
 import org.apache.tomcat.util.http.MimeHeaders;\r
 import org.apache.tomcat.util.net.AprEndpoint;\r
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;\r
 import org.apache.tomcat.util.res.StringManager;\r
 \r
 \r
@@ -147,12 +148,6 @@ public class Http11AprProcessor implements ActionHook {
 \r
 \r
     /**\r
-     * State flag.\r
-     */\r
-    protected boolean started = false;\r
-\r
-\r
-    /**\r
      * Error flag.\r
      */\r
     protected boolean error = false;\r
@@ -183,6 +178,12 @@ public class Http11AprProcessor implements ActionHook {
 \r
 \r
     /**\r
+     * Comet used.\r
+     */\r
+    protected boolean comet = false;\r
+\r
+\r
+    /**\r
      * Content delimitator for the request (if false, the connection will\r
      * be closed at the end of the request).\r
      */\r
@@ -735,7 +736,53 @@ public class Http11AprProcessor implements ActionHook {
      *\r
      * @throws IOException error during an I/O operation\r
      */\r
-    public boolean process(long socket)\r
+    public SocketState event(boolean error)\r
+        throws IOException {\r
+        \r
+        RequestInfo rp = request.getRequestProcessor();\r
+        \r
+        try {\r
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);\r
+            if (error) {\r
+                request.setAttribute("org.apache.tomcat.comet.error", Boolean.TRUE);\r
+            }\r
+            // FIXME: It is also possible to add a new "event" method in the adapter\r
+            // or something similar\r
+            adapter.service(request, response);\r
+            if (request.getAttribute("org.apache.tomcat.comet") == null) {\r
+                comet = false;\r
+                endpoint.getCometPoller().remove(socket);\r
+            }\r
+        } catch (InterruptedIOException e) {\r
+            error = true;\r
+        } catch (Throwable t) {\r
+            log.error(sm.getString("http11processor.request.process"), t);\r
+            // 500 - Internal Server Error\r
+            response.setStatus(500);\r
+            error = true;\r
+        }\r
+        \r
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);\r
+\r
+        if (error) {\r
+            recycle();\r
+            return SocketState.CLOSED;\r
+        } else if (!comet) {\r
+            recycle();\r
+            endpoint.getPoller().add(socket);\r
+            return SocketState.OPEN;\r
+        } else {\r
+            return SocketState.LONG;\r
+        }\r
+    }\r
+    \r
+    /**\r
+     * Process pipelined HTTP requests using the specified input and output\r
+     * streams.\r
+     *\r
+     * @throws IOException error during an I/O operation\r
+     */\r
+    public SocketState process(long socket)\r
         throws IOException {\r
         RequestInfo rp = request.getRequestProcessor();\r
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);\r
@@ -768,7 +815,7 @@ public class Http11AprProcessor implements ActionHook {
         boolean keptAlive = false;\r
         boolean openSocket = false;\r
 \r
-        while (started && !error && keepAlive) {\r
+        while (!error && keepAlive) {\r
 \r
             // Parsing the request header\r
             try {\r
@@ -833,7 +880,10 @@ public class Http11AprProcessor implements ActionHook {
                         error = response.getErrorException() != null ||\r
                                 statusDropsConnection(response.getStatus());\r
                     }\r
-\r
+                    // Comet support\r
+                    if (request.getAttribute("org.apache.tomcat.comet") != null) {\r
+                        comet = true;\r
+                    }\r
                 } catch (InterruptedIOException e) {\r
                     error = true;\r
                 } catch (Throwable t) {\r
@@ -845,25 +895,8 @@ public class Http11AprProcessor implements ActionHook {
             }\r
 \r
             // Finish the handling of the request\r
-            try {\r
-                rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);\r
-                inputBuffer.endRequest();\r
-            } catch (IOException e) {\r
-                error = true;\r
-            } catch (Throwable t) {\r
-                log.error(sm.getString("http11processor.request.finish"), t);\r
-                // 500 - Internal Server Error\r
-                response.setStatus(500);\r
-                error = true;\r
-            }\r
-            try {\r
-                rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);\r
-                outputBuffer.endRequest();\r
-            } catch (IOException e) {\r
-                error = true;\r
-            } catch (Throwable t) {\r
-                log.error(sm.getString("http11processor.response.finish"), t);\r
-                error = true;\r
+            if (!comet) {\r
+                endRequest();\r
             }\r
 \r
             // If there was an error, make sure the request is counted as\r
@@ -873,17 +906,8 @@ public class Http11AprProcessor implements ActionHook {
             }\r
             request.updateCounters();\r
 \r
-            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);\r
-\r
-            // Don't reset the param - we'll see it as ended. Next request\r
-            // will reset it\r
-            // thrA.setParam(null);\r
-            // Next request\r
-            inputBuffer.nextRequest();\r
-            outputBuffer.nextRequest();\r
-\r
             // Do sendfile as needed: add socket to sendfile and end\r
-            if (sendfileData != null) {\r
+            if (sendfileData != null && !error) {\r
                 sendfileData.socket = socket;\r
                 sendfileData.keepAlive = keepAlive;\r
                 if (!endpoint.getSendfile().add(sendfileData)) {\r
@@ -892,19 +916,63 @@ public class Http11AprProcessor implements ActionHook {
                 }\r
             }\r
             \r
+            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);\r
+\r
         }\r
 \r
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);\r
 \r
-        // Recycle\r
+        if (comet) {\r
+            if (error) {\r
+                recycle();\r
+                return SocketState.CLOSED;\r
+            } else {\r
+                endpoint.getCometPoller().add(socket);\r
+                return SocketState.LONG;\r
+            }\r
+        } else {\r
+            recycle();\r
+            return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;\r
+        }\r
+        \r
+    }\r
+\r
+    \r
+    public void endRequest() {\r
+        \r
+        // Finish the handling of the request\r
+        try {\r
+            inputBuffer.endRequest();\r
+        } catch (IOException e) {\r
+            error = true;\r
+        } catch (Throwable t) {\r
+            log.error(sm.getString("http11processor.request.finish"), t);\r
+            // 500 - Internal Server Error\r
+            response.setStatus(500);\r
+            error = true;\r
+        }\r
+        try {\r
+            outputBuffer.endRequest();\r
+        } catch (IOException e) {\r
+            error = true;\r
+        } catch (Throwable t) {\r
+            log.error(sm.getString("http11processor.response.finish"), t);\r
+            error = true;\r
+        }\r
+\r
+        // Next request\r
+        inputBuffer.nextRequest();\r
+        outputBuffer.nextRequest();\r
+        \r
+    }\r
+    \r
+    \r
+    public void recycle() {\r
         inputBuffer.recycle();\r
         outputBuffer.recycle();\r
         this.socket = 0;\r
-\r
-        return openSocket;\r
-        \r
     }\r
-\r
+    \r
 \r
     // ----------------------------------------------------- ActionHook Methods\r
 \r
@@ -966,6 +1034,7 @@ public class Http11AprProcessor implements ActionHook {
             // End the processing of the current request, and stop any further\r
             // transactions with the client\r
 \r
+            comet = false;\r
             try {\r
                 outputBuffer.endRequest();\r
             } catch (IOException e) {\r
@@ -985,14 +1054,6 @@ public class Http11AprProcessor implements ActionHook {
 \r
             // Do nothing\r
 \r
-        } else if (actionCode == ActionCode.ACTION_START) {\r
-\r
-            started = true;\r
-\r
-        } else if (actionCode == ActionCode.ACTION_STOP) {\r
-\r
-            started = false;\r
-\r
         } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {\r
 \r
             // Get remote host address\r
@@ -1368,6 +1429,8 @@ public class Http11AprProcessor implements ActionHook {
         if (endpoint.getUseSendfile()) {\r
             request.setAttribute("org.apache.tomcat.sendfile.support", Boolean.TRUE);\r
         }\r
+        // Advertise comet support through a request attribute\r
+        request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);\r
         \r
     }\r
 \r
index 0f037d8..dd10203 100644 (file)
@@ -20,6 +20,7 @@ import java.net.InetAddress;
 import java.net.URLEncoder;\r
 import java.util.Hashtable;\r
 import java.util.Iterator;\r
+import java.util.concurrent.ConcurrentHashMap;\r
 import java.util.concurrent.Executor;\r
 \r
 import javax.management.MBeanRegistration;\r
@@ -598,20 +599,73 @@ public class Http11AprProtocol implements ProtocolHandler, MBeanRegistration
     // --------------------  Connection handler --------------------\r
 \r
     static class Http11ConnectionHandler implements Handler {\r
-        Http11AprProtocol proto;\r
-        static int count=0;\r
-        RequestGroupInfo global=new RequestGroupInfo();\r
-        ThreadLocal localProcessor = new ThreadLocal();\r
-\r
-        Http11ConnectionHandler( Http11AprProtocol proto ) {\r
-            this.proto=proto;\r
+        \r
+        protected Http11AprProtocol proto;\r
+        protected static int count = 0;\r
+        protected RequestGroupInfo global = new RequestGroupInfo();\r
+        \r
+        protected ThreadLocal<Http11AprProcessor> localProcessor = \r
+            new ThreadLocal<Http11AprProcessor>();\r
+        protected ConcurrentHashMap<Long, Http11AprProcessor> connections =\r
+            new ConcurrentHashMap<Long, Http11AprProcessor>();\r
+        protected java.util.Stack<Http11AprProcessor> recycledProcessors = \r
+            new java.util.Stack<Http11AprProcessor>();\r
+\r
+        Http11ConnectionHandler(Http11AprProtocol proto) {\r
+            this.proto = proto;\r
         }\r
 \r
-        public boolean process(long socket) {\r
+        public SocketState event(long socket, boolean error) {\r
+            Http11AprProcessor result = connections.get(socket);\r
+            SocketState state = SocketState.CLOSED; \r
+            if (result != null) {\r
+                boolean recycle = error;\r
+                // Call the appropriate event\r
+                try {\r
+                    state = result.event(error);\r
+                } catch (java.net.SocketException e) {\r
+                    // SocketExceptions are normal\r
+                    Http11AprProtocol.log.debug\r
+                        (sm.getString\r
+                            ("http11protocol.proto.socketexception.debug"), e);\r
+                } catch (java.io.IOException e) {\r
+                    // IOExceptions are normal\r
+                    Http11AprProtocol.log.debug\r
+                        (sm.getString\r
+                            ("http11protocol.proto.ioexception.debug"), e);\r
+                }\r
+                // Future developers: if you discover any other\r
+                // rare-but-nonfatal exceptions, catch them here, and log as\r
+                // above.\r
+                catch (Throwable e) {\r
+                    // any other exception or error is odd. Here we log it\r
+                    // with "ERROR" level, so it will show up even on\r
+                    // less-than-verbose logs.\r
+                    Http11AprProtocol.log.error\r
+                        (sm.getString("http11protocol.proto.error"), e);\r
+                } finally {\r
+                    if (state != SocketState.LONG) {\r
+                        connections.remove(socket);\r
+                        recycledProcessors.push(result);\r
+                    }\r
+                }\r
+            }\r
+            return state;\r
+        }\r
+        \r
+        public SocketState process(long socket) {\r
             Http11AprProcessor processor = null;\r
             try {\r
                 processor = (Http11AprProcessor) localProcessor.get();\r
                 if (processor == null) {\r
+                    synchronized (recycledProcessors) {\r
+                        if (!recycledProcessors.isEmpty()) {\r
+                            processor = recycledProcessors.pop();\r
+                            localProcessor.set(processor);\r
+                        }\r
+                    }\r
+                }\r
+                if (processor == null) {\r
                     processor =\r
                         new Http11AprProcessor(proto.maxHttpHeaderSize, proto.ep);\r
                     processor.setAdapter(proto.adapter);\r
@@ -647,7 +701,15 @@ public class Http11AprProtocol implements ProtocolHandler, MBeanRegistration
                     ((ActionHook) processor).action(ActionCode.ACTION_START, null);\r
                 }\r
 \r
-                return processor.process(socket);\r
+                SocketState state = processor.process(socket);\r
+                if (state == SocketState.LONG) {\r
+                    // Associate the connection with the processor. The next request \r
+                    // processed by this thread will use either a new or a recycled\r
+                    // processor.\r
+                    connections.put(socket, processor);\r
+                    localProcessor.set(null);\r
+                }\r
+                return state;\r
 \r
             } catch(java.net.SocketException e) {\r
                 // SocketExceptions are normal\r
@@ -669,15 +731,8 @@ public class Http11AprProtocol implements ProtocolHandler, MBeanRegistration
                 // less-than-verbose logs.\r
                 Http11AprProtocol.log.error\r
                     (sm.getString("http11protocol.proto.error"), e);\r
-            } finally {\r
-                //       if(proto.adapter != null) proto.adapter.recycle();\r
-                //                processor.recycle();\r
-\r
-                if (processor instanceof ActionHook) {\r
-                    ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);\r
-                }\r
             }\r
-            return false;\r
+            return SocketState.CLOSED;\r
         }\r
     }\r
 \r
index 01ad7b5..f40429c 100644 (file)
@@ -329,8 +329,7 @@ public class InternalAprInputBuffer implements InputBuffer {
      * consumed. This method only resets all the pointers so that we are ready\r
      * to parse the next HTTP request.\r
      */\r
-    public void nextRequest()\r
-        throws IOException {\r
+    public void nextRequest() {\r
 \r
         // Recycle Request object\r
         request.recycle();\r
index d9a0762..1e2ddf6 100644 (file)
@@ -300,6 +300,14 @@ public class AprEndpoint {
 \r
 \r
     /**\r
+     * Allow comet request handling.\r
+     */\r
+    protected boolean useComet = true;\r
+    public void setUseComet(boolean useComet) { this.useComet = useComet; }\r
+    public boolean getUseComet() { return useComet; }\r
+\r
+\r
+    /**\r
      * Acceptor thread count.\r
      */\r
     protected int acceptorThreadCount = 0;\r
@@ -335,6 +343,17 @@ public class AprEndpoint {
 \r
 \r
     /**\r
+     * The socket poller used for Comet support.\r
+     */\r
+    protected Poller[] cometPollers = null;\r
+    protected int cometPollerRoundRobin = 0;\r
+    public Poller getCometPoller() {\r
+        cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;\r
+        return cometPollers[cometPollerRoundRobin];\r
+    }\r
+\r
+\r
+    /**\r
      * The static file sender.\r
      */\r
     protected Sendfile[] sendfiles = null;\r
@@ -561,11 +580,8 @@ public class AprEndpoint {
             addressStr = address.getHostAddress();\r
         }\r
         int family = Socket.APR_INET;\r
-        if (Library.APR_HAVE_IPV6) {\r
-            if (addressStr == null)\r
-                family = Socket.APR_UNSPEC;\r
-            else if (addressStr.indexOf(':') >= 0)\r
-                family = Socket.APR_UNSPEC;\r
+        if (Library.APR_HAVE_IPV6 && (addressStr == null || addressStr.indexOf(':') >= 0)) {\r
+            family = Socket.APR_UNSPEC;\r
         }\r
         long inetAddress = Address.info(addressStr, family,\r
                 port, 0, rootPool);\r
@@ -712,7 +728,7 @@ public class AprEndpoint {
             // Start poller threads\r
             pollers = new Poller[pollerThreadCount];\r
             for (int i = 0; i < pollerThreadCount; i++) {\r
-                pollers[i] = new Poller();\r
+                pollers[i] = new Poller(false);\r
                 pollers[i].init();\r
                 Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);\r
                 pollerThread.setPriority(threadPriority);\r
@@ -720,6 +736,17 @@ public class AprEndpoint {
                 pollerThread.start();\r
             }\r
 \r
+            // Start comet poller threads\r
+            cometPollers = new Poller[pollerThreadCount];\r
+            for (int i = 0; i < pollerThreadCount; i++) {\r
+                cometPollers[i] = new Poller(true);\r
+                cometPollers[i].init();\r
+                Thread pollerThread = new Thread(cometPollers[i], getName() + "-CometPoller-" + i);\r
+                pollerThread.setPriority(threadPriority);\r
+                pollerThread.setDaemon(true);\r
+                pollerThread.start();\r
+            }\r
+\r
             // Start sendfile threads\r
             if (useSendfile) {\r
                 sendfiles = new Sendfile[sendfileThreadCount];\r
@@ -998,6 +1025,26 @@ public class AprEndpoint {
     }\r
     \r
 \r
+    /**\r
+     * Process given socket for an event.\r
+     */\r
+    protected boolean processSocket(long socket, boolean error) {\r
+        try {\r
+            if (executor == null) {\r
+                getWorkerThread().assign(socket, error);\r
+            } else {\r
+                executor.execute(new SocketEventProcessor(socket, error));\r
+            }\r
+        } catch (Throwable t) {\r
+            // This means we got an OOM or similar creating a thread, or that\r
+            // the pool and its queue are full\r
+            log.error(sm.getString("endpoint.process.fail"), t);\r
+            return false;\r
+        }\r
+        return true;\r
+    }\r
+    \r
+\r
     // --------------------------------------------------- Acceptor Inner Class\r
 \r
 \r
@@ -1060,10 +1107,18 @@ public class AprEndpoint {
 \r
         protected long[] addS;\r
         protected int addCount = 0;\r
+        protected long[] removeS;\r
+        protected int removeCount = 0;\r
         \r
+        protected boolean comet = true;\r
+\r
         protected int keepAliveCount = 0;\r
         public int getKeepAliveCount() { return keepAliveCount; }\r
 \r
+        public Poller(boolean comet) {\r
+            this.comet = comet;\r
+        }\r
+        \r
         /**\r
          * Create the poller. With some versions of APR, the maximum poller size will\r
          * be 62 (reocmpiling APR is necessary to remove this limitation).\r
@@ -1071,19 +1126,29 @@ public class AprEndpoint {
         protected void init() {\r
             pool = Pool.create(serverSockPool);\r
             int size = pollerSize / pollerThreadCount;\r
-            serverPollset = allocatePoller(size, pool, soTimeout);\r
+            int timeout = soTimeout;\r
+            if (comet) {\r
+                // FIXME: Find an appropriate timeout value, for now, "longer than usual"\r
+                // semms appropriate\r
+                timeout = soTimeout * 20;\r
+            }\r
+            serverPollset = allocatePoller(size, pool, timeout);\r
             if (serverPollset == 0 && size > 1024) {\r
                 size = 1024;\r
-                serverPollset = allocatePoller(size, pool, soTimeout);\r
+                serverPollset = allocatePoller(size, pool, timeout);\r
             }\r
             if (serverPollset == 0) {\r
                 size = 62;\r
-                serverPollset = allocatePoller(size, pool, soTimeout);\r
+                serverPollset = allocatePoller(size, pool, timeout);\r
             }\r
             desc = new long[size * 2];\r
             keepAliveCount = 0;\r
             addS = new long[size];\r
             addCount = 0;\r
+            if (comet) {\r
+                removeS = new long[size];\r
+            }\r
+            removeCount = 0;\r
         }\r
 \r
         /**\r
@@ -1092,18 +1157,32 @@ public class AprEndpoint {
         protected void destroy() {\r
             // Close all sockets in the add queue\r
             for (int i = 0; i < addCount; i++) {\r
+                if (comet) {\r
+                    processSocket(addS[i], true);\r
+                }\r
                 Socket.destroy(addS[i]);\r
             }\r
+            // Close all sockets in the remove queue\r
+            for (int i = 0; i < removeCount; i++) {\r
+                if (comet) {\r
+                    processSocket(removeS[i], true);\r
+                }\r
+                Socket.destroy(removeS[i]);\r
+            }\r
             // Close all sockets still in the poller\r
             int rv = Poll.pollset(serverPollset, desc);\r
             if (rv > 0) {\r
                 for (int n = 0; n < rv; n++) {\r
+                    if (comet) {\r
+                        processSocket(desc[n*2+1], true);\r
+                    }\r
                     Socket.destroy(desc[n*2+1]);\r
                 }\r
             }\r
             Pool.destroy(pool);\r
             keepAliveCount = 0;\r
             addCount = 0;\r
+            removeCount = 0;\r
         }\r
 \r
         /**\r
@@ -1120,6 +1199,9 @@ public class AprEndpoint {
                 // at most for pollTime before being polled\r
                 if (addCount >= addS.length) {\r
                     // Can't do anything: close the socket right away\r
+                    if (comet) {\r
+                        processSocket(socket, true);\r
+                    }\r
                     Socket.destroy(socket);\r
                     return;\r
                 }\r
@@ -1130,6 +1212,30 @@ public class AprEndpoint {
         }\r
 \r
         /**\r
+         * Remove specified socket and associated pool from the poller. The socket will\r
+         * be added to a temporary array, and polled first after a maximum amount\r
+         * of time equal to pollTime (in most cases, latency will be much lower,\r
+         * however). Note that this is automatic, except if the poller is used for\r
+         * comet.\r
+         *\r
+         * @param socket to remove from the poller\r
+         */\r
+        public void remove(long socket) {\r
+            synchronized (this) {\r
+                // Add socket to the list. Newly added sockets will wait\r
+                // at most for pollTime before being polled\r
+                if (removeCount >= removeS.length) {\r
+                    // Normally, it cannot happen ...\r
+                    Socket.destroy(socket);\r
+                    return;\r
+                }\r
+                removeS[removeCount] = socket;\r
+                removeCount++;\r
+                this.notify();\r
+            }\r
+        }\r
+\r
+        /**\r
          * The background thread that listens for incoming TCP/IP connections and\r
          * hands them off to an appropriate processor.\r
          */\r
@@ -1171,23 +1277,41 @@ public class AprEndpoint {
                                     keepAliveCount++;\r
                                 } else {\r
                                     // Can't do anything: close the socket right away\r
+                                    if (comet) {\r
+                                        processSocket(addS[i], true);\r
+                                    }\r
                                     Socket.destroy(addS[i]);\r
                                 }\r
                             }\r
                             addCount = 0;\r
                         }\r
                     }\r
+                    // Remove sockets which are waiting to the poller\r
+                    if (removeCount > 0) {\r
+                        synchronized (this) {\r
+                            for (int i = 0; i < removeCount; i++) {\r
+                                int rv = Poll.remove(serverPollset, removeS[i]);\r
+                            }\r
+                            removeCount = 0;\r
+                        }\r
+                    }\r
+\r
                     maintainTime += pollTime;\r
                     // Pool for the specified interval\r
-                    int rv = Poll.poll(serverPollset, pollTime, desc, true);\r
+                    int rv = Poll.poll(serverPollset, pollTime, desc, !comet);\r
                     if (rv > 0) {\r
                         keepAliveCount -= rv;\r
                         for (int n = 0; n < rv; n++) {\r
                             // Check for failed sockets and hand this socket off to a worker\r
                             if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
                                     || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)\r
+                                    || (comet && (!processSocket(desc[n*2+1], false))) \r
                                     || (!processSocket(desc[n*2+1]))) {\r
                                 // Close socket and clear pool\r
+                                if (comet) {\r
+                                    processSocket(desc[n*2+1], true);\r
+                                    Poll.remove(serverPollset, desc[n*2+1]);\r
+                                }\r
                                 Socket.destroy(desc[n*2+1]);\r
                                 continue;\r
                             }\r
@@ -1215,6 +1339,11 @@ public class AprEndpoint {
                             keepAliveCount -= rv;\r
                             for (int n = 0; n < rv; n++) {\r
                                 // Close socket and clear pool\r
+                                if (comet) {\r
+                                    // FIXME: should really close in case of timeout ?\r
+                                    // FIXME: maybe comet should use an extended timeout\r
+                                    processSocket(desc[n], true);\r
+                                }\r
                                 Socket.destroy(desc[n]);\r
                             }\r
                         }\r
@@ -1242,6 +1371,8 @@ public class AprEndpoint {
         protected Thread thread = null;\r
         protected boolean available = false;\r
         protected long socket = 0;\r
+        protected boolean event = false;\r
+        protected boolean error = false;\r
 \r
 \r
         /**\r
@@ -1265,6 +1396,28 @@ public class AprEndpoint {
 \r
             // Store the newly available Socket and notify our thread\r
             this.socket = socket;\r
+            event = false;\r
+            error = false;\r
+            available = true;\r
+            notifyAll();\r
+\r
+        }\r
+\r
+\r
+        protected synchronized void assign(long socket, boolean error) {\r
+\r
+            // Wait for the Processor to get the previous Socket\r
+            while (available) {\r
+                try {\r
+                    wait();\r
+                } catch (InterruptedException e) {\r
+                }\r
+            }\r
+\r
+            // Store the newly available Socket and notify our thread\r
+            this.socket = socket;\r
+            event = true;\r
+            this.error = error;\r
             available = true;\r
             notifyAll();\r
 \r
@@ -1310,7 +1463,11 @@ public class AprEndpoint {
                     continue;\r
 \r
                 // Process the request from this socket\r
-                if (!handler.process(socket)) {\r
+                if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {\r
+                    // Close socket and pool\r
+                    Socket.destroy(socket);\r
+                    socket = 0;\r
+                } else if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
                     // Close socket and pool\r
                     Socket.destroy(socket);\r
                     socket = 0;\r
@@ -1622,7 +1779,11 @@ public class AprEndpoint {
      * thread local fields.\r
      */\r
     public interface Handler {\r
-        public boolean process(long socket);\r
+        public enum SocketState {\r
+            OPEN, CLOSED, LONG\r
+        }\r
+        public SocketState process(long socket);\r
+        public SocketState event(long socket, boolean error);\r
     }\r
 \r
 \r
@@ -1700,7 +1861,38 @@ public class AprEndpoint {
         public void run() {\r
 \r
             // Process the request from this socket\r
-            if (!handler.process(socket)) {\r
+            if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
+                // Close socket and pool\r
+                Socket.destroy(socket);\r
+                socket = 0;\r
+            }\r
+\r
+        }\r
+        \r
+    }\r
+    \r
+    \r
+    // --------------------------------------- SocketEventProcessor Inner Class\r
+\r
+\r
+    /**\r
+     * This class is the equivalent of the Worker, but will simply use in an\r
+     * external Executor thread pool.\r
+     */\r
+    protected class SocketEventProcessor implements Runnable {\r
+        \r
+        protected long socket = 0;\r
+        protected boolean error = false; \r
+        \r
+        public SocketEventProcessor(long socket, boolean error) {\r
+            this.socket = socket;\r
+            this.error = error;\r
+        }\r
+\r
+        public void run() {\r
+\r
+            // Process the request from this socket\r
+            if (handler.event(socket, error) == Handler.SocketState.CLOSED) {\r
                 // Close socket and pool\r
                 Socket.destroy(socket);\r
                 socket = 0;\r