--- /dev/null
+package org.apache.catalina;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public interface CometProcessor {
+
+ public void begin(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+ public void end(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+
+ public void error(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+ public void read(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+
+}
\r
import java.io.IOException;\r
\r
+import org.apache.catalina.CometProcessor;\r
import org.apache.catalina.Context;\r
import org.apache.catalina.Globals;\r
import org.apache.catalina.Wrapper;\r
\r
}\r
\r
+ // Comet processing\r
+ if (request.getWrapper() != null \r
+ && request.getWrapper() instanceof CometProcessor) {\r
+ try {\r
+ if (request.getAttribute("org.apache.tomcat.comet.error") != null) {\r
+ ((CometProcessor) request.getWrapper()).error(request.getRequest(), response.getResponse());\r
+ } else {\r
+ ((CometProcessor) request.getWrapper()).read(request.getRequest(), response.getResponse());\r
+ }\r
+ } catch (IOException e) {\r
+ ;\r
+ } catch (Throwable t) {\r
+ log.error(sm.getString("coyoteAdapter.service"), t);\r
+ } finally {\r
+ // Recycle the wrapper request and response\r
+ if (request.getAttribute("org.apache.tomcat.comet") == null) {\r
+ request.recycle();\r
+ response.recycle();\r
+ }\r
+ }\r
+ return;\r
+ }\r
+ \r
if (connector.getXpoweredBy()) {\r
response.addHeader("X-Powered-By", "Servlet/2.5");\r
}\r
\r
+ boolean comet = false;\r
+ \r
try {\r
\r
// Parse and set Catalina and configuration specific \r
connector.getContainer().getPipeline().getFirst().invoke(request, response);\r
}\r
\r
- response.finishResponse();\r
- req.action( ActionCode.ACTION_POST_REQUEST , null);\r
+ if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE \r
+ && request.getWrapper() instanceof CometProcessor) {\r
+ request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE);\r
+ comet = true;\r
+ }\r
+\r
+ if (!comet) {\r
+ response.finishResponse();\r
+ req.action( ActionCode.ACTION_POST_REQUEST , null);\r
+ }\r
\r
} catch (IOException e) {\r
;\r
log.error(sm.getString("coyoteAdapter.service"), t);\r
} finally {\r
// Recycle the wrapper request and response\r
- request.recycle();\r
- response.recycle();\r
+ if (!comet) {\r
+ request.recycle();\r
+ response.recycle();\r
+ }\r
}\r
\r
}\r
if (readOnlyAttributes.containsKey(name)) {\r
return;\r
}\r
+\r
+ // Pass special attributes to the native layer\r
+ if (name.startsWith("org.apache.tomcat.")) {\r
+ coyoteRequest.getAttributes().remove(name);\r
+ }\r
+\r
found = attributes.containsKey(name);\r
if (found) {\r
value = attributes.get(name);\r
} else {\r
return;\r
}\r
-\r
+ \r
// Notify interested application event listeners\r
Object listeners[] = context.getApplicationEventListeners();\r
if ((listeners == null) || (listeners.length == 0))\r
--- /dev/null
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.servlets;
+
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometProcessor;
+
+
+/**
+ * Helper class to implement Comet functionality.
+ */
+public abstract class CometServlet
+ extends HttpServlet implements CometProcessor {
+
+ public void begin(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+
+ }
+
+ public void end(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ request.removeAttribute("org.apache.tomcat.comet");
+ }
+
+ public void error(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ end(request, response);
+ }
+
+ public abstract void read(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException;
+
+ protected void service(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+
+ if (request.getAttribute("org.apache.tomcat.comet.support") == Boolean.TRUE) {
+ begin(request, response);
+ } else {
+ // FIXME: Implement without comet support
+ begin(request, response);
+
+ // Loop reading data
+
+ end(request, response);
+ }
+
+ }
+
+}
import org.apache.tomcat.util.modeler.Registry;\r
import org.apache.tomcat.util.net.AprEndpoint;\r
import org.apache.tomcat.util.net.AprEndpoint.Handler;\r
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;\r
import org.apache.tomcat.util.res.StringManager;\r
\r
\r
this.proto = proto;\r
}\r
\r
- public boolean process(long socket) {\r
+ // FIXME: Support for this could be added in AJP as well\r
+ public SocketState event(long socket, boolean error) {\r
+ return SocketState.CLOSED;\r
+ }\r
+ \r
+ public SocketState process(long socket) {\r
AjpAprProcessor processor = null;\r
try {\r
processor = (AjpAprProcessor) localProcessor.get();\r
((ActionHook) processor).action(ActionCode.ACTION_START, null);\r
}\r
\r
- return processor.process(socket);\r
+ if (processor.process(socket)) {\r
+ return SocketState.OPEN;\r
+ } else {\r
+ return SocketState.CLOSED;\r
+ }\r
\r
} catch(java.net.SocketException e) {\r
// SocketExceptions are normal\r
((ActionHook) processor).action(ActionCode.ACTION_STOP, null);\r
}\r
}\r
- return false;\r
+ return SocketState.CLOSED;\r
}\r
}\r
\r
import org.apache.tomcat.util.http.FastHttpDateFormat;\r
import org.apache.tomcat.util.http.MimeHeaders;\r
import org.apache.tomcat.util.net.AprEndpoint;\r
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;\r
import org.apache.tomcat.util.res.StringManager;\r
\r
\r
\r
\r
/**\r
- * State flag.\r
- */\r
- protected boolean started = false;\r
-\r
-\r
- /**\r
* Error flag.\r
*/\r
protected boolean error = false;\r
\r
\r
/**\r
+ * Comet used.\r
+ */\r
+ protected boolean comet = false;\r
+\r
+\r
+ /**\r
* Content delimitator for the request (if false, the connection will\r
* be closed at the end of the request).\r
*/\r
*\r
* @throws IOException error during an I/O operation\r
*/\r
- public boolean process(long socket)\r
+ public SocketState event(boolean error)\r
+ throws IOException {\r
+ \r
+ RequestInfo rp = request.getRequestProcessor();\r
+ \r
+ try {\r
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);\r
+ if (error) {\r
+ request.setAttribute("org.apache.tomcat.comet.error", Boolean.TRUE);\r
+ }\r
+ // FIXME: It is also possible to add a new "event" method in the adapter\r
+ // or something similar\r
+ adapter.service(request, response);\r
+ if (request.getAttribute("org.apache.tomcat.comet") == null) {\r
+ comet = false;\r
+ endpoint.getCometPoller().remove(socket);\r
+ }\r
+ } catch (InterruptedIOException e) {\r
+ error = true;\r
+ } catch (Throwable t) {\r
+ log.error(sm.getString("http11processor.request.process"), t);\r
+ // 500 - Internal Server Error\r
+ response.setStatus(500);\r
+ error = true;\r
+ }\r
+ \r
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);\r
+\r
+ if (error) {\r
+ recycle();\r
+ return SocketState.CLOSED;\r
+ } else if (!comet) {\r
+ recycle();\r
+ endpoint.getPoller().add(socket);\r
+ return SocketState.OPEN;\r
+ } else {\r
+ return SocketState.LONG;\r
+ }\r
+ }\r
+ \r
+ /**\r
+ * Process pipelined HTTP requests using the specified input and output\r
+ * streams.\r
+ *\r
+ * @throws IOException error during an I/O operation\r
+ */\r
+ public SocketState process(long socket)\r
throws IOException {\r
RequestInfo rp = request.getRequestProcessor();\r
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);\r
boolean keptAlive = false;\r
boolean openSocket = false;\r
\r
- while (started && !error && keepAlive) {\r
+ while (!error && keepAlive) {\r
\r
// Parsing the request header\r
try {\r
error = response.getErrorException() != null ||\r
statusDropsConnection(response.getStatus());\r
}\r
-\r
+ // Comet support\r
+ if (request.getAttribute("org.apache.tomcat.comet") != null) {\r
+ comet = true;\r
+ }\r
} catch (InterruptedIOException e) {\r
error = true;\r
} catch (Throwable t) {\r
}\r
\r
// Finish the handling of the request\r
- try {\r
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);\r
- inputBuffer.endRequest();\r
- } catch (IOException e) {\r
- error = true;\r
- } catch (Throwable t) {\r
- log.error(sm.getString("http11processor.request.finish"), t);\r
- // 500 - Internal Server Error\r
- response.setStatus(500);\r
- error = true;\r
- }\r
- try {\r
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);\r
- outputBuffer.endRequest();\r
- } catch (IOException e) {\r
- error = true;\r
- } catch (Throwable t) {\r
- log.error(sm.getString("http11processor.response.finish"), t);\r
- error = true;\r
+ if (!comet) {\r
+ endRequest();\r
}\r
\r
// If there was an error, make sure the request is counted as\r
}\r
request.updateCounters();\r
\r
- rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);\r
-\r
- // Don't reset the param - we'll see it as ended. Next request\r
- // will reset it\r
- // thrA.setParam(null);\r
- // Next request\r
- inputBuffer.nextRequest();\r
- outputBuffer.nextRequest();\r
-\r
// Do sendfile as needed: add socket to sendfile and end\r
- if (sendfileData != null) {\r
+ if (sendfileData != null && !error) {\r
sendfileData.socket = socket;\r
sendfileData.keepAlive = keepAlive;\r
if (!endpoint.getSendfile().add(sendfileData)) {\r
}\r
}\r
\r
+ rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);\r
+\r
}\r
\r
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);\r
\r
- // Recycle\r
+ if (comet) {\r
+ if (error) {\r
+ recycle();\r
+ return SocketState.CLOSED;\r
+ } else {\r
+ endpoint.getCometPoller().add(socket);\r
+ return SocketState.LONG;\r
+ }\r
+ } else {\r
+ recycle();\r
+ return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;\r
+ }\r
+ \r
+ }\r
+\r
+ \r
+ public void endRequest() {\r
+ \r
+ // Finish the handling of the request\r
+ try {\r
+ inputBuffer.endRequest();\r
+ } catch (IOException e) {\r
+ error = true;\r
+ } catch (Throwable t) {\r
+ log.error(sm.getString("http11processor.request.finish"), t);\r
+ // 500 - Internal Server Error\r
+ response.setStatus(500);\r
+ error = true;\r
+ }\r
+ try {\r
+ outputBuffer.endRequest();\r
+ } catch (IOException e) {\r
+ error = true;\r
+ } catch (Throwable t) {\r
+ log.error(sm.getString("http11processor.response.finish"), t);\r
+ error = true;\r
+ }\r
+\r
+ // Next request\r
+ inputBuffer.nextRequest();\r
+ outputBuffer.nextRequest();\r
+ \r
+ }\r
+ \r
+ \r
+ public void recycle() {\r
inputBuffer.recycle();\r
outputBuffer.recycle();\r
this.socket = 0;\r
-\r
- return openSocket;\r
- \r
}\r
-\r
+ \r
\r
// ----------------------------------------------------- ActionHook Methods\r
\r
// End the processing of the current request, and stop any further\r
// transactions with the client\r
\r
+ comet = false;\r
try {\r
outputBuffer.endRequest();\r
} catch (IOException e) {\r
\r
// Do nothing\r
\r
- } else if (actionCode == ActionCode.ACTION_START) {\r
-\r
- started = true;\r
-\r
- } else if (actionCode == ActionCode.ACTION_STOP) {\r
-\r
- started = false;\r
-\r
} else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {\r
\r
// Get remote host address\r
if (endpoint.getUseSendfile()) {\r
request.setAttribute("org.apache.tomcat.sendfile.support", Boolean.TRUE);\r
}\r
+ // Advertise comet support through a request attribute\r
+ request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE);\r
\r
}\r
\r
import java.net.URLEncoder;\r
import java.util.Hashtable;\r
import java.util.Iterator;\r
+import java.util.concurrent.ConcurrentHashMap;\r
import java.util.concurrent.Executor;\r
\r
import javax.management.MBeanRegistration;\r
// -------------------- Connection handler --------------------\r
\r
static class Http11ConnectionHandler implements Handler {\r
- Http11AprProtocol proto;\r
- static int count=0;\r
- RequestGroupInfo global=new RequestGroupInfo();\r
- ThreadLocal localProcessor = new ThreadLocal();\r
-\r
- Http11ConnectionHandler( Http11AprProtocol proto ) {\r
- this.proto=proto;\r
+ \r
+ protected Http11AprProtocol proto;\r
+ protected static int count = 0;\r
+ protected RequestGroupInfo global = new RequestGroupInfo();\r
+ \r
+ protected ThreadLocal<Http11AprProcessor> localProcessor = \r
+ new ThreadLocal<Http11AprProcessor>();\r
+ protected ConcurrentHashMap<Long, Http11AprProcessor> connections =\r
+ new ConcurrentHashMap<Long, Http11AprProcessor>();\r
+ protected java.util.Stack<Http11AprProcessor> recycledProcessors = \r
+ new java.util.Stack<Http11AprProcessor>();\r
+\r
+ Http11ConnectionHandler(Http11AprProtocol proto) {\r
+ this.proto = proto;\r
}\r
\r
- public boolean process(long socket) {\r
+ public SocketState event(long socket, boolean error) {\r
+ Http11AprProcessor result = connections.get(socket);\r
+ SocketState state = SocketState.CLOSED; \r
+ if (result != null) {\r
+ boolean recycle = error;\r
+ // Call the appropriate event\r
+ try {\r
+ state = result.event(error);\r
+ } catch (java.net.SocketException e) {\r
+ // SocketExceptions are normal\r
+ Http11AprProtocol.log.debug\r
+ (sm.getString\r
+ ("http11protocol.proto.socketexception.debug"), e);\r
+ } catch (java.io.IOException e) {\r
+ // IOExceptions are normal\r
+ Http11AprProtocol.log.debug\r
+ (sm.getString\r
+ ("http11protocol.proto.ioexception.debug"), e);\r
+ }\r
+ // Future developers: if you discover any other\r
+ // rare-but-nonfatal exceptions, catch them here, and log as\r
+ // above.\r
+ catch (Throwable e) {\r
+ // any other exception or error is odd. Here we log it\r
+ // with "ERROR" level, so it will show up even on\r
+ // less-than-verbose logs.\r
+ Http11AprProtocol.log.error\r
+ (sm.getString("http11protocol.proto.error"), e);\r
+ } finally {\r
+ if (state != SocketState.LONG) {\r
+ connections.remove(socket);\r
+ recycledProcessors.push(result);\r
+ }\r
+ }\r
+ }\r
+ return state;\r
+ }\r
+ \r
+ public SocketState process(long socket) {\r
Http11AprProcessor processor = null;\r
try {\r
processor = (Http11AprProcessor) localProcessor.get();\r
if (processor == null) {\r
+ synchronized (recycledProcessors) {\r
+ if (!recycledProcessors.isEmpty()) {\r
+ processor = recycledProcessors.pop();\r
+ localProcessor.set(processor);\r
+ }\r
+ }\r
+ }\r
+ if (processor == null) {\r
processor =\r
new Http11AprProcessor(proto.maxHttpHeaderSize, proto.ep);\r
processor.setAdapter(proto.adapter);\r
((ActionHook) processor).action(ActionCode.ACTION_START, null);\r
}\r
\r
- return processor.process(socket);\r
+ SocketState state = processor.process(socket);\r
+ if (state == SocketState.LONG) {\r
+ // Associate the connection with the processor. The next request \r
+ // processed by this thread will use either a new or a recycled\r
+ // processor.\r
+ connections.put(socket, processor);\r
+ localProcessor.set(null);\r
+ }\r
+ return state;\r
\r
} catch(java.net.SocketException e) {\r
// SocketExceptions are normal\r
// less-than-verbose logs.\r
Http11AprProtocol.log.error\r
(sm.getString("http11protocol.proto.error"), e);\r
- } finally {\r
- // if(proto.adapter != null) proto.adapter.recycle();\r
- // processor.recycle();\r
-\r
- if (processor instanceof ActionHook) {\r
- ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);\r
- }\r
}\r
- return false;\r
+ return SocketState.CLOSED;\r
}\r
}\r
\r
* consumed. This method only resets all the pointers so that we are ready\r
* to parse the next HTTP request.\r
*/\r
- public void nextRequest()\r
- throws IOException {\r
+ public void nextRequest() {\r
\r
// Recycle Request object\r
request.recycle();\r
\r
\r
/**\r
+ * Allow comet request handling.\r
+ */\r
+ protected boolean useComet = true;\r
+ public void setUseComet(boolean useComet) { this.useComet = useComet; }\r
+ public boolean getUseComet() { return useComet; }\r
+\r
+\r
+ /**\r
* Acceptor thread count.\r
*/\r
protected int acceptorThreadCount = 0;\r
\r
\r
/**\r
+ * The socket poller used for Comet support.\r
+ */\r
+ protected Poller[] cometPollers = null;\r
+ protected int cometPollerRoundRobin = 0;\r
+ public Poller getCometPoller() {\r
+ cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;\r
+ return cometPollers[cometPollerRoundRobin];\r
+ }\r
+\r
+\r
+ /**\r
* The static file sender.\r
*/\r
protected Sendfile[] sendfiles = null;\r
addressStr = address.getHostAddress();\r
}\r
int family = Socket.APR_INET;\r
- if (Library.APR_HAVE_IPV6) {\r
- if (addressStr == null)\r
- family = Socket.APR_UNSPEC;\r
- else if (addressStr.indexOf(':') >= 0)\r
- family = Socket.APR_UNSPEC;\r
+ if (Library.APR_HAVE_IPV6 && (addressStr == null || addressStr.indexOf(':') >= 0)) {\r
+ family = Socket.APR_UNSPEC;\r
}\r
long inetAddress = Address.info(addressStr, family,\r
port, 0, rootPool);\r
// Start poller threads\r
pollers = new Poller[pollerThreadCount];\r
for (int i = 0; i < pollerThreadCount; i++) {\r
- pollers[i] = new Poller();\r
+ pollers[i] = new Poller(false);\r
pollers[i].init();\r
Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);\r
pollerThread.setPriority(threadPriority);\r
pollerThread.start();\r
}\r
\r
+ // Start comet poller threads\r
+ cometPollers = new Poller[pollerThreadCount];\r
+ for (int i = 0; i < pollerThreadCount; i++) {\r
+ cometPollers[i] = new Poller(true);\r
+ cometPollers[i].init();\r
+ Thread pollerThread = new Thread(cometPollers[i], getName() + "-CometPoller-" + i);\r
+ pollerThread.setPriority(threadPriority);\r
+ pollerThread.setDaemon(true);\r
+ pollerThread.start();\r
+ }\r
+\r
// Start sendfile threads\r
if (useSendfile) {\r
sendfiles = new Sendfile[sendfileThreadCount];\r
}\r
\r
\r
+ /**\r
+ * Process given socket for an event.\r
+ */\r
+ protected boolean processSocket(long socket, boolean error) {\r
+ try {\r
+ if (executor == null) {\r
+ getWorkerThread().assign(socket, error);\r
+ } else {\r
+ executor.execute(new SocketEventProcessor(socket, error));\r
+ }\r
+ } catch (Throwable t) {\r
+ // This means we got an OOM or similar creating a thread, or that\r
+ // the pool and its queue are full\r
+ log.error(sm.getString("endpoint.process.fail"), t);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+ \r
+\r
// --------------------------------------------------- Acceptor Inner Class\r
\r
\r
\r
protected long[] addS;\r
protected int addCount = 0;\r
+ protected long[] removeS;\r
+ protected int removeCount = 0;\r
\r
+ protected boolean comet = true;\r
+\r
protected int keepAliveCount = 0;\r
public int getKeepAliveCount() { return keepAliveCount; }\r
\r
+ public Poller(boolean comet) {\r
+ this.comet = comet;\r
+ }\r
+ \r
/**\r
* Create the poller. With some versions of APR, the maximum poller size will\r
* be 62 (reocmpiling APR is necessary to remove this limitation).\r
protected void init() {\r
pool = Pool.create(serverSockPool);\r
int size = pollerSize / pollerThreadCount;\r
- serverPollset = allocatePoller(size, pool, soTimeout);\r
+ int timeout = soTimeout;\r
+ if (comet) {\r
+ // FIXME: Find an appropriate timeout value, for now, "longer than usual"\r
+ // semms appropriate\r
+ timeout = soTimeout * 20;\r
+ }\r
+ serverPollset = allocatePoller(size, pool, timeout);\r
if (serverPollset == 0 && size > 1024) {\r
size = 1024;\r
- serverPollset = allocatePoller(size, pool, soTimeout);\r
+ serverPollset = allocatePoller(size, pool, timeout);\r
}\r
if (serverPollset == 0) {\r
size = 62;\r
- serverPollset = allocatePoller(size, pool, soTimeout);\r
+ serverPollset = allocatePoller(size, pool, timeout);\r
}\r
desc = new long[size * 2];\r
keepAliveCount = 0;\r
addS = new long[size];\r
addCount = 0;\r
+ if (comet) {\r
+ removeS = new long[size];\r
+ }\r
+ removeCount = 0;\r
}\r
\r
/**\r
protected void destroy() {\r
// Close all sockets in the add queue\r
for (int i = 0; i < addCount; i++) {\r
+ if (comet) {\r
+ processSocket(addS[i], true);\r
+ }\r
Socket.destroy(addS[i]);\r
}\r
+ // Close all sockets in the remove queue\r
+ for (int i = 0; i < removeCount; i++) {\r
+ if (comet) {\r
+ processSocket(removeS[i], true);\r
+ }\r
+ Socket.destroy(removeS[i]);\r
+ }\r
// Close all sockets still in the poller\r
int rv = Poll.pollset(serverPollset, desc);\r
if (rv > 0) {\r
for (int n = 0; n < rv; n++) {\r
+ if (comet) {\r
+ processSocket(desc[n*2+1], true);\r
+ }\r
Socket.destroy(desc[n*2+1]);\r
}\r
}\r
Pool.destroy(pool);\r
keepAliveCount = 0;\r
addCount = 0;\r
+ removeCount = 0;\r
}\r
\r
/**\r
// at most for pollTime before being polled\r
if (addCount >= addS.length) {\r
// Can't do anything: close the socket right away\r
+ if (comet) {\r
+ processSocket(socket, true);\r
+ }\r
Socket.destroy(socket);\r
return;\r
}\r
}\r
\r
/**\r
+ * Remove specified socket and associated pool from the poller. The socket will\r
+ * be added to a temporary array, and polled first after a maximum amount\r
+ * of time equal to pollTime (in most cases, latency will be much lower,\r
+ * however). Note that this is automatic, except if the poller is used for\r
+ * comet.\r
+ *\r
+ * @param socket to remove from the poller\r
+ */\r
+ public void remove(long socket) {\r
+ synchronized (this) {\r
+ // Add socket to the list. Newly added sockets will wait\r
+ // at most for pollTime before being polled\r
+ if (removeCount >= removeS.length) {\r
+ // Normally, it cannot happen ...\r
+ Socket.destroy(socket);\r
+ return;\r
+ }\r
+ removeS[removeCount] = socket;\r
+ removeCount++;\r
+ this.notify();\r
+ }\r
+ }\r
+\r
+ /**\r
* The background thread that listens for incoming TCP/IP connections and\r
* hands them off to an appropriate processor.\r
*/\r
keepAliveCount++;\r
} else {\r
// Can't do anything: close the socket right away\r
+ if (comet) {\r
+ processSocket(addS[i], true);\r
+ }\r
Socket.destroy(addS[i]);\r
}\r
}\r
addCount = 0;\r
}\r
}\r
+ // Remove sockets which are waiting to the poller\r
+ if (removeCount > 0) {\r
+ synchronized (this) {\r
+ for (int i = 0; i < removeCount; i++) {\r
+ int rv = Poll.remove(serverPollset, removeS[i]);\r
+ }\r
+ removeCount = 0;\r
+ }\r
+ }\r
+\r
maintainTime += pollTime;\r
// Pool for the specified interval\r
- int rv = Poll.poll(serverPollset, pollTime, desc, true);\r
+ int rv = Poll.poll(serverPollset, pollTime, desc, !comet);\r
if (rv > 0) {\r
keepAliveCount -= rv;\r
for (int n = 0; n < rv; n++) {\r
// Check for failed sockets and hand this socket off to a worker\r
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)\r
+ || (comet && (!processSocket(desc[n*2+1], false))) \r
|| (!processSocket(desc[n*2+1]))) {\r
// Close socket and clear pool\r
+ if (comet) {\r
+ processSocket(desc[n*2+1], true);\r
+ Poll.remove(serverPollset, desc[n*2+1]);\r
+ }\r
Socket.destroy(desc[n*2+1]);\r
continue;\r
}\r
keepAliveCount -= rv;\r
for (int n = 0; n < rv; n++) {\r
// Close socket and clear pool\r
+ if (comet) {\r
+ // FIXME: should really close in case of timeout ?\r
+ // FIXME: maybe comet should use an extended timeout\r
+ processSocket(desc[n], true);\r
+ }\r
Socket.destroy(desc[n]);\r
}\r
}\r
protected Thread thread = null;\r
protected boolean available = false;\r
protected long socket = 0;\r
+ protected boolean event = false;\r
+ protected boolean error = false;\r
\r
\r
/**\r
\r
// Store the newly available Socket and notify our thread\r
this.socket = socket;\r
+ event = false;\r
+ error = false;\r
+ available = true;\r
+ notifyAll();\r
+\r
+ }\r
+\r
+\r
+ protected synchronized void assign(long socket, boolean error) {\r
+\r
+ // Wait for the Processor to get the previous Socket\r
+ while (available) {\r
+ try {\r
+ wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ // Store the newly available Socket and notify our thread\r
+ this.socket = socket;\r
+ event = true;\r
+ this.error = error;\r
available = true;\r
notifyAll();\r
\r
continue;\r
\r
// Process the request from this socket\r
- if (!handler.process(socket)) {\r
+ if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {\r
+ // Close socket and pool\r
+ Socket.destroy(socket);\r
+ socket = 0;\r
+ } else if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
// Close socket and pool\r
Socket.destroy(socket);\r
socket = 0;\r
* thread local fields.\r
*/\r
public interface Handler {\r
- public boolean process(long socket);\r
+ public enum SocketState {\r
+ OPEN, CLOSED, LONG\r
+ }\r
+ public SocketState process(long socket);\r
+ public SocketState event(long socket, boolean error);\r
}\r
\r
\r
public void run() {\r
\r
// Process the request from this socket\r
- if (!handler.process(socket)) {\r
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
+ // Close socket and pool\r
+ Socket.destroy(socket);\r
+ socket = 0;\r
+ }\r
+\r
+ }\r
+ \r
+ }\r
+ \r
+ \r
+ // --------------------------------------- SocketEventProcessor Inner Class\r
+\r
+\r
+ /**\r
+ * This class is the equivalent of the Worker, but will simply use in an\r
+ * external Executor thread pool.\r
+ */\r
+ protected class SocketEventProcessor implements Runnable {\r
+ \r
+ protected long socket = 0;\r
+ protected boolean error = false; \r
+ \r
+ public SocketEventProcessor(long socket, boolean error) {\r
+ this.socket = socket;\r
+ this.error = error;\r
+ }\r
+\r
+ public void run() {\r
+\r
+ // Process the request from this socket\r
+ if (handler.event(socket, error) == Handler.SocketState.CLOSED) {\r
// Close socket and pool\r
Socket.destroy(socket);\r
socket = 0;\r