Async patches phase 1 - Async means the container thread can back out. This means...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 17 Jul 2009 21:32:00 +0000 (21:32 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 17 Jul 2009 21:32:00 +0000 (21:32 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@795231 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/connector/AsyncContextImpl.java
java/org/apache/catalina/connector/CoyoteAdapter.java
java/org/apache/catalina/connector/Request.java
java/org/apache/catalina/core/StandardWrapperValve.java
java/org/apache/coyote/ActionCode.java
java/org/apache/coyote/Adapter.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/coyote/http11/Http11NioProtocol.java
java/org/apache/tomcat/util/net/NioEndpoint.java

index 3978804..5cd48ac 100644 (file)
@@ -22,10 +22,15 @@ import java.util.List;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.AsyncListener;
+import javax.servlet.RequestDispatcher;
 import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
+import org.apache.coyote.ActionCode;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 /**
@@ -41,7 +46,8 @@ public class AsyncContextImpl implements AsyncContext {
     private ServletResponse servletResponse = null;
     private List<AsyncListenerWrapper> listeners = new ArrayList<AsyncListenerWrapper>();
     private boolean hasOriginalRequestAndResponse = true;
-    private boolean completed = false;
+    private boolean completed = true;
+    private volatile Runnable dispatch = null;
     
     private Request request;
     
@@ -54,16 +60,7 @@ public class AsyncContextImpl implements AsyncContext {
     public void complete() {
         // TODO SERVLET3 - async
         
-        for (AsyncListenerWrapper wrapper : listeners) {
-            try {
-                wrapper.fireOnComplete();
-            }catch (IOException x) {
-                //how does this propagate, or should it?
-               //TODO SERVLET3 - async 
-                log.error("",x);
-            }
-        }
-        this.completed = true;
+        doInternalComplete(false);
 
     }
 
@@ -76,7 +73,27 @@ public class AsyncContextImpl implements AsyncContext {
     @Override
     public void dispatch(String path) {
         // TODO SERVLET3 - async
-
+        if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+            request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI());
+            request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
+            request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
+            request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
+        }
+        final RequestDispatcher requestDispatcher = request.getServletContext().getRequestDispatcher(path);
+        final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
+        final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
+        Runnable run = new Runnable() {
+            public void run() {
+                try {
+                    requestDispatcher.include(servletRequest, servletResponse);
+                }catch (Exception x) {
+                    //log.error("Async.dispatch",x);
+                    throw new RuntimeException(x);
+                }
+            }
+        };
+        this.dispatch = run;
+        request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, run );
     }
 
     @Override
@@ -124,7 +141,7 @@ public class AsyncContextImpl implements AsyncContext {
         servletResponse = null;
         listeners.clear();
         hasOriginalRequestAndResponse = true;
-        completed = false;
+        completed = true;
     }
 
     public boolean isStarted() {
@@ -168,6 +185,40 @@ public class AsyncContextImpl implements AsyncContext {
         this.completed = completed;
     }
     
+    public void doInternalDispatch() throws ServletException, IOException {
+        if (this.dispatch!=null) {
+            try {
+                dispatch.run();
+            } catch (RuntimeException x) {
+                doInternalComplete(true);
+                if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause();
+                if (x.getCause() instanceof IOException) throw (IOException)x.getCause();
+                else throw new ServletException(x);
+            } finally {
+                dispatch = null;
+            }
+        }
+    }
     
+    public void doInternalComplete(boolean error) {
+        if (isCompleted()) return;
+        for (AsyncListenerWrapper wrapper : listeners) {
+            try {
+                wrapper.fireOnComplete();
+            }catch (IOException x) {
+                //how does this propagate, or should it?
+               //TODO SERVLET3 - async 
+                log.error("",x);
+            }
+        }
+        try {
+            if (!error) getResponse().flushBuffer();
+            
+        }catch (Exception x) {
+            log.error("",x);
+        }
+        request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null);
+        recycle();
+    }
 
 }
index 9258e2f..11f3cef 100644 (file)
@@ -250,7 +250,75 @@ public class CoyoteAdapter
         }
     }
     
+    public boolean asyncDispatch(org.apache.coyote.Request req,org.apache.coyote.Response res) throws Exception {
+        Request request = (Request) req.getNote(ADAPTER_NOTES);
+        Response response = (Response) res.getNote(ADAPTER_NOTES);
+
+        if (request == null) {
+            throw new IllegalStateException("Dispatch may only happen on an existing request.");
+        }
+        boolean comet = false;
+        boolean async = false;
+        boolean success = true;
+        
+        try {
+            // Calling the container
+            try {
+                connector.getContainer().getPipeline().getFirst().invoke(request, response);
+            }catch (RuntimeException x) {
+                success = false;
+            }
+
+            if (request.isComet()) {
+                if (!response.isClosed() && !response.isError()) {
+                    if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
+                        // Invoke a read event right away if there are available bytes
+                        if (event(req, res, SocketStatus.OPEN)) {
+                            comet = true;
+                            res.action(ActionCode.ACTION_COMET_BEGIN, null);
+                        }
+                    } else {
+                        comet = true;
+                        res.action(ActionCode.ACTION_COMET_BEGIN, null);
+                    }
+                } else {
+                    // Clear the filter chain, as otherwise it will not be reset elsewhere
+                    // since this is a Comet request
+                    request.setFilterChain(null);
+                }
+            }
+
+            if (request.isAsyncStarted()) {
+                //TODO SERVLET3 - async
+                res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext());
+                async = true;
+            } else if (!comet) {
+                response.finishResponse();
+                req.action(ActionCode.ACTION_POST_REQUEST , null);
+            }
 
+        } catch (IOException e) {
+            success = false;
+            // Ignore
+        } catch (Throwable t) {
+            success = false;
+            log.error(sm.getString("coyoteAdapter.service"), t);
+        } finally {
+            req.getRequestProcessor().setWorkerThreadName(null);
+            // Recycle the wrapper request and response
+            if (!comet && !async) {
+                request.recycle();
+                response.recycle();
+            } else {
+                // Clear converters so that the minimum amount of memory 
+                // is used by this processor
+                request.clearEncoders();
+                response.clearEncoders();
+            }
+        }
+        return success;
+    }
+    
     /**
      * Service method.
      */
@@ -288,6 +356,7 @@ public class CoyoteAdapter
         }
 
         boolean comet = false;
+        boolean async = request.isAsyncStarted();
         
         try {
 
@@ -322,6 +391,7 @@ public class CoyoteAdapter
             if (request.isAsyncStarted()) {
                 //TODO SERVLET3 - async
                 res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext());
+                async = true;
             } else if (!comet) {
                 response.finishResponse();
                 req.action(ActionCode.ACTION_POST_REQUEST , null);
@@ -334,7 +404,7 @@ public class CoyoteAdapter
         } finally {
             req.getRequestProcessor().setWorkerThreadName(null);
             // Recycle the wrapper request and response
-            if (!comet) {
+            if (!comet && !async) {
                 request.recycle();
                 response.recycle();
             } else {
@@ -461,6 +531,11 @@ public class CoyoteAdapter
         } else {
             serverName = req.serverName();
         }
+        if (request.isAsyncStarted()) {
+            //TODO SERVLET3 - async
+            //reset mapping data, should prolly be done elsewhere
+            request.getMappingData().recycle();
+        }
         connector.getMapper().map(serverName, decodedURI, 
                                   request.getMappingData());
         request.setContext((Context) request.getMappingData().context);
index dda9e15..a9a1c76 100644 (file)
@@ -1467,6 +1467,7 @@ public class Request
         asyncContext.setServletRequest(getRequest());
         asyncContext.setServletResponse(response.getResponse());
         asyncContext.setStarted(true);
+        asyncContext.setCompleted(false);
         return asyncContext;
     }
 
@@ -1474,7 +1475,7 @@ public class Request
         startAsync();
         asyncContext.setServletRequest(request);
         asyncContext.setServletResponse(response);
-        asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse());
+        asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse().getResponse());
         return asyncContext;
     }
 
index 514ed84..77281ef 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.catalina.CometEvent;
 import org.apache.catalina.CometProcessor;
 import org.apache.catalina.Context;
 import org.apache.catalina.Globals;
+import org.apache.catalina.connector.AsyncContextImpl;
 import org.apache.catalina.connector.ClientAbortException;
 import org.apache.catalina.connector.Request;
 import org.apache.catalina.connector.Response;
@@ -209,7 +210,9 @@ final class StandardWrapperValve
                 if (context.getSwallowOutput()) {
                     try {
                         SystemLogHandler.startCapture();
-                        if (comet) {
+                        if (request.isAsyncStarted()) {
+                           ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch(); 
+                        } else if (comet) {
                             filterChain.doFilterEvent(request.getEvent());
                             request.setComet(true);
                         } else {
@@ -223,7 +226,9 @@ final class StandardWrapperValve
                         }
                     }
                 } else {
-                    if (comet) {
+                    if (request.isAsyncStarted()) {
+                        ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch();
+                    } else if (comet) {
                         request.setComet(true);
                         filterChain.doFilterEvent(request.getEvent());
                     } else {
index 7219483..6428636 100644 (file)
@@ -175,6 +175,11 @@ public final class ActionCode {
      */
     public static final ActionCode ACTION_ASYNC_SETTIMEOUT = new ActionCode(28);
     
+    /**
+     * Callback for an async call to {@link javax.servlet.AsyncContext#dispatch()}
+     */
+    public static final ActionCode ACTION_ASYNC_DISPATCH = new ActionCode(29);
+    
     
     // ----------------------------------------------------------- Constructors
     int code;
index 420be02..05b24ba 100644 (file)
@@ -49,5 +49,7 @@ public interface Adapter {
 
     public boolean event(Request req, Response res, SocketStatus status)
     throws Exception;
+    
+    public boolean asyncDispatch(Request req,Response res) throws Exception;
 
 }
index 8305409..4534db6 100644 (file)
@@ -184,6 +184,18 @@ public class Http11NioProcessor implements ActionHook {
      * Closed by HttpServletResponse.getWriter().close()
      */
     protected boolean cometClose = false;
+    
+    /**
+     * Async used
+     */
+    protected boolean async = false;
+    /**
+     * Closed flag, a Comet async thread can 
+     * signal for this Nio processor to be closed and recycled instead
+     * of waiting for a timeout.
+     * Closed by HttpServletRequest.getAsyncContext().complete()
+     */
+    protected boolean asyncClose;
 
     /**
      * Content delimitator for the request (if false, the connection will
@@ -322,8 +334,7 @@ public class Http11NioProcessor implements ActionHook {
      * Allow a customized the server header for the tin-foil hat folks.
      */
     protected String server = null;
-
-
+    
     // ------------------------------------------------------------- Properties
 
 
@@ -770,6 +781,63 @@ public class Http11NioProcessor implements ActionHook {
             return SocketState.LONG;
         }
     }
+    
+    
+    /**
+     * Process pipelined HTTP requests using the specified input and output
+     * streams.
+     *
+     * @throws IOException error during an I/O operation
+     */
+    public SocketState asyncDispatch(SocketStatus status)
+        throws IOException {
+
+        long soTimeout = endpoint.getSoTimeout();
+        int keepAliveTimeout = endpoint.getKeepAliveTimeout();
+
+        RequestInfo rp = request.getRequestProcessor();
+        final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            error = !adapter.asyncDispatch(request, response);
+            if ( !error ) {
+                if (attach != null) {
+                    attach.setComet(comet);
+                    if (comet) {
+                        Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
+                        if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
+                    } else {
+                        //reset the timeout
+                        if (keepAlive && keepAliveTimeout>0) {
+                            attach.setTimeout(keepAliveTimeout);
+                        } else {
+                            attach.setTimeout(soTimeout);
+                        }
+                    }
+
+                }
+            }
+        } catch (InterruptedIOException e) {
+            error = true;
+        } catch (Throwable t) {
+            log.error(sm.getString("http11processor.request.process"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            error = true;
+        }
+
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (error) {
+            recycle();
+            return SocketState.CLOSED;
+        } else if (!comet) {
+            recycle();
+            return (keepAlive)?SocketState.OPEN:SocketState.CLOSED;
+        } else {
+            return SocketState.LONG;
+        }
+    }
 
     /**
      * Process pipelined HTTP requests using the specified input and output
@@ -905,7 +973,7 @@ public class Http11NioProcessor implements ActionHook {
             }
 
             // Finish the handling of the request
-            if (!comet) {
+            if (!comet && !async) {
                 // If we know we are closing the connection, don't drain input.
                 // This way uploading a 100GB file doesn't tie up the thread 
                 // if the servlet has rejected it.
@@ -921,7 +989,7 @@ public class Http11NioProcessor implements ActionHook {
             }
             request.updateCounters();
 
-            if (!comet) {
+            if (!comet && !async) {
                 // Next request
                 inputBuffer.nextRequest();
                 outputBuffer.nextRequest();
@@ -943,7 +1011,7 @@ public class Http11NioProcessor implements ActionHook {
         }//while
 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
-        if (comet) {
+        if (comet || async) {
             if (error) {
                 recycle();
                 return SocketState.CLOSED;
@@ -1064,6 +1132,8 @@ public class Http11NioProcessor implements ActionHook {
 
             comet = false;
             cometClose = true;
+            async = false;
+            asyncClose = false;
             SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
             if ( key != null ) {
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
@@ -1240,10 +1310,30 @@ public class Http11NioProcessor implements ActionHook {
                 attach.setTimeout(timeout);
         } else if (actionCode == ActionCode.ACTION_ASYNC_START) {
             //TODO SERVLET3 - async
+            async = true;
         } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) {
           //TODO SERVLET3 - async
+            asyncClose = true;
+            RequestInfo rp = request.getRequestProcessor();
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
+                socket.getPoller().cometInterest(socket);
         } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) {
           //TODO SERVLET3 - async
+            if (param==null) return;
+            if (socket==null || socket.getAttachment(false)==null) return;
+            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+            long timeout = ((Long)param).longValue();
+            //if we are not piggy backing on a worker thread, set the timeout
+            RequestInfo rp = request.getRequestProcessor();
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
+                attach.setTimeout(timeout);
+        } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) {
+            RequestInfo rp = request.getRequestProcessor();
+            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+                endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
+            } else { 
+                throw new UnsupportedOperationException("Can't call dispatch on the worker thread.");
+            }
         }
     }
 
index 13f9b75..88741f6 100644 (file)
@@ -680,13 +680,18 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration
 
         public SocketState event(NioChannel socket, SocketStatus status) {
             Http11NioProcessor result = connections.get(socket);
-
+            NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+            att.setAsync(false); //no longer check for timeout
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
                 if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error);
                 // Call the appropriate event
                 try {
-                    state = result.event(status);
+                    if (result.async) {
+                        state = result.asyncDispatch(status);
+                    } else {
+                        state = result.event(status);
+                    }
                 } catch (java.net.SocketException e) {
                     // SocketExceptions are normal
                     Http11NioProtocol.log.debug
@@ -717,7 +722,6 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration
                     } else {
                         if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
                         //add correct poller events here based on Comet stuff
-                        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
                         socket.getPoller().add(socket,att.getCometOps());
                     }
                 }
@@ -756,9 +760,13 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration
                     // processor.
                     //if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
                     connections.put(socket, processor);
+                    
                     if (processor.comet) {
                         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
                         socket.getPoller().add(socket,att.getCometOps());
+                    } else if (processor.async) {
+                        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                        att.setAsync(true);
                     } else {
                         //we should not hold on to the processor objects
                         release(socket);
index 7efa125..5d63197 100644 (file)
@@ -1133,7 +1133,7 @@ public class NioEndpoint {
         return false;
     }
 
-    protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
+    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
         try {
             KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
             attachment.setCometNotify(false); //will get reset upon next reg
@@ -1745,12 +1745,15 @@ public class NioEndpoint {
             cometOps = SelectionKey.OP_READ;
             sendfileData = null;
             keepAliveLeft = 100;
+            async = false;
         }
         
         public void reset() {
             reset(null,null,-1);
         }
         
+        public boolean isAsync() { return async; }
+        public void setAsync(boolean async) { this.async = async; }
         public Poller getPoller() { return poller;}
         public void setPoller(Poller poller){this.poller = poller;}
         public long getLastAccess() { return lastAccess; }
@@ -1821,6 +1824,7 @@ public class NioEndpoint {
         protected long lastRegistered = 0;
         protected SendfileData sendfileData = null;
         protected int keepAliveLeft = 100;
+        protected boolean async = false;
     }
 
     // ------------------------------------------------ Application Buffer Handler
@@ -1864,9 +1868,8 @@ public class NioEndpoint {
     }
 
 
-    // ---------------------------------------------- SocketProcessor Inner Class
-
 
+    // ---------------------------------------------- SocketProcessor Inner Class
     /**
      * This class is the equivalent of the Worker, but will simply use in an
      * external Executor thread pool.