From: fhanik Date: Fri, 17 Jul 2009 21:32:00 +0000 (+0000) Subject: Async patches phase 1 - Async means the container thread can back out. This means... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=eb3b30b830ebf8ecc1744082740c051e77d4f414;p=tomcat7.0 Async patches phase 1 - Async means the container thread can back out. This means that valves need to be async aware. For example, access log valve, can no longer log upon exit of the method since the thread can back out based on async behavior. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@795231 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/connector/AsyncContextImpl.java b/java/org/apache/catalina/connector/AsyncContextImpl.java index 39788044c..5cd48acc9 100644 --- a/java/org/apache/catalina/connector/AsyncContextImpl.java +++ b/java/org/apache/catalina/connector/AsyncContextImpl.java @@ -22,10 +22,15 @@ import java.util.List; import javax.servlet.AsyncContext; import javax.servlet.AsyncListener; +import javax.servlet.RequestDispatcher; import javax.servlet.ServletContext; +import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.coyote.ActionCode; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** @@ -41,7 +46,8 @@ public class AsyncContextImpl implements AsyncContext { private ServletResponse servletResponse = null; private List listeners = new ArrayList(); private boolean hasOriginalRequestAndResponse = true; - private boolean completed = false; + private boolean completed = true; + private volatile Runnable dispatch = null; private Request request; @@ -54,16 +60,7 @@ public class AsyncContextImpl implements AsyncContext { public void complete() { // TODO SERVLET3 - async - for (AsyncListenerWrapper wrapper : listeners) { - try { - wrapper.fireOnComplete(); - }catch (IOException x) { - //how does this propagate, or should it? - //TODO SERVLET3 - async - log.error("",x); - } - } - this.completed = true; + doInternalComplete(false); } @@ -76,7 +73,27 @@ public class AsyncContextImpl implements AsyncContext { @Override public void dispatch(String path) { // TODO SERVLET3 - async - + if (request.getAttribute(ASYNC_REQUEST_URI)==null) { + request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()); + request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath()); + request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath()); + request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString()); + } + final RequestDispatcher requestDispatcher = request.getServletContext().getRequestDispatcher(path); + final HttpServletRequest servletRequest = (HttpServletRequest)getRequest(); + final HttpServletResponse servletResponse = (HttpServletResponse)getResponse(); + Runnable run = new Runnable() { + public void run() { + try { + requestDispatcher.include(servletRequest, servletResponse); + }catch (Exception x) { + //log.error("Async.dispatch",x); + throw new RuntimeException(x); + } + } + }; + this.dispatch = run; + request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, run ); } @Override @@ -124,7 +141,7 @@ public class AsyncContextImpl implements AsyncContext { servletResponse = null; listeners.clear(); hasOriginalRequestAndResponse = true; - completed = false; + completed = true; } public boolean isStarted() { @@ -168,6 +185,40 @@ public class AsyncContextImpl implements AsyncContext { this.completed = completed; } + public void doInternalDispatch() throws ServletException, IOException { + if (this.dispatch!=null) { + try { + dispatch.run(); + } catch (RuntimeException x) { + doInternalComplete(true); + if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause(); + if (x.getCause() instanceof IOException) throw (IOException)x.getCause(); + else throw new ServletException(x); + } finally { + dispatch = null; + } + } + } + public void doInternalComplete(boolean error) { + if (isCompleted()) return; + for (AsyncListenerWrapper wrapper : listeners) { + try { + wrapper.fireOnComplete(); + }catch (IOException x) { + //how does this propagate, or should it? + //TODO SERVLET3 - async + log.error("",x); + } + } + try { + if (!error) getResponse().flushBuffer(); + + }catch (Exception x) { + log.error("",x); + } + request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null); + recycle(); + } } diff --git a/java/org/apache/catalina/connector/CoyoteAdapter.java b/java/org/apache/catalina/connector/CoyoteAdapter.java index 9258e2fb9..11f3ceffb 100644 --- a/java/org/apache/catalina/connector/CoyoteAdapter.java +++ b/java/org/apache/catalina/connector/CoyoteAdapter.java @@ -250,7 +250,75 @@ public class CoyoteAdapter } } + public boolean asyncDispatch(org.apache.coyote.Request req,org.apache.coyote.Response res) throws Exception { + Request request = (Request) req.getNote(ADAPTER_NOTES); + Response response = (Response) res.getNote(ADAPTER_NOTES); + + if (request == null) { + throw new IllegalStateException("Dispatch may only happen on an existing request."); + } + boolean comet = false; + boolean async = false; + boolean success = true; + + try { + // Calling the container + try { + connector.getContainer().getPipeline().getFirst().invoke(request, response); + }catch (RuntimeException x) { + success = false; + } + + if (request.isComet()) { + if (!response.isClosed() && !response.isError()) { + if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) { + // Invoke a read event right away if there are available bytes + if (event(req, res, SocketStatus.OPEN)) { + comet = true; + res.action(ActionCode.ACTION_COMET_BEGIN, null); + } + } else { + comet = true; + res.action(ActionCode.ACTION_COMET_BEGIN, null); + } + } else { + // Clear the filter chain, as otherwise it will not be reset elsewhere + // since this is a Comet request + request.setFilterChain(null); + } + } + + if (request.isAsyncStarted()) { + //TODO SERVLET3 - async + res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext()); + async = true; + } else if (!comet) { + response.finishResponse(); + req.action(ActionCode.ACTION_POST_REQUEST , null); + } + } catch (IOException e) { + success = false; + // Ignore + } catch (Throwable t) { + success = false; + log.error(sm.getString("coyoteAdapter.service"), t); + } finally { + req.getRequestProcessor().setWorkerThreadName(null); + // Recycle the wrapper request and response + if (!comet && !async) { + request.recycle(); + response.recycle(); + } else { + // Clear converters so that the minimum amount of memory + // is used by this processor + request.clearEncoders(); + response.clearEncoders(); + } + } + return success; + } + /** * Service method. */ @@ -288,6 +356,7 @@ public class CoyoteAdapter } boolean comet = false; + boolean async = request.isAsyncStarted(); try { @@ -322,6 +391,7 @@ public class CoyoteAdapter if (request.isAsyncStarted()) { //TODO SERVLET3 - async res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext()); + async = true; } else if (!comet) { response.finishResponse(); req.action(ActionCode.ACTION_POST_REQUEST , null); @@ -334,7 +404,7 @@ public class CoyoteAdapter } finally { req.getRequestProcessor().setWorkerThreadName(null); // Recycle the wrapper request and response - if (!comet) { + if (!comet && !async) { request.recycle(); response.recycle(); } else { @@ -461,6 +531,11 @@ public class CoyoteAdapter } else { serverName = req.serverName(); } + if (request.isAsyncStarted()) { + //TODO SERVLET3 - async + //reset mapping data, should prolly be done elsewhere + request.getMappingData().recycle(); + } connector.getMapper().map(serverName, decodedURI, request.getMappingData()); request.setContext((Context) request.getMappingData().context); diff --git a/java/org/apache/catalina/connector/Request.java b/java/org/apache/catalina/connector/Request.java index dda9e1546..a9a1c7679 100644 --- a/java/org/apache/catalina/connector/Request.java +++ b/java/org/apache/catalina/connector/Request.java @@ -1467,6 +1467,7 @@ public class Request asyncContext.setServletRequest(getRequest()); asyncContext.setServletResponse(response.getResponse()); asyncContext.setStarted(true); + asyncContext.setCompleted(false); return asyncContext; } @@ -1474,7 +1475,7 @@ public class Request startAsync(); asyncContext.setServletRequest(request); asyncContext.setServletResponse(response); - asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse()); + asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse().getResponse()); return asyncContext; } diff --git a/java/org/apache/catalina/core/StandardWrapperValve.java b/java/org/apache/catalina/core/StandardWrapperValve.java index 514ed848e..77281ef07 100644 --- a/java/org/apache/catalina/core/StandardWrapperValve.java +++ b/java/org/apache/catalina/core/StandardWrapperValve.java @@ -33,6 +33,7 @@ import org.apache.catalina.CometEvent; import org.apache.catalina.CometProcessor; import org.apache.catalina.Context; import org.apache.catalina.Globals; +import org.apache.catalina.connector.AsyncContextImpl; import org.apache.catalina.connector.ClientAbortException; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; @@ -209,7 +210,9 @@ final class StandardWrapperValve if (context.getSwallowOutput()) { try { SystemLogHandler.startCapture(); - if (comet) { + if (request.isAsyncStarted()) { + ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch(); + } else if (comet) { filterChain.doFilterEvent(request.getEvent()); request.setComet(true); } else { @@ -223,7 +226,9 @@ final class StandardWrapperValve } } } else { - if (comet) { + if (request.isAsyncStarted()) { + ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch(); + } else if (comet) { request.setComet(true); filterChain.doFilterEvent(request.getEvent()); } else { diff --git a/java/org/apache/coyote/ActionCode.java b/java/org/apache/coyote/ActionCode.java index 7219483e6..642863646 100644 --- a/java/org/apache/coyote/ActionCode.java +++ b/java/org/apache/coyote/ActionCode.java @@ -175,6 +175,11 @@ public final class ActionCode { */ public static final ActionCode ACTION_ASYNC_SETTIMEOUT = new ActionCode(28); + /** + * Callback for an async call to {@link javax.servlet.AsyncContext#dispatch()} + */ + public static final ActionCode ACTION_ASYNC_DISPATCH = new ActionCode(29); + // ----------------------------------------------------------- Constructors int code; diff --git a/java/org/apache/coyote/Adapter.java b/java/org/apache/coyote/Adapter.java index 420be0283..05b24baf8 100644 --- a/java/org/apache/coyote/Adapter.java +++ b/java/org/apache/coyote/Adapter.java @@ -49,5 +49,7 @@ public interface Adapter { public boolean event(Request req, Response res, SocketStatus status) throws Exception; + + public boolean asyncDispatch(Request req,Response res) throws Exception; } diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index 830540976..4534db674 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -184,6 +184,18 @@ public class Http11NioProcessor implements ActionHook { * Closed by HttpServletResponse.getWriter().close() */ protected boolean cometClose = false; + + /** + * Async used + */ + protected boolean async = false; + /** + * Closed flag, a Comet async thread can + * signal for this Nio processor to be closed and recycled instead + * of waiting for a timeout. + * Closed by HttpServletRequest.getAsyncContext().complete() + */ + protected boolean asyncClose; /** * Content delimitator for the request (if false, the connection will @@ -322,8 +334,7 @@ public class Http11NioProcessor implements ActionHook { * Allow a customized the server header for the tin-foil hat folks. */ protected String server = null; - - + // ------------------------------------------------------------- Properties @@ -770,6 +781,63 @@ public class Http11NioProcessor implements ActionHook { return SocketState.LONG; } } + + + /** + * Process pipelined HTTP requests using the specified input and output + * streams. + * + * @throws IOException error during an I/O operation + */ + public SocketState asyncDispatch(SocketStatus status) + throws IOException { + + long soTimeout = endpoint.getSoTimeout(); + int keepAliveTimeout = endpoint.getKeepAliveTimeout(); + + RequestInfo rp = request.getRequestProcessor(); + final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.asyncDispatch(request, response); + if ( !error ) { + if (attach != null) { + attach.setComet(comet); + if (comet) { + Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); + if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); + } else { + //reset the timeout + if (keepAlive && keepAliveTimeout>0) { + attach.setTimeout(keepAliveTimeout); + } else { + attach.setTimeout(soTimeout); + } + } + + } + } + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (error) { + recycle(); + return SocketState.CLOSED; + } else if (!comet) { + recycle(); + return (keepAlive)?SocketState.OPEN:SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } /** * Process pipelined HTTP requests using the specified input and output @@ -905,7 +973,7 @@ public class Http11NioProcessor implements ActionHook { } // Finish the handling of the request - if (!comet) { + if (!comet && !async) { // If we know we are closing the connection, don't drain input. // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. @@ -921,7 +989,7 @@ public class Http11NioProcessor implements ActionHook { } request.updateCounters(); - if (!comet) { + if (!comet && !async) { // Next request inputBuffer.nextRequest(); outputBuffer.nextRequest(); @@ -943,7 +1011,7 @@ public class Http11NioProcessor implements ActionHook { }//while rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - if (comet) { + if (comet || async) { if (error) { recycle(); return SocketState.CLOSED; @@ -1064,6 +1132,8 @@ public class Http11NioProcessor implements ActionHook { comet = false; cometClose = true; + async = false; + asyncClose = false; SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); @@ -1240,10 +1310,30 @@ public class Http11NioProcessor implements ActionHook { attach.setTimeout(timeout); } else if (actionCode == ActionCode.ACTION_ASYNC_START) { //TODO SERVLET3 - async + async = true; } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) { //TODO SERVLET3 - async + asyncClose = true; + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling + socket.getPoller().cometInterest(socket); } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) { //TODO SERVLET3 - async + if (param==null) return; + if (socket==null || socket.getAttachment(false)==null) return; + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + long timeout = ((Long)param).longValue(); + //if we are not piggy backing on a worker thread, set the timeout + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling + attach.setTimeout(timeout); + } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) { + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + endpoint.processSocket(this.socket, SocketStatus.OPEN, true); + } else { + throw new UnsupportedOperationException("Can't call dispatch on the worker thread."); + } } } diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index 13f9b75b1..88741f6ec 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -680,13 +680,18 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration public SocketState event(NioChannel socket, SocketStatus status) { Http11NioProcessor result = connections.get(socket); - + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + att.setAsync(false); //no longer check for timeout SocketState state = SocketState.CLOSED; if (result != null) { if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error); // Call the appropriate event try { - state = result.event(status); + if (result.async) { + state = result.asyncDispatch(status); + } else { + state = result.event(status); + } } catch (java.net.SocketException e) { // SocketExceptions are normal Http11NioProtocol.log.debug @@ -717,7 +722,6 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration } else { if (log.isDebugEnabled()) log.debug("Keeping processor["+result); //add correct poller events here based on Comet stuff - NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); socket.getPoller().add(socket,att.getCometOps()); } } @@ -756,9 +760,13 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration // processor. //if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet()); connections.put(socket, processor); + if (processor.comet) { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); socket.getPoller().add(socket,att.getCometOps()); + } else if (processor.async) { + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + att.setAsync(true); } else { //we should not hold on to the processor objects release(socket); diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 7efa1256f..5d63197a0 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -1133,7 +1133,7 @@ public class NioEndpoint { return false; } - protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { + public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); attachment.setCometNotify(false); //will get reset upon next reg @@ -1745,12 +1745,15 @@ public class NioEndpoint { cometOps = SelectionKey.OP_READ; sendfileData = null; keepAliveLeft = 100; + async = false; } public void reset() { reset(null,null,-1); } + public boolean isAsync() { return async; } + public void setAsync(boolean async) { this.async = async; } public Poller getPoller() { return poller;} public void setPoller(Poller poller){this.poller = poller;} public long getLastAccess() { return lastAccess; } @@ -1821,6 +1824,7 @@ public class NioEndpoint { protected long lastRegistered = 0; protected SendfileData sendfileData = null; protected int keepAliveLeft = 100; + protected boolean async = false; } // ------------------------------------------------ Application Buffer Handler @@ -1864,9 +1868,8 @@ public class NioEndpoint { } - // ---------------------------------------------- SocketProcessor Inner Class - + // ---------------------------------------------- SocketProcessor Inner Class /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool.