Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=49884
authormarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 27 Sep 2010 12:13:32 +0000 (12:13 +0000)
committermarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 27 Sep 2010 12:13:32 +0000 (12:13 +0000)
This required a major re-factoring of the async implementation. In summary:
- Moved state management to the Coyote Processor
- Added a SocketWrapper to the APR socket
- Added syncs to ensure only one async state change at a time
- Added syncs to ensure only one thread changing a socket's state at a time

A number of new bugs were also uncovered and fixed by this re-factoring:
- delay processing complete() and dispatch() until request where startAsync() is called finished processing
- onAsyncStart listener event

Currently the test case for bug 49884 passes with the security manager enabled using "ab -n 5000 -c 150 -k ..." (it broke with "ab -n 50 -c 10 ..." previously)

The unit tests pass for all three HTTP connectors.

The AJP connectors have only been modified to ensure the code compiles.

The following work remains:
- Testing all connectors (HTTP and AJP) with TCK + security manager and fixing whatever is broken
- Further clean-up
- There is further scope for reducing code duplication between the connectors / aligning the code so it is easier to maintain.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1001698 13f79535-47bb-0310-9956-ffa450edef68

24 files changed:
java/org/apache/catalina/connector/CoyoteAdapter.java
java/org/apache/catalina/connector/Request.java
java/org/apache/catalina/core/AsyncContextImpl.java
java/org/apache/catalina/security/SecurityClassLoad.java
java/org/apache/coyote/ActionCode.java
java/org/apache/coyote/ajp/AjpAprProcessor.java
java/org/apache/coyote/ajp/AjpAprProtocol.java
java/org/apache/coyote/http11/AbstractHttp11Processor.java
java/org/apache/coyote/http11/Constants.java
java/org/apache/coyote/http11/Http11AprProcessor.java
java/org/apache/coyote/http11/Http11AprProtocol.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/coyote/http11/Http11NioProtocol.java
java/org/apache/coyote/http11/Http11Processor.java
java/org/apache/coyote/http11/Http11Protocol.java
java/org/apache/coyote/http11/LocalStrings.properties
java/org/apache/tomcat/util/net/AbstractEndpoint.java
java/org/apache/tomcat/util/net/AprEndpoint.java
java/org/apache/tomcat/util/net/JIoEndpoint.java
java/org/apache/tomcat/util/net/NioEndpoint.java
java/org/apache/tomcat/util/net/SocketWrapper.java
java/org/apache/tomcat/util/net/res/LocalStrings.properties
test/org/apache/catalina/core/TestAsyncContextImpl.java
webapps/docs/changelog.xml

index 1540920..2fddb77 100644 (file)
@@ -260,55 +260,25 @@ public class CoyoteAdapter implements Adapter {
                     "Dispatch may only happen on an existing request.");
         }
         boolean comet = false;
-        boolean async = false;
         boolean success = true;
-        
+        AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
         try {
             if (status==SocketStatus.TIMEOUT) {
-                AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-                //TODO SERVLET3 - async
-                //configure settings for timed out
-                asyncConImpl.setTimeoutState();
-            }
-            if (status==SocketStatus.ERROR || status==SocketStatus.DISCONNECT) {
-                AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-                //TODO SERVLET3 - async
-                //configure settings for timed out
-                asyncConImpl.setErrorState(new IOException("Socket error."));
+                success = true;
+                if (!asyncConImpl.timeout()) {
+                    asyncConImpl.setErrorState(null);
+                }
             }
-            while (success) {
-                AsyncContextImpl impl = (AsyncContextImpl)request.getAsyncContext();
-                    // Calling the container
-                if (impl.getState()==AsyncContextImpl.AsyncState.DISPATCHED) {
-                    // Calling the container
-                    try {
-                        impl.complete();
-                        connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                    } finally {
-                        success = false;
-                    }
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.STARTED){
-                    //TODO SERVLET3 - async
-                    res.action(ActionCode.ASYNC_START, request.getAsyncContext());
-                    async = true;
-                    break;
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.NOT_STARTED){
-                    //TODO SERVLET3 - async
-                    async = false;
-                    break;
-                } else if (impl.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING) {
-                    async = false;
-                    success = false;
-                    connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                } else {
-                    try {
-                        connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
-                    } catch (RuntimeException x) {
-                        impl.setErrorState(x);
-                    }
+            if (request.isAsyncDispatching()) {
+                success = true;
+                connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
+                Throwable t = (Throwable) request.getAttribute(
+                        Globals.EXCEPTION_ATTR);
+                if (t != null) {
+                    asyncConImpl.setErrorState(t);
                 }
             }
-            
+
             if (request.isComet()) {
                 if (!response.isClosed() && !response.isError()) {
                     if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
@@ -327,7 +297,7 @@ public class CoyoteAdapter implements Adapter {
                     request.setFilterChain(null);
                 }
             }
-            if (!async && !comet) {
+            if (!request.isAsync() && !comet) {
                 response.finishResponse();
                 req.action(ActionCode.POST_REQUEST , null);
             }
@@ -341,7 +311,7 @@ public class CoyoteAdapter implements Adapter {
         } finally {
             req.getRequestProcessor().setWorkerThreadName(null);
             // Recycle the wrapper request and response
-            if (!success || (!comet && !async)) {
+            if (!success || (!comet && !request.isAsync())) {
                 request.recycle();
                 response.recycle();
             } else {
@@ -426,15 +396,8 @@ public class CoyoteAdapter implements Adapter {
 
             }
             AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
-            if (asyncConImpl!=null && asyncConImpl.getState()==AsyncContextImpl.AsyncState.STARTED) {
-                res.action(ActionCode.ASYNC_START, request.getAsyncContext());
+            if (asyncConImpl != null) {
                 async = true;
-            } else if (request.isAsyncDispatching()) {
-                asyncDispatch(req, res, SocketStatus.OPEN);
-                if (request.isAsyncStarted()) {
-                    async = true;
-                    res.action(ActionCode.ASYNC_START, request.getAsyncContext());
-                }
             } else if (!comet) {
                 response.finishResponse();
                 req.action(ActionCode.POST_REQUEST , null);
index 175f574..0b65980 100644 (file)
@@ -37,6 +37,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.Subject;
 import javax.servlet.AsyncContext;
@@ -1613,12 +1614,20 @@ public class Request
         if (asyncContext == null) {
             return false;
         }
-        
-        return (asyncContext.getState()==AsyncContextImpl.AsyncState.DISPATCHING ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.TIMING_OUT  ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.STARTED     ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING ||
-                asyncContext.getState()==AsyncContextImpl.AsyncState.COMPLETING);
+
+        AtomicBoolean result = new AtomicBoolean(false);
+        coyoteRequest.action(ActionCode.ASYNC_IS_DISPATCHING, result);
+        return result.get();
+    }
+
+    public boolean isAsync() {
+        if (asyncContext == null) {
+            return false;
+        }
+
+        AtomicBoolean result = new AtomicBoolean(false);
+        coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC, result);
+        return result.get();
     }
 
     @Override
index bbb2f88..a5fad2f 100644 (file)
@@ -22,7 +22,6 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.AsyncContext;
 import javax.servlet.AsyncEvent;
@@ -50,11 +49,6 @@ import org.apache.juli.logging.LogFactory;
  */
 public class AsyncContextImpl implements AsyncContext {
     
-    public static enum AsyncState {
-        NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT,
-        TIMING_OUT_NEED_COMPLETE, ERROR_DISPATCHING
-    }
-    
     private static final Log log = LogFactory.getLog(AsyncContextImpl.class);
     
     private ServletRequest servletRequest = null;
@@ -63,7 +57,6 @@ public class AsyncContextImpl implements AsyncContext {
     private boolean hasOriginalRequestAndResponse = true;
     private volatile Runnable dispatch = null;
     private Context context = null;
-    private AtomicReference<AsyncState> state = new AtomicReference<AsyncState>(AsyncState.NOT_STARTED);
     private long timeout = -1;
     private AsyncEvent event = null;
     
@@ -81,23 +74,46 @@ public class AsyncContextImpl implements AsyncContext {
         if (log.isDebugEnabled()) {
             logDebug("complete   ");
         }
-        if (state.get()==AsyncState.COMPLETING) {
-            //do nothing
-        } else if (state.compareAndSet(AsyncState.DISPATCHED,
-                           AsyncState.COMPLETING) ||
-                   state.compareAndSet(AsyncState.STARTED,
-                           AsyncState.COMPLETING) ||
-                   state.compareAndSet(AsyncState.TIMING_OUT_NEED_COMPLETE,
-                           AsyncState.COMPLETING)) {
-            AtomicBoolean dispatched = new AtomicBoolean(false);
-            request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE,
-                    dispatched);
-            if (!dispatched.get()) doInternalComplete(false);
-        } else {
-            throw new IllegalStateException(
-                    "Complete not allowed. Invalid state:"+state.get());
+        request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
+    }
+
+    public void fireOnComplete() {
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnComplete(event);
+            } catch (IOException ioe) {
+                log.warn("onComplete() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
+        }
+    }
+    
+    public boolean timeout() throws IOException {
+        AtomicBoolean result = new AtomicBoolean();
+        request.getCoyoteRequest().action(ActionCode.ASYNC_TIMEOUT, result);
+        
+        if (result.get()) {
+            boolean listenerInvoked = false;
+            List<AsyncListenerWrapper> listenersCopy =
+                new ArrayList<AsyncListenerWrapper>();
+            listenersCopy.addAll(listeners);
+            for (AsyncListenerWrapper listener : listenersCopy) {
+                listener.fireOnTimeout(event);
+                listenerInvoked = true;
+            }
+            if (listenerInvoked) {
+                request.getCoyoteRequest().action(
+                        ActionCode.ASYNC_IS_TIMINGOUT, result);
+                return !result.get();
+            } else {
+                // No listeners, container calls complete
+                complete();
+            }
         }
-       
+        return true;
     }
 
     @Override
@@ -119,55 +135,37 @@ public class AsyncContextImpl implements AsyncContext {
         if (log.isDebugEnabled()) {
             logDebug("dispatch   ");
         }
-
-        if (state.compareAndSet(AsyncState.STARTED, AsyncState.DISPATCHING) ||
-            state.compareAndSet(AsyncState.DISPATCHED, AsyncState.DISPATCHING)) {
-
-            if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
-                request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
-                request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
-                request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
-                request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
-            }
-            final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
-            final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
-            final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
-            Runnable run = new Runnable() {
-                @Override
-                public void run() {
-                    DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
-                    try {
-                        //piggy back on the request dispatcher to ensure that filters etc get called.
-                        //TODO SERVLET3 - async should this be include/forward or a new dispatch type
-                        //javadoc suggests include with the type of DispatcherType.ASYNC
-                        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
-                        requestDispatcher.include(servletRequest, servletResponse);
-                    }catch (Exception x) {
-                        //log.error("Async.dispatch",x);
-                        throw new RuntimeException(x);
-                    }finally {
-                        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
-                    }
-                }
-            };
-            this.dispatch = run;
-            AtomicBoolean dispatched = new AtomicBoolean(false);
-            request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, dispatched );
-            if (!dispatched.get()) {
+        if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+            request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
+            request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
+            request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
+            request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
+        }
+        final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
+        final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
+        final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCHED, null);
+                DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
                 try {
-                    doInternalDispatch();
-                }catch (ServletException sx) {
-                    throw new RuntimeException(sx);
-                }catch (IOException ix) {
-                    throw new RuntimeException(ix);
+                    //piggy back on the request dispatcher to ensure that filters etc get called.
+                    //TODO SERVLET3 - async should this be include/forward or a new dispatch type
+                    //javadoc suggests include with the type of DispatcherType.ASYNC
+                    request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
+                    requestDispatcher.include(servletRequest, servletResponse);
+                }catch (Exception x) {
+                    //log.error("Async.dispatch",x);
+                    throw new RuntimeException(x);
+                }finally {
+                    request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
                 }
             }
-            if (state.get().equals(AsyncState.DISPATCHED)) {
-                complete();
-            }
-        } else {
-            throw new IllegalStateException("Dispatch not allowed. Invalid state:"+state.get());
-        }
+        };
+        
+        this.dispatch = run;
+        this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null);
     }
 
     @Override
@@ -186,40 +184,8 @@ public class AsyncContextImpl implements AsyncContext {
             logDebug("start      ");
         }
 
-        if (state.get() ==  AsyncState.STARTED) {
-            // Execute the runnable using a container thread from the
-            // Connector's thread pool. Use a wrapper to prevent a memory leak
-            Runnable wrapper = new RunnableWrapper(run, context);
-            ClassLoader oldCL;
-            if (Globals.IS_SECURITY_ENABLED) {
-                PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
-                oldCL = AccessController.doPrivileged(pa);
-            } else {
-                oldCL = Thread.currentThread().getContextClassLoader();
-            }
-            try {
-                if (Globals.IS_SECURITY_ENABLED) {
-                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
-                            this.getClass().getClassLoader());
-                    AccessController.doPrivileged(pa);
-                } else {
-                    Thread.currentThread().setContextClassLoader(
-                            this.getClass().getClassLoader());
-                }
-                request.getConnector().getProtocolHandler().getExecutor(
-                        ).execute(wrapper);
-            } finally {
-                if (Globals.IS_SECURITY_ENABLED) {
-                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
-                            oldCL);
-                    AccessController.doPrivileged(pa);
-                } else {
-                    Thread.currentThread().setContextClassLoader(oldCL);
-                }
-            }
-        } else {
-            throw new IllegalStateException("Start not allowed. Invalid state:"+state.get());
-        }
+        Runnable wrapper = new RunnableWrapper(run, context);
+        this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper);
     }
     
     @Override
@@ -259,31 +225,43 @@ public class AsyncContextImpl implements AsyncContext {
         }
         servletRequest = null;
         servletResponse = null;
-        listeners.clear();
         hasOriginalRequestAndResponse = true;
-        state.set(AsyncState.NOT_STARTED);
         context = null;
         timeout = -1;
         event = null;
     }
 
     public boolean isStarted() {
-        return (state.get() == AsyncState.STARTED ||
-                state.get() == AsyncState.DISPATCHING);
+        AtomicBoolean result = new AtomicBoolean(false);
+        request.getCoyoteRequest().action(
+                ActionCode.ASYNC_IS_STARTED, result);
+        return result.get();
     }
 
     public void setStarted(Context context, ServletRequest request,
-            ServletResponse response, boolean hasOriginalRequestAndResponse) {
-        if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED) ||
-                state.compareAndSet(AsyncState.DISPATCHED, AsyncState.STARTED)) {
-            this.context = context;
-            this.servletRequest = request;
-            this.servletResponse = response;
-            this.hasOriginalRequestAndResponse = hasOriginalRequestAndResponse;
-            this.event = new AsyncEvent(this, request, response); 
-        } else {
-            throw new IllegalStateException("Start illegal. Invalid state: "+state.get());
+            ServletResponse response, boolean originalRequestResponse) {
+        
+        this.request.getCoyoteRequest().action(
+                ActionCode.ASYNC_START, this);
+
+        this.context = context;
+        this.servletRequest = request;
+        this.servletResponse = response;
+        this.hasOriginalRequestAndResponse = originalRequestResponse;
+        this.event = new AsyncEvent(this, request, response);
+        
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnStartAsync(event);
+            } catch (IOException ioe) {
+                log.warn("onStartAsync() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
         }
+        listeners.clear();
     }
 
     @Override
@@ -295,122 +273,53 @@ public class AsyncContextImpl implements AsyncContext {
         if (log.isDebugEnabled()) {
             logDebug("intDispatch");
         }
-        if (this.state.compareAndSet(AsyncState.TIMING_OUT,
-                AsyncState.TIMING_OUT_NEED_COMPLETE)) {
-            log.debug("TIMING OUT!");
-            boolean listenerInvoked = false;
-            List<AsyncListenerWrapper> listenersCopy =
-                new ArrayList<AsyncListenerWrapper>();
-            listenersCopy.addAll(listeners);
-            for (AsyncListenerWrapper listener : listenersCopy) {
-                listener.fireOnTimeout(event);
-                listenerInvoked = true;
-            }
-            if (listenerInvoked) {
-                // Listener should have called complete
-                if (state.get() != AsyncState.NOT_STARTED) {
-                    ((HttpServletResponse)servletResponse).setStatus(500);
-                    state.set(AsyncState.COMPLETING);
-                    doInternalComplete(true);
-                }
-            } else {
-                // No listeners, container calls complete
-                state.set(AsyncState.COMPLETING);
-                doInternalComplete(false);
-            }
-        } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.COMPLETING)) {
-            log.debug("ON ERROR!");
-            boolean listenerInvoked = false;
-            for (AsyncListenerWrapper listener : listeners) {
-                try {
-                    listener.fireOnError(event);
-                }catch (IllegalStateException x) {
-                    log.debug("Listener invoked invalid state.",x);
-                }catch (Exception x) {
-                    log.debug("Exception during onError.",x);
-                }
-                listenerInvoked = true;
-            }
-            if (!listenerInvoked) {
-                ((HttpServletResponse)servletResponse).setStatus(500);
-            }
-            doInternalComplete(true);
-        
-        } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) {
-            if (this.dispatch!=null) {
-                try {
-                    dispatch.run();
-                } catch (RuntimeException x) {
-                    doInternalComplete(true);
-                    if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause();
-                    if (x.getCause() instanceof IOException) throw (IOException)x.getCause();
-                    throw new ServletException(x);
-                } finally {
-                    dispatch = null;
-                }
-            }
-        } else if (this.state.get()==AsyncState.COMPLETING) {
-            doInternalComplete(false);
-        } else {
-            throw new IllegalStateException("Dispatch illegal. Invalid state: "+state.get());
-        }
-    }
-    
-    private void doInternalComplete(boolean error) {
-        if (log.isDebugEnabled()) {
-            logDebug("intComplete");
-        }
-        if (state.get()==AsyncState.NOT_STARTED) return;
-        if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) {
-            //this is the same as
-            //request.startAsync().complete();
-            recycle();
-        } else if (state.compareAndSet(AsyncState.COMPLETING, AsyncState.NOT_STARTED)) {
-            for (AsyncListenerWrapper wrapper : listeners) {
-                try {
-                    wrapper.fireOnComplete(event);
-                }catch (IOException x) {
-                    //how does this propagate, or should it?
-                    //TODO SERVLET3 - async 
-                    log.error("",x);
-                }
+        try {
+            dispatch.run();
+        } catch (RuntimeException x) {
+            // doInternalComplete(true);
+            if (x.getCause() instanceof ServletException) {
+                throw (ServletException)x.getCause();
             }
-            try {
-                if (!error) getResponse().flushBuffer();
-            }catch (Exception x) {
-                log.error("",x);
+            if (x.getCause() instanceof IOException) {
+                throw (IOException)x.getCause();
             }
-            recycle();
-            
-        } else { 
-            throw new IllegalStateException("Complete illegal. Invalid state:"+state.get());
+            throw new ServletException(x);
         }
     }
-    
-    public AsyncState getState() {
-        return state.get();
-    }
+
     
     @Override
     public long getTimeout() {
         return timeout;
     }
-    
+
+
     @Override
     public void setTimeout(long timeout) {
         this.timeout = timeout;
         request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT,
                 Long.valueOf(timeout));
     }
-    
-    public void setTimeoutState() {
-        state.set(AsyncState.TIMING_OUT);
-    }
-    
+
+
     public void setErrorState(Throwable t) {
         if (t!=null) request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
-        state.set(AsyncState.ERROR_DISPATCHING);
+        request.getCoyoteRequest().action(ActionCode.ASYNC_ERROR, null);
+        AsyncEvent errorEvent = new AsyncEvent(event.getAsyncContext(),
+                event.getSuppliedRequest(), event.getSuppliedResponse(), t);
+        List<AsyncListenerWrapper> listenersCopy =
+            new ArrayList<AsyncListenerWrapper>();
+        listenersCopy.addAll(listeners);
+        for (AsyncListenerWrapper listener : listenersCopy) {
+            try {
+                listener.fireOnError(errorEvent);
+            } catch (IOException ioe) {
+                log.warn("onStartAsync() failed for listener of type [" +
+                        listener.getClass().getName() + "]", ioe);
+            }
+        }
     }
+
     
     private void logDebug(String method) {
         String rHashCode;
@@ -457,7 +366,7 @@ public class AsyncContextImpl implements AsyncContext {
                 "Req: %1$8s  CReq: %2$8s  RP: %3$8s  Stage: %4$s  " +
                 "Thread: %5$20s  State: %6$20s  Method: %7$11s  URI: %8$s",
                 rHashCode, crHashCode, rpHashCode, stage,
-                Thread.currentThread().getName(), state, method, uri);
+                Thread.currentThread().getName(), "N/A", method, uri);
         if (log.isTraceEnabled()) {
             log.trace(msg, new DebugException());
         } else {
index 385bae2..d565442 100644 (file)
@@ -38,6 +38,7 @@ public final class SecurityClassLoad {
         }
         
         loadCorePackage(loader);
+        loadCoyotePackage(loader);
         loadLoaderPackage(loader);
         loadSessionPackage(loader);
         loadUtilPackage(loader);
@@ -64,9 +65,6 @@ public final class SecurityClassLoad {
             "AsyncContextImpl");
         loader.loadClass
             (basePackage +
-            "AsyncContextImpl$AsyncState");
-        loader.loadClass
-            (basePackage +
             "AsyncContextImpl$DebugException");
         loader.loadClass
             (basePackage +
@@ -129,6 +127,13 @@ public final class SecurityClassLoad {
     }
     
     
+    private final static void loadCoyotePackage(ClassLoader loader)
+            throws Exception {
+        String basePackage = "org.apache.coyote.";
+        loader.loadClass(basePackage + "http11.AbstractOutputBuffer$1");
+    }
+
+
     private final static void loadJavaxPackage(ClassLoader loader)
         throws Exception {
         loader.loadClass("javax.servlet.http.Cookie");
@@ -221,13 +226,16 @@ public final class SecurityClassLoad {
     private final static void loadTomcatPackage(ClassLoader loader)
         throws Exception {
         String basePackage = "org.apache.tomcat.";
-        loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
-        loader.loadClass
-            (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
         // Make sure system property is read at this point
         Class<?> clazz = loader.loadClass(
                 basePackage + "util.http.FastHttpDateFormat");
         clazz.newInstance();
+        loader.loadClass(basePackage + "util.http.HttpMessages");
+        loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
+        loader.loadClass
+            (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
+        loader.loadClass
+            (basePackage + "util.net.AprEndpoint$PrivilegedSetTccl");
     }
 }
 
index 043b4dd..5e811f1 100644 (file)
@@ -133,18 +133,61 @@ public enum ActionCode {
 
     /**
      * Callback for an async call to
+     * {@link javax.servlet.AsyncContext#dispatch()}
+     */
+    ASYNC_DISPATCH,
+
+    /**
+     * Callback to indicate the the actual dispatch has started and that the
+     * async state needs change.
+     */
+    ASYNC_DISPATCHED,
+
+    /**
+     * Callback for an async call to
+     * {@link javax.servlet.AsyncContext#start()}
+     */
+    ASYNC_RUN,
+
+    /**
+     * Callback for an async call to
      * {@link javax.servlet.AsyncContext#complete()}
      */
     ASYNC_COMPLETE,
+    
+    /**
+     * Callback to trigger the processing of an async timeout
+     */
+    ASYNC_TIMEOUT,
+    
+    /**
+     * Callback to trigger the error processing
+     */
+    ASYNC_ERROR,
+    
     /**
      * Callback for an async call to
      * {@link javax.servlet.AsyncContext#setTimeout(long)}
      */
     ASYNC_SETTIMEOUT,
+    
+    /**
+     * Callback to determine if async processing is in progress 
+     */
+    ASYNC_IS_ASYNC,
+    
+    /**
+     * Callback to determine if async dispatch is in progress
+     */
+    ASYNC_IS_STARTED,
 
     /**
-     * Callback for an async call to
-     * {@link javax.servlet.AsyncContext#dispatch()}
+     * Callback to determine if async dispatch is in progress
      */
-    ASYNC_DISPATCH,
+    ASYNC_IS_DISPATCHING,
+
+    /**
+     * Callback to determine if async is timing out
+     */
+    ASYNC_IS_TIMINGOUT
 }
index c0c3c07..19fd595 100644 (file)
@@ -47,6 +47,7 @@ import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -188,7 +189,7 @@ public class AjpAprProcessor implements ActionHook {
     /**
      * Socket associated with the current connection.
      */
-    protected long socket;
+    protected SocketWrapper<Long> socket;
 
 
     /**
@@ -355,15 +356,16 @@ public class AjpAprProcessor implements ActionHook {
      *
      * @throws IOException error during an I/O operation
      */
-    public boolean process(long socket)
+    public boolean process(SocketWrapper<Long> socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
 
         // Setting up the socket
         this.socket = socket;
-        Socket.setrbb(this.socket, inputBuffer);
-        Socket.setsbb(this.socket, outputBuffer);
+        long socketRef = socket.getSocket().longValue();
+        Socket.setrbb(socketRef, inputBuffer);
+        Socket.setsbb(socketRef, outputBuffer);
 
         // Error flag
         error = false;
@@ -388,7 +390,7 @@ public class AjpAprProcessor implements ActionHook {
                 // not regular request processing
                 int type = requestHeaderMessage.getByte();
                 if (type == Constants.JK_AJP13_CPING_REQUEST) {
-                    if (Socket.sendb(socket, pongMessageBuffer, 0,
+                    if (Socket.sendb(socketRef, pongMessageBuffer, 0,
                             pongMessageBuffer.position()) < 0) {
                         error = true;
                     }
@@ -469,7 +471,7 @@ public class AjpAprProcessor implements ActionHook {
 
         // Add the socket to the poller
         if (!error && !endpoint.isPaused()) {
-            endpoint.getPoller().add(socket);
+            endpoint.getPoller().add(socketRef);
         } else {
             openSocket = false;
         }
@@ -483,7 +485,8 @@ public class AjpAprProcessor implements ActionHook {
     }
 
     /* Copied from the AjpProcessor.java */
-    public SocketState asyncDispatch(long socket, SocketStatus status) throws IOException {
+    public SocketState asyncDispatch(SocketWrapper<Long> socket,
+            SocketStatus status) throws IOException {
 
         // Setting up the socket
         this.socket = socket;
@@ -535,6 +538,8 @@ public class AjpAprProcessor implements ActionHook {
      */
     public void action(ActionCode actionCode, Object param) {
 
+        long socketRef = socket.getSocket().longValue();
+        
         if (actionCode == ActionCode.COMMIT) {
 
             if (response.isCommitted())
@@ -564,7 +569,7 @@ public class AjpAprProcessor implements ActionHook {
             try {
                 flush();
                 // Send explicit flush message
-                if (Socket.sendb(socket, flushMessageBuffer, 0,
+                if (Socket.sendb(socketRef, flushMessageBuffer, 0,
                                  flushMessageBuffer.position()) < 0) {
                     error = true;                    
                 }
@@ -661,14 +666,14 @@ public class AjpAprProcessor implements ActionHook {
             }        
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
             if (param==null) return;
-            if (socket==0) return;
+            if (socketRef==0) return;
             long timeout = ((Long)param).longValue();
-            Socket.timeoutSet(socket, timeout * 1000); 
+            Socket.timeoutSet(socketRef, timeout * 1000); 
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
            RequestInfo rp = request.getRequestProcessor();
             AtomicBoolean dispatch = (AtomicBoolean)param;
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
-                endpoint.getPoller().add(this.socket);
+                endpoint.getPoller().add(socketRef);
                 dispatch.set(true);
             } else {
                 dispatch.set(true);
@@ -1127,7 +1132,7 @@ public class AjpAprProcessor implements ActionHook {
         int nRead;
         while (inputBuffer.remaining() < n) {
             nRead = Socket.recvbb
-                (socket, inputBuffer.limit(),
+                (socket.getSocket().longValue(), inputBuffer.limit(),
                         inputBuffer.capacity() - inputBuffer.limit());
             if (nRead > 0) {
                 inputBuffer.limit(inputBuffer.limit() + nRead);
@@ -1160,7 +1165,7 @@ public class AjpAprProcessor implements ActionHook {
         int nRead;
         while (inputBuffer.remaining() < n) {
             nRead = Socket.recvbb
-                (socket, inputBuffer.limit(),
+                (socket.getSocket().longValue(), inputBuffer.limit(),
                     inputBuffer.capacity() - inputBuffer.limit());
             if (nRead > 0) {
                 inputBuffer.limit(inputBuffer.limit() + nRead);
@@ -1224,7 +1229,7 @@ public class AjpAprProcessor implements ActionHook {
         }
 
         // Request more data immediately
-        Socket.sendb(socket, getBodyMessageBuffer, 0,
+        Socket.sendb(socket.getSocket().longValue(), getBodyMessageBuffer, 0,
                 getBodyMessageBuffer.position());
 
         boolean moreData = receive();
@@ -1305,7 +1310,7 @@ public class AjpAprProcessor implements ActionHook {
     protected void flush()
         throws IOException {
         if (outputBuffer.position() > 0) {
-            if (Socket.sendbb(socket, 0, outputBuffer.position()) < 0) {
+            if (Socket.sendbb(socket.getSocket().longValue(), 0, outputBuffer.position()) < 0) {
                 throw new IOException(sm.getString("ajpprocessor.failedsend"));
             }
             outputBuffer.clear();
index 84396cc..b0395cd 100644 (file)
@@ -42,6 +42,7 @@ import org.apache.tomcat.util.modeler.Registry;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint.Handler;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -337,8 +338,8 @@ public class AjpAprProtocol
         protected AtomicLong registerCount = new AtomicLong(0);
         protected RequestGroupInfo global = new RequestGroupInfo();
 
-        protected ConcurrentHashMap<Long, AjpAprProcessor> connections =
-            new ConcurrentHashMap<Long, AjpAprProcessor>();
+        protected ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor> connections =
+            new ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor>();
 
         protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors = 
             new ConcurrentLinkedQueue<AjpAprProcessor>() {
@@ -384,11 +385,11 @@ public class AjpAprProtocol
         }
 
         // FIXME: Support for this could be added in AJP as well
-        public SocketState event(long socket, SocketStatus status) {
+        public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
             return SocketState.CLOSED;
         }
         
-        public SocketState process(long socket) {
+        public SocketState process(SocketWrapper<Long> socket) {
             AjpAprProcessor processor = recycledProcessors.poll();
             try {
 
@@ -397,7 +398,7 @@ public class AjpAprProtocol
                 }
 
                 if (processor.process(socket)) {
-                    connections.put(Long.valueOf(socket), processor);
+                    connections.put(socket, processor);
                     return SocketState.OPEN;
                 } else {
                     // recycledProcessors.offer(processor);
@@ -431,9 +432,9 @@ public class AjpAprProtocol
         }
 
         // FIXME: Support for this could be added in AJP as well
-        public SocketState asyncDispatch(long socket, SocketStatus status) {
+        public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
 
-            AjpAprProcessor result = connections.get(Long.valueOf(socket));
+            AjpAprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -462,10 +463,10 @@ public class AjpAprProtocol
                         (sm.getString("ajpprotocol.proto.error"), e);
                 } finally {
                     if (state != SocketState.LONG) {
-                        connections.remove(Long.valueOf(socket));
+                        connections.remove(socket);
                         recycledProcessors.offer(result);
                         if (state == SocketState.OPEN) {
-                            proto.endpoint.getPoller().add(socket);
+                            proto.endpoint.getPoller().add(socket.getSocket().longValue());
                         }
                     }
                 }
index 9226fd2..4a400c1 100644 (file)
 package org.apache.coyote.http11;
 
 import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.StringTokenizer;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.catalina.core.AsyncContextImpl;
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.Adapter;
 import org.apache.coyote.Request;
@@ -43,6 +47,7 @@ import org.apache.tomcat.util.buf.MessageBytes;
 import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 public abstract class AbstractHttp11Processor {
@@ -236,12 +241,6 @@ public abstract class AbstractHttp11Processor {
 
     
     /**
-     * Async used
-     */
-    protected boolean async = false;
-
-
-    /**
      * Set compression level.
      */
     public void setCompression(String compression) {
@@ -904,7 +903,24 @@ public abstract class AbstractHttp11Processor {
                 request.getInputBuffer();
             internalBuffer.addActiveFilter(savedBody);
         } else if (actionCode == ActionCode.ASYNC_START) {
-            async = true;
+            asyncStart((AsyncContextImpl) param);
+        } else if (actionCode == ActionCode.ASYNC_DISPATCHED) {
+            asyncDispatched();
+        } else if (actionCode == ActionCode.ASYNC_TIMEOUT) {
+            AtomicBoolean result = (AtomicBoolean) param;
+            result.set(asyncTimeout());
+        } else if (actionCode == ActionCode.ASYNC_RUN) {
+            asyncRun((Runnable) param);
+        } else if (actionCode == ActionCode.ASYNC_ERROR) {
+            asyncError();
+        } else if (actionCode == ActionCode.ASYNC_IS_STARTED) {
+            ((AtomicBoolean) param).set(isAsyncStarted());
+        } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) {
+            ((AtomicBoolean) param).set(isAsyncDispatching());
+        } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
+            ((AtomicBoolean) param).set(isAsync());
+        } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
+            ((AtomicBoolean) param).set(isAsyncTimingOut());
         } else {
             actionInternal(actionCode, param);
         }
@@ -1086,10 +1102,276 @@ public abstract class AbstractHttp11Processor {
     public final void recycle() {
         getInputBuffer().recycle();
         getOutputBuffer().recycle();
+        asyncCtxt = null;
         recycleInternal();
     }
     
     protected abstract void recycleInternal();
     
     protected abstract Executor getExecutor();
+    
+    // -------------------------------------------------- Async state management
+    
+    /*
+     * DISPATCHED    - Standard request. Not in Async mode.
+     * STARTING      - ServletRequest.startAsync() has been called but the
+     *                 request in which that call was made has not finished
+     *                 processing.
+     * STARTED       - ServletRequest.startAsync() has been called and the
+     *                 request in which that call was made has finished
+     *                 processing.
+     * MUST_COMPLETE - complete() has been called before the request in which
+     *                 ServletRequest.startAsync() has finished. As soon as that
+     *                 request finishes, the complete() will be processed.
+     * COMPLETING    - The call to complete() was made once the request was in
+     *                 the STARTED state. May or may not be triggered by a
+     *                 container thread - depends if start(Runnable) was used
+     * 
+     * TODO - markt - Move this to a separate class
+     */
+    private static enum AsyncState {
+        DISPATCHED(false, false, false),
+        STARTING(true, true, false),
+        STARTED(true, true, false),
+        MUST_COMPLETE(true, false, false),
+        COMPLETING(true, false, false),
+        TIMING_OUT(true, false, false),
+        MUST_DISPATCH(true, false, true),
+        DISPATCHING(true, false, true),
+        ERROR(true,false,false);
+    
+        private boolean isAsync;
+        private boolean isStarted;
+        private boolean isDispatching;
+        
+        private AsyncState(boolean isAsync, boolean isStarted,
+                boolean isDispatching) {
+            this.isAsync = isAsync;
+            this.isStarted = isStarted;
+            this.isDispatching = isDispatching;
+        }
+        
+        public boolean isAsync() {
+            return this.isAsync;
+        }
+        
+        public boolean isStarted() {
+            return this.isStarted;
+        }
+        
+        public boolean isDispatching() {
+            return this.isDispatching;
+        }
+    }
+    
+    private volatile AsyncState state = AsyncState.DISPATCHED;
+    // Need this to fire listener on complete
+    private AsyncContextImpl asyncCtxt = null;
+    
+    protected boolean isAsync() {
+        return state.isAsync();
+    }
+
+    protected boolean isAsyncDispatching() {
+        return state.isDispatching();
+    }
+
+    protected boolean isAsyncStarted() {
+        return state.isStarted();
+    }
+
+    protected boolean isAsyncTimingOut() {
+        return state == AsyncState.TIMING_OUT;
+    }
+
+
+    private synchronized void asyncStart(AsyncContextImpl asyncCtxt) {
+        if (state == AsyncState.DISPATCHED) {
+            state = AsyncState.STARTING;
+            this.asyncCtxt = asyncCtxt;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "startAsync()", state));
+        }
+    }
+    
+    /*
+     * Async has been processed. Whether or not to enter a long poll depends on
+     * current state. For example, as per SRV.2.3.3.3 can now process calls to
+     * complete() or dispatch().
+     */
+    protected synchronized SocketState asyncPostProcess() {
+        
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.STARTED;
+            return SocketState.LONG;
+        } else if (state == AsyncState.MUST_COMPLETE) {
+            asyncCtxt.fireOnComplete();
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.COMPLETING) {
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.MUST_DISPATCH) {
+            state = AsyncState.DISPATCHING;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.DISPATCHING) {
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        } else if (state == AsyncState.ERROR) {
+            asyncCtxt.fireOnComplete();
+            state = AsyncState.DISPATCHED;
+            return SocketState.ASYNC_END;
+        //} else if (state == AsyncState.DISPATCHED) {
+        //    // No state change
+        //    return SocketState.OPEN;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncLongPoll()", state));
+        }
+    }
+    
+
+    protected synchronized boolean asyncComplete() {
+        boolean doComplete = false;
+        
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.MUST_COMPLETE;
+        } else if (state == AsyncState.STARTED) {
+            state = AsyncState.COMPLETING;
+            doComplete = true;
+        } else if (state == AsyncState.TIMING_OUT ||
+                state == AsyncState.ERROR) {
+            state = AsyncState.MUST_COMPLETE;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "asyncComplete()", state));
+            
+        }
+        return doComplete;
+    }
+    
+    
+    private synchronized boolean asyncTimeout() {
+        if (state == AsyncState.STARTED) {
+            state = AsyncState.TIMING_OUT;
+            return true;
+        } else if (state == AsyncState.COMPLETING ||
+                state == AsyncState.DISPATCHED) {
+            // NOOP - App called complete between the the timeout firing and
+            // execution reaching this point
+            return false;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "timeoutAsync()", state));
+        }
+    }
+    
+    
+    protected synchronized boolean asyncDispatch() {
+        boolean doDispatch = false;
+        if (state == AsyncState.STARTING) {
+            state = AsyncState.MUST_DISPATCH;
+        } else if (state == AsyncState.STARTED) {
+            state = AsyncState.DISPATCHING;
+            doDispatch = true;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "dispatchAsync()", state));
+        }
+        return doDispatch;
+    }
+    
+    
+    private synchronized void asyncDispatched() {
+        if (state == AsyncState.DISPATCHING) {
+            state = AsyncState.DISPATCHED;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "dispatchAsync()", state));
+        }
+    }
+    
+    
+    private synchronized boolean asyncError() {
+        boolean doDispatch = false;
+        if (state == AsyncState.DISPATCHED ||
+                state == AsyncState.TIMING_OUT) {
+            state = AsyncState.ERROR;
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "dispatchAsync()", state));
+        }
+        return doDispatch;
+    }
+    
+    private synchronized void asyncRun(Runnable runnable) {
+        if (state == AsyncState.STARTING || state ==  AsyncState.STARTED) {
+            // Execute the runnable using a container thread from the
+            // Connector's thread pool. Use a wrapper to prevent a memory leak
+            ClassLoader oldCL;
+            if (Constants.IS_SECURITY_ENABLED) {
+                PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
+                oldCL = AccessController.doPrivileged(pa);
+            } else {
+                oldCL = Thread.currentThread().getContextClassLoader();
+            }
+            try {
+                if (Constants.IS_SECURITY_ENABLED) {
+                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+                            this.getClass().getClassLoader());
+                    AccessController.doPrivileged(pa);
+                } else {
+                    Thread.currentThread().setContextClassLoader(
+                            this.getClass().getClassLoader());
+                }
+                
+                getExecutor().execute(runnable);
+            } finally {
+                if (Constants.IS_SECURITY_ENABLED) {
+                    PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+                            oldCL);
+                    AccessController.doPrivileged(pa);
+                } else {
+                    Thread.currentThread().setContextClassLoader(oldCL);
+                }
+            }
+        } else {
+            throw new IllegalStateException(
+                    sm.getString("abstractHttp11Protocol.invalidAsyncState",
+                            "runAsync()", state));
+        }
+
+    }
+    
+    private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
+
+        private ClassLoader cl;
+
+        PrivilegedSetTccl(ClassLoader cl) {
+            this.cl = cl;
+        }
+
+        @Override
+        public Void run() {
+            Thread.currentThread().setContextClassLoader(cl);
+            return null;
+        }
+    }
+
+    private static class PrivilegedGetTccl
+            implements PrivilegedAction<ClassLoader> {
+
+        @Override
+        public ClassLoader run() {
+            return Thread.currentThread().getContextClassLoader();
+        }
+    }
 }
index ff07a60..46514b4 100644 (file)
@@ -211,5 +211,9 @@ public final class Constants {
      */
     public static final String POST = "POST";
 
-
+    /**
+     * Has security been turned on?
+     */
+    public static final boolean IS_SECURITY_ENABLED =
+        (System.getSecurityManager() != null);
 }
index df05c74..ab2ed8c 100644 (file)
@@ -24,7 +24,6 @@ import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
 import java.util.Locale;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.ActionHook;
@@ -47,6 +46,7 @@ import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 
 
 /**
@@ -124,7 +124,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
     /**
      * Socket associated with the current connection.
      */
-    protected long socket = 0;
+    protected SocketWrapper<Long> socket = null;
 
 
     /**
@@ -186,7 +186,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
      *
      * @throws IOException error during an I/O operation
      */
-    public SocketState process(long socket)
+    public SocketState process(SocketWrapper<Long> socket)
         throws IOException {
         RequestInfo rp = request.getRequestProcessor();
         rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -201,13 +201,13 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
 
         // Setting up the socket
         this.socket = socket;
-        inputBuffer.setSocket(socket);
-        outputBuffer.setSocket(socket);
+        long socketRef = socket.getSocket().longValue();
+        inputBuffer.setSocket(socketRef);
+        outputBuffer.setSocket(socketRef);
 
         // Error flag
         error = false;
         comet = false;
-        async = false;
         keepAlive = true;
 
         int keepAliveLeft = maxKeepAliveRequests;
@@ -216,12 +216,12 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         boolean keptAlive = false;
         boolean openSocket = false;
 
-        while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) {
+        while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) {
 
             // Parsing the request header
             try {
                 if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
-                    Socket.timeoutSet(socket, soTimeout * 1000);
+                    Socket.timeoutSet(socketRef, soTimeout * 1000);
                 }
                 if (!inputBuffer.parseRequestLine(keptAlive)) {
                     // This means that no data is available right now
@@ -229,13 +229,13 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
                     // and the method should return true
                     openSocket = true;
                     // Add the socket to the poller
-                    endpoint.getPoller().add(socket);
+                    endpoint.getPoller().add(socketRef);
                     break;
                 }
                 request.setStartTime(System.currentTimeMillis());
                 keptAlive = true;
                 if (!disableUploadTimeout) {
-                    Socket.timeoutSet(socket, timeout * 1000);
+                    Socket.timeoutSet(socketRef, timeout * 1000);
                 }
                 inputBuffer.parseHeaders();
             } catch (IOException e) {
@@ -296,7 +296,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
             }
 
             // Finish the handling of the request
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // If we know we are closing the connection, don't drain input.
                 // This way uploading a 100GB file doesn't tie up the thread 
                 // if the servlet has rejected it.
@@ -312,7 +312,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
             }
             request.updateCounters();
 
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // Next request
                 inputBuffer.nextRequest();
                 outputBuffer.nextRequest();
@@ -320,7 +320,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
             
             // Do sendfile as needed: add socket to sendfile and end
             if (sendfileData != null && !error) {
-                sendfileData.socket = socket;
+                sendfileData.socket = socketRef;
                 sendfileData.keepAlive = keepAlive;
                 if (!endpoint.getSendfile().add(sendfileData)) {
                     openSocket = true;
@@ -335,11 +335,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
 
         if (error || endpoint.isPaused()) {
-            inputBuffer.nextRequest();
-            outputBuffer.nextRequest();
             recycle();
             return SocketState.CLOSED;
-        } else if (comet  || async) {
+        } else if (comet  || isAsync()) {
             return SocketState.LONG;
         } else {
             recycle();
@@ -349,12 +347,14 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
     }
 
     /* Copied from the AjpProcessor.java */
-    public SocketState asyncDispatch(long socket, SocketStatus status) {
+    public SocketState asyncDispatch(SocketWrapper<Long> socket,
+            SocketStatus status) {
 
         // Setting up the socket
         this.socket = socket;
-        inputBuffer.setSocket(socket);
-        outputBuffer.setSocket(socket);
+        long socketRef = socket.getSocket().longValue();
+        inputBuffer.setSocket(socketRef);
+        outputBuffer.setSocket(socketRef);
 
         RequestInfo rp = request.getRequestProcessor();
         try {
@@ -372,30 +372,25 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
 
         rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
 
-        if (async) {
-            if (error) {
-                response.setStatus(500);
-                request.updateCounters();
-                recycle();
+        if (error) {
+            recycle();
+            return SocketState.CLOSED;
+        } else if (isAsync()) {
+            return SocketState.LONG;
+        } else {
+            recycle();
+            if (!keepAlive) {
                 return SocketState.CLOSED;
             } else {
-                return SocketState.LONG;
+                return SocketState.OPEN;
             }
-        } else {
-            if (error) {
-                response.setStatus(500);
-            }
-            request.updateCounters();
-            recycle();
-            return SocketState.CLOSED;
         }
-        
     }
 
 
     @Override
     public void recycleInternal() {
-        this.socket = 0;
+        this.socket = null;
     }
     
 
@@ -411,6 +406,8 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
     @Override
     public void actionInternal(ActionCode actionCode, Object param) {
 
+        long socketRef = socket.getSocket().longValue();
+        
         if (actionCode == ActionCode.CLOSE) {
             // Close
 
@@ -418,7 +415,6 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
             // transactions with the client
 
             comet = false;
-            async = false;
             try {
                 outputBuffer.endRequest();
             } catch (IOException e) {
@@ -429,9 +425,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) {
 
             // Get remote host address
-            if (remoteAddr == null && (socket != 0)) {
+            if (remoteAddr == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     remoteAddr = Address.getip(sa);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -442,9 +438,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_LOCAL_NAME_ATTRIBUTE) {
 
             // Get local host name
-            if (localName == null && (socket != 0)) {
+            if (localName == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     localName = Address.getnameinfo(sa, 0);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -455,9 +451,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) {
 
             // Get remote host name
-            if (remoteHost == null && (socket != 0)) {
+            if (remoteHost == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     remoteHost = Address.getnameinfo(sa, 0);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -468,9 +464,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) {
 
             // Get local host address
-            if (localAddr == null && (socket != 0)) {
+            if (localAddr == null && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     localAddr = Address.getip(sa);
                 } catch (Exception e) {
                     log.warn(sm.getString("http11processor.socket.info"), e);
@@ -482,9 +478,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_REMOTEPORT_ATTRIBUTE) {
 
             // Get remote port
-            if (remotePort == -1 && (socket != 0)) {
+            if (remotePort == -1 && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_REMOTE, socket);
+                    long sa = Address.get(Socket.APR_REMOTE, socketRef);
                     Sockaddr addr = Address.getInfo(sa);
                     remotePort = addr.port;
                 } catch (Exception e) {
@@ -496,9 +492,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) {
 
             // Get local port
-            if (localPort == -1 && (socket != 0)) {
+            if (localPort == -1 && (socketRef != 0)) {
                 try {
-                    long sa = Address.get(Socket.APR_LOCAL, socket);
+                    long sa = Address.get(Socket.APR_LOCAL, socketRef);
                     Sockaddr addr = Address.getInfo(sa);
                     localPort = addr.port;
                 } catch (Exception e) {
@@ -509,24 +505,24 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
 
         } else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE ) {
 
-            if (ssl && (socket != 0)) {
+            if (ssl && (socketRef != 0)) {
                 try {
                     // Cipher suite
-                    Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
+                    Object sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_CIPHER);
                     if (sslO != null) {
                         request.setAttribute(AbstractEndpoint.CIPHER_SUITE_KEY, sslO);
                     }
                     // Get client certificate and the certificate chain if present
                     // certLength == -1 indicates an error
-                    int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
-                    byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                    int certLength = SSLSocket.getInfoI(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+                    byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
                     X509Certificate[] certs = null;
                     if (clientCert != null  && certLength > -1) {
                         certs = new X509Certificate[certLength + 1];
                         CertificateFactory cf = CertificateFactory.getInstance("X.509");
                         certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
                         for (int i = 0; i < certLength; i++) {
-                            byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+                            byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
                             certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
                         }
                     }
@@ -534,12 +530,12 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
                         request.setAttribute(AbstractEndpoint.CERTIFICATE_KEY, certs);
                     }
                     // User key size
-                    sslO = Integer.valueOf(SSLSocket.getInfoI(socket,
+                    sslO = Integer.valueOf(SSLSocket.getInfoI(socketRef,
                             SSL.SSL_INFO_CIPHER_USEKEYSIZE));
                     request.setAttribute(AbstractEndpoint.KEY_SIZE_KEY, sslO);
 
                     // SSL session ID
-                    sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
+                    sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_SESSION_ID);
                     if (sslO != null) {
                         request.setAttribute(AbstractEndpoint.SESSION_ID_KEY, sslO);
                     }
@@ -552,7 +548,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
 
         } else if (actionCode == ActionCode.REQ_SSL_CERTIFICATE) {
 
-            if (ssl && (socket != 0)) {
+            if (ssl && (socketRef != 0)) {
                 // Consume and buffer the request body, so that it does not
                 // interfere with the client's handshake messages
                 InputFilter[] inputFilters = inputBuffer.getFilters();
@@ -560,22 +556,22 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
                 inputBuffer.addActiveFilter(inputFilters[Constants.BUFFERED_FILTER]);
                 try {
                     // Configure connection to require a certificate
-                    SSLSocket.setVerify(socket, SSL.SSL_CVERIFY_REQUIRE,
+                    SSLSocket.setVerify(socketRef, SSL.SSL_CVERIFY_REQUIRE,
                             endpoint.getSSLVerifyDepth());
                     // Renegotiate certificates
-                    if (SSLSocket.renegotiate(socket) == 0) {
+                    if (SSLSocket.renegotiate(socketRef) == 0) {
                         // Don't look for certs unless we know renegotiation worked.
                         // Get client certificate and the certificate chain if present
                         // certLength == -1 indicates an error 
-                        int certLength = SSLSocket.getInfoI(socket,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
-                        byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+                        int certLength = SSLSocket.getInfoI(socketRef,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+                        byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
                         X509Certificate[] certs = null;
                         if (clientCert != null && certLength > -1) {
                             certs = new X509Certificate[certLength + 1];
                             CertificateFactory cf = CertificateFactory.getInstance("X.509");
                             certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
                             for (int i = 0; i < certLength; i++) {
-                                byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+                                byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
                                 certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
                             }
                         }
@@ -599,29 +595,16 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio
         } else if (actionCode == ActionCode.COMET_SETTIMEOUT) {
             //no op
         } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
-          //TODO SERVLET3 - async - that is bit hacky -
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
-                dispatch.set(true);
-                endpoint.getHandler().asyncDispatch(this.socket, SocketStatus.STOP);
-            } else {
-                dispatch.set(false);
+            if (asyncComplete()) {
+                endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
-          //TODO SERVLET3 - async
             if (param==null) return;
-            if (socket==0) return;
             long timeout = ((Long)param).longValue();
-            Socket.timeoutSet(socket, timeout * 1000);
+            socket.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
-            RequestInfo rp = request.getRequestProcessor();
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
-                endpoint.getPoller().add(this.socket);
-                dispatch.set(true);
-            } else {
-                dispatch.set(true);
+            if (asyncDispatch()) {
+                endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
             }
         }
         
index 583af68..4c4f67b 100644 (file)
@@ -17,6 +17,8 @@
 
 package org.apache.coyote.http11;
 
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +34,7 @@ import org.apache.tomcat.util.modeler.Registry;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.AprEndpoint.Handler;
 import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
 
@@ -246,8 +249,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
         protected AtomicLong registerCount = new AtomicLong(0);
         protected RequestGroupInfo global = new RequestGroupInfo();
         
-        protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
-            new ConcurrentHashMap<Long, Http11AprProcessor>();
+        protected ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor> connections =
+            new ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor>();
         protected ConcurrentLinkedQueue<Http11AprProcessor> recycledProcessors = 
             new ConcurrentLinkedQueue<Http11AprProcessor>() {
             private static final long serialVersionUID = 1L;
@@ -294,8 +297,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
         }
 
         @Override
-        public SocketState event(long socket, SocketStatus status) {
-            Http11AprProcessor result = connections.get(Long.valueOf(socket));
+        public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
+            Http11AprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -324,16 +327,16 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
                                 "http11protocol.proto.error"), e);
                     } finally {
                         if (state != SocketState.LONG) {
-                            connections.remove(Long.valueOf(socket));
+                            connections.remove(socket);
                             recycledProcessors.offer(result);
                             if (state == SocketState.OPEN) {
-                                ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+                                ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
                             }
                         } else {
-                            ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+                            ((AprEndpoint)proto.endpoint).getCometPoller().add(socket.getSocket().longValue());
                         }
                     }
-                } else if (result.async) {
+                } else if (result.isAsync()) {
                     state = asyncDispatch(socket, status);
                 }
             }
@@ -341,7 +344,7 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
         }
         
         @Override
-        public SocketState process(long socket) {
+        public SocketState process(SocketWrapper<Long> socket) {
             Http11AprProcessor processor = recycledProcessors.poll();
             try {
                 if (processor == null) {
@@ -350,11 +353,13 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
 
                 SocketState state = processor.process(socket);
                 if (state == SocketState.LONG) {
-                    // Associate the connection with the processor. The next request 
-                    // processed by this thread will use either a new or a recycled
-                    // processor.
-                    connections.put(Long.valueOf(socket), processor);
-                    ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+                    // Check if the post processing is going to change the state
+                    state = processor.asyncPostProcess();
+                }
+                if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+                    // Need to make socket available for next processing cycle
+                    // but no need for the poller
+                    connections.put(socket, processor);
                 } else {
                     recycledProcessors.offer(processor);
                 }
@@ -386,8 +391,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
         }
 
         @Override
-        public SocketState asyncDispatch(long socket, SocketStatus status) {
-            Http11AprProcessor result = connections.get(Long.valueOf(socket));
+        public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
+            Http11AprProcessor result = connections.get(socket);
             
             SocketState state = SocketState.CLOSED; 
             if (result != null) {
@@ -404,11 +409,14 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
                     Http11AprProtocol.log.error
                         (sm.getString("http11protocol.proto.error"), e);
                 } finally {
-                    if (state != SocketState.LONG) {
-                        connections.remove(Long.valueOf(socket));
+                    if (state == SocketState.LONG && result.isAsync()) {
+                        state = result.asyncPostProcess();
+                    }
+                    if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
+                        connections.remove(socket);
                         recycledProcessors.offer(result);
                         if (state == SocketState.OPEN) {
-                            ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+                            ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
                         }
                     }
                 }
@@ -440,15 +448,29 @@ public class Http11AprProtocol extends AbstractHttp11Protocol {
                 synchronized (this) {
                     try {
                         long count = registerCount.incrementAndGet();
-                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        final RequestInfo rp = processor.getRequest().getRequestProcessor();
                         rp.setGlobalProcessor(global);
-                        ObjectName rpName = new ObjectName
+                        final ObjectName rpName = new ObjectName
                             (proto.getDomain() + ":type=RequestProcessor,worker="
                                 + proto.getName() + ",name=HttpRequest" + count);
                         if (log.isDebugEnabled()) {
                             log.debug("Register " + rpName);
                         }
-                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        if (Constants.IS_SECURITY_ENABLED) {
+                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                                @Override
+                                public Void run() {
+                                    try {
+                                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                                    } catch (Exception e) {
+                                        log.warn("Error registering request");
+                                    }
+                                    return null;
+                                }
+                            });
+                        } else {
+                            Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        }
                         rp.setRpName(rpName);
                     } catch (Exception e) {
                         log.warn("Error registering request");
index 02ed557..ded2153 100644 (file)
@@ -22,7 +22,6 @@ import java.net.InetAddress;
 import java.nio.channels.SelectionKey;
 import java.util.Locale;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.ActionHook;
@@ -236,11 +235,13 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
                         Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
                         if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
                     } else {
-                        //reset the timeout
-                        if (keepAlive && keepAliveTimeout>0) {
-                            attach.setTimeout(keepAliveTimeout);
-                        } else {
-                            attach.setTimeout(soTimeout);
+                        if (isAsyncDispatching()) {
+                            //reset the timeout
+                            if (keepAlive && keepAliveTimeout>0) {
+                                attach.setTimeout(keepAliveTimeout);
+                            } else {
+                                attach.setTimeout(soTimeout);
+                            }
                         }
                     }
 
@@ -261,7 +262,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
         if (error) {
             recycle();
             return SocketState.CLOSED;
-        } else if (!comet) {
+        } else if (!comet && !isAsync()) {
             recycle();
             return (keepAlive)?SocketState.OPEN:SocketState.CLOSED;
         } else {
@@ -291,7 +292,6 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
         error = false;
         keepAlive = true;
         comet = false;
-        async = false;
         
         long soTimeout = endpoint.getSoTimeout();
         int keepAliveTimeout = endpoint.getKeepAliveTimeout();
@@ -301,7 +301,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
         boolean recycle = true;
         final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
         
-        while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) {
+        while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) {
             //always default to our soTimeout
             ka.setTimeout(soTimeout);
             // Parsing the request header
@@ -412,7 +412,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
             }
 
             // Finish the handling of the request
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // If we know we are closing the connection, don't drain input.
                 // This way uploading a 100GB file doesn't tie up the thread 
                 // if the servlet has rejected it.
@@ -428,7 +428,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
             }
             request.updateCounters();
 
-            if (!comet && !async) {
+            if (!comet && !isAsync()) {
                 // Next request
                 inputBuffer.nextRequest();
                 outputBuffer.nextRequest();
@@ -453,7 +453,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
         if (error || endpoint.isPaused()) {
             recycle();
             return SocketState.CLOSED;
-        } else if (comet || async) {
+        } else if (comet || isAsync()) {
             return SocketState.LONG;
         } else {
             if (recycle) {
@@ -494,13 +494,11 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
 
         if (actionCode == ActionCode.CLOSE) {
             // Close
-
             // End the processing of the current request, and stop any further
             // transactions with the client
 
             comet = false;
             cometClose = true;
-            async = false;
             SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
             if ( key != null ) {
                 NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
@@ -654,17 +652,10 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
                 attach.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
-          //TODO SERVLET3 - async
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
-                dispatch.set(true);
-                endpoint.processSocket(this.socket, SocketStatus.STOP, true);
-            } else {
-                dispatch.set(false);
+            if (asyncComplete()) {
+                endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
-          //TODO SERVLET3 - async
             if (param==null) return;
             if (socket==null || socket.getAttachment(false)==null) return;
             NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
@@ -672,13 +663,8 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio
             //if we are not piggy backing on a worker thread, set the timeout
             attach.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
-            RequestInfo rp = request.getRequestProcessor();
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+            if (asyncDispatch()) {
                 endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
-                dispatch.set(true);
-            } else { 
-                dispatch.set(true);
             }
         }
     }
index bf1f60e..5acc5ae 100644 (file)
@@ -18,6 +18,8 @@
 package org.apache.coyote.http11;
 
 import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -288,18 +290,18 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol {
 
         @Override
         public SocketState event(NioChannel socket, SocketStatus status) {
-            Http11NioProcessor result = connections.get(socket);
+            Http11NioProcessor processor = connections.get(socket);
             NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
             att.setAsync(false); //no longer check for timeout
             SocketState state = SocketState.CLOSED; 
-            if (result != null) {
-                if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error);
+            if (processor != null) {
+                if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+processor.error);
                 // Call the appropriate event
                 try {
-                    if (result.async) {
-                        state = result.asyncDispatch(status);
+                    if (processor.comet) {
+                        state = processor.event(status);
                     } else {
-                        state = result.event(status);
+                        state = processor.asyncDispatch(status);
                     }
                 } catch (java.net.SocketException e) {
                     // SocketExceptions are normal
@@ -322,14 +324,21 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol {
                     Http11NioProtocol.log.error
                         (sm.getString("http11protocol.proto.error"), e);
                 } finally {
-                    if (state != SocketState.LONG) {
+                    if (processor.isAsync()) {
+                        state = processor.asyncPostProcess();
+                    }
+                    if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
                         connections.remove(socket);
-                        recycledProcessors.offer(result);
+                        recycledProcessors.offer(processor);
                         if (state == SocketState.OPEN) {
                             socket.getPoller().add(socket);
                         }
+                    } else if (state == SocketState.ASYNC_END) {
+                        // No further work required
+                    } else if (state == SocketState.LONG) {
+                        att.setAsync(true); // Re-enable timeouts
                     } else {
-                        if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
+                        if (log.isDebugEnabled()) log.debug("Keeping processor["+processor);
                         //add correct poller events here based on Comet stuff
                         socket.getPoller().add(socket,att.getCometOps());
                     }
@@ -369,12 +378,18 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol {
                     if (processor.comet) {
                         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
                         socket.getPoller().add(socket,att.getCometOps());
-                    } else if (processor.async) {
+                    } else if (processor.isAsync()) {
                         NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
                         att.setAsync(true);
+                        // longPoll may change socket state (e.g. to trigger a
+                        // complete or dispatch)
+                        state = processor.asyncPostProcess();
                     } else {
                         socket.getPoller().add(socket);
                     }
+                }
+                if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+                    // Already done all we need to do.
                 } else if (state == SocketState.OPEN){
                     // In keep-alive but between requests. OK to recycle
                     // processor. Continue to poll for the next request.
@@ -434,12 +449,26 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol {
                     try {
                         registerCount.addAndGet(1);
                         if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get());
-                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        final RequestInfo rp = processor.getRequest().getRequestProcessor();
                         rp.setGlobalProcessor(global);
-                        ObjectName rpName = new ObjectName
+                        final ObjectName rpName = new ObjectName
                             (proto.getDomain() + ":type=RequestProcessor,worker="
                              + proto.getName() + ",name=HttpRequest" + count++);
-                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        if (Constants.IS_SECURITY_ENABLED) {
+                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                                @Override
+                                public Void run() {
+                                    try {
+                                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                                    } catch (Exception e) {
+                                        log.warn("Error registering request");
+                                    }
+                                    return null;
+                                }
+                            });
+                        } else {
+                            Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        }
                         rp.setRpName(rpName);
                     } catch (Exception e) {
                         log.warn("Error registering request");
index 6e59e1a..9758396 100644 (file)
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.util.Locale;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.ActionHook;
@@ -265,9 +264,9 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
                 // This way uploading a 100GB file doesn't tie up the thread 
                 // if the servlet has rejected it.
                 
-                if(error && !async)
+                if(error && !isAsync())
                     inputBuffer.setSwallowInput(false);
-                if (!async)
+                if (!isAsync())
                     endRequest();
             } catch (Throwable t) {
                 log.error(sm.getString("http11processor.request.finish"), t);
@@ -296,7 +295,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
             // will reset it
             // thrA.setParam(null);
             // Next request
-            if (!async || error) {
+            if (!isAsync() || error) {
                 inputBuffer.nextRequest();
                 outputBuffer.nextRequest();
             }
@@ -309,7 +308,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
         if (error || endpoint.isPaused()) {
             recycle();
             return SocketState.CLOSED;
-        } else if (async) {
+        } else if (isAsync()) {
             return SocketState.LONG;
         } else {
             if (!keepAlive) {
@@ -343,7 +342,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
         if (error) {
             recycle();
             return SocketState.CLOSED;
-        } else if (async) {
+        } else if (isAsync()) {
             return SocketState.LONG;
         } else {
             recycle();
@@ -360,7 +359,6 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
     protected void recycleInternal() {
         // Recycle
         this.socket = null;
-        async = false;
         // Recycle ssl info
         sslSupport = null;
     }
@@ -380,7 +378,6 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
 
         if (actionCode == ActionCode.CLOSE) {
             // Close
-            async = false;
             // End the processing of the current request, and stop any further
             // transactions with the client
 
@@ -497,33 +494,19 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo
                 }
             }
         } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
-            //TODO SERVLET3 - async
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
-                dispatch.set(true);
+            if (asyncComplete()) {
                 endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
-            } else {
-                dispatch.set(false);
             }
         } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
-          //TODO SERVLET3 - async
-            if (param==null) return;
+            if (param == null) return;
             long timeout = ((Long)param).longValue();
-            //if we are not piggy backing on a worker thread, set the timeout
+            // if we are not piggy backing on a worker thread, set the timeout
             socket.setTimeout(timeout);
         } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
-            RequestInfo rp = request.getRequestProcessor();
-            AtomicBoolean dispatch = (AtomicBoolean)param;
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+            if (asyncDispatch()) {
                 endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
-                dispatch.set(true);
-            } else { 
-                dispatch.set(true);
             }
         }
-
-
     }
 
 
index cf2fba3..e99eccd 100644 (file)
 
 package org.apache.coyote.http11;
 
+import java.io.IOException;
 import java.net.Socket;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -257,6 +260,9 @@ public class Http11Protocol extends AbstractHttp11JsseProtocol {
                 if (state == SocketState.LONG) {
                     connections.put(socket, processor);
                     socket.setAsync(true);
+                    // longPoll may change socket state (e.g. to trigger a
+                    // complete or dispatch)
+                    return processor.asyncPostProcess();
                 } else {
                     connections.remove(socket);
                     socket.setAsync(false);
@@ -310,15 +316,29 @@ public class Http11Protocol extends AbstractHttp11JsseProtocol {
                 synchronized (this) {
                     try {
                         long count = registerCount.incrementAndGet();
-                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        final RequestInfo rp = processor.getRequest().getRequestProcessor();
                         rp.setGlobalProcessor(global);
-                        ObjectName rpName = new ObjectName
+                        final ObjectName rpName = new ObjectName
                             (proto.getDomain() + ":type=RequestProcessor,worker="
                                 + proto.getName() + ",name=HttpRequest" + count);
                         if (log.isDebugEnabled()) {
                             log.debug("Register " + rpName);
                         }
-                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        if (Constants.IS_SECURITY_ENABLED) {
+                            AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                                @Override
+                                public Void run() {
+                                    try {
+                                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                                    } catch (Exception e) {
+                                        log.warn("Error registering request");
+                                    }
+                                    return null;
+                                }
+                            });
+                        } else {
+                            Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        }
                         rp.setRpName(rpName);
                     } catch (Exception e) {
                         log.warn("Error registering request");
index 298c66e..22b2a03 100644 (file)
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+abstractHttp11Protocol.invalidAsyncState=Calling [{0}] is not valid for a request with Async state [{1}]
+
 http11protocol.destroy=Destroying Coyote HTTP/1.1 on {0}
 http11protocol.endpoint.initerror=Error initializing endpoint
 http11protocol.endpoint.starterror=Error starting endpoint
index 917a6d4..8b936a9 100644 (file)
@@ -78,7 +78,7 @@ public abstract class AbstractEndpoint {
      */
     public static interface Handler {
         public enum SocketState {
-            OPEN, CLOSED, LONG
+            OPEN, CLOSED, LONG, ASYNC_END
         }
     }
     
index fcb9dde..93a4458 100644 (file)
@@ -21,6 +21,8 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.juli.logging.Log;
@@ -93,6 +95,9 @@ public class AprEndpoint extends AbstractEndpoint {
     private Acceptor acceptors[] = null;
 
 
+    protected ConcurrentLinkedQueue<SocketWrapper<Long>> waitingRequests =
+        new ConcurrentLinkedQueue<SocketWrapper<Long>>();
+    
     // ------------------------------------------------------------- Properties
 
 
@@ -101,6 +106,7 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected boolean deferAccept = true;
     public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; }
+    @Override
     public boolean getDeferAccept() { return deferAccept; }
 
 
@@ -142,6 +148,7 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected boolean useSendfile = Library.APR_HAS_SENDFILE;
     public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
+    @Override
     public boolean getUseSendfile() { return useSendfile; }
 
 
@@ -580,6 +587,12 @@ public class AprEndpoint extends AbstractEndpoint {
                 acceptors[i].start();
             }
 
+            // Start async timeout thread
+            Thread timeoutThread = new Thread(new AsyncTimeout(),
+                    getName() + "-AsyncTimeout");
+            timeoutThread.setPriority(threadPriority);
+            timeoutThread.setDaemon(true);
+            timeoutThread.start();
         }
     }
 
@@ -587,6 +600,7 @@ public class AprEndpoint extends AbstractEndpoint {
     /**
      * Stop the endpoint. This will cause all processing threads to stop.
      */
+    @Override
     public void stop() {
         if (!paused) {
             pause();
@@ -748,7 +762,9 @@ public class AprEndpoint extends AbstractEndpoint {
         try {
             // During shutdown, executor may be null - avoid NPE
             if (running) {
-                getExecutor().execute(new SocketWithOptionsProcessor(socket));
+                SocketWrapper<Long> wrapper =
+                    new SocketWrapper<Long>(Long.valueOf(socket));
+                getExecutor().execute(new SocketWithOptionsProcessor(wrapper));
             }
         } catch (RejectedExecutionException x) {
             log.warn("Socket processing request was rejected for:"+socket,x);
@@ -768,7 +784,9 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected boolean processSocket(long socket) {
         try {
-            getExecutor().execute(new SocketProcessor(socket));
+            SocketWrapper<Long> wrapper =
+                new SocketWrapper<Long>(Long.valueOf(socket));
+            getExecutor().execute(new SocketProcessor(wrapper, null));
         } catch (RejectedExecutionException x) {
             log.warn("Socket processing request was rejected for:"+socket,x);
             return false;
@@ -789,8 +807,10 @@ public class AprEndpoint extends AbstractEndpoint {
         try {
             if (status == SocketStatus.OPEN || status == SocketStatus.STOP ||
                     status == SocketStatus.TIMEOUT) {
+                SocketWrapper<Long> wrapper =
+                    new SocketWrapper<Long>(Long.valueOf(socket));
                 SocketEventProcessor proc =
-                    new SocketEventProcessor(socket, status);
+                    new SocketEventProcessor(wrapper, status);
                 ClassLoader loader = Thread.currentThread().getContextClassLoader();
                 try {
                     if (IS_SECURITY_ENABLED) {
@@ -822,6 +842,45 @@ public class AprEndpoint extends AbstractEndpoint {
         return true;
     }
 
+    public boolean processSocketAsync(SocketWrapper<Long> socket,
+            SocketStatus status) {
+        try {
+            synchronized (socket) {
+                if (waitingRequests.remove(socket)) {
+                    SocketProcessor proc = new SocketProcessor(socket, status);
+                    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+                    try {
+                        if (IS_SECURITY_ENABLED) {
+                            PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+                                    getClass().getClassLoader());
+                            AccessController.doPrivileged(pa);
+                        } else {
+                            Thread.currentThread().setContextClassLoader(
+                                    getClass().getClassLoader());
+                        }
+                        getExecutor().execute(proc);
+                    } finally {
+                        if (IS_SECURITY_ENABLED) {
+                            PrivilegedAction<Void> pa = new PrivilegedSetTccl(loader);
+                            AccessController.doPrivileged(pa);
+                        } else {
+                            Thread.currentThread().setContextClassLoader(loader);
+                        }
+                    }
+                }
+            }
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
+        } catch (Throwable t) {
+            // This means we got an OOM or similar creating a thread, or that
+            // the pool and its queue are full
+            log.error(sm.getString("endpoint.process.fail"), t);
+            return false;
+        }
+        return true;
+    }
+
     private void destroySocket(long socket)
     {
         if (running && socket != 0) {
@@ -898,9 +957,50 @@ public class AprEndpoint extends AbstractEndpoint {
     }
 
 
-    // ----------------------------------------------------- Poller Inner Class
-
+    /**
+     * Async timeout thread
+     */
+    protected class AsyncTimeout implements Runnable {
+        /**
+         * The background thread that checks async requests and fires the
+         * timeout if there has been no activity.
+         */
+        @Override
+        public void run() {
 
+            // Loop until we receive a shutdown command
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                long now = System.currentTimeMillis();
+                Iterator<SocketWrapper<Long>> sockets =
+                    waitingRequests.iterator();
+                while (sockets.hasNext()) {
+                    SocketWrapper<Long> socket = sockets.next();
+                    long access = socket.getLastAccess();
+                    if ((now-access)>socket.getTimeout()) {
+                        processSocketAsync(socket,SocketStatus.TIMEOUT);
+                    }
+                }
+                
+                // Loop if endpoint is paused
+                while (paused && running) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+                
+            }
+        }
+    }
+    
+    
+    // ----------------------------------------------------- Poller Inner Class
     /**
      * Poller class.
      */
@@ -951,6 +1051,7 @@ public class AprEndpoint extends AbstractEndpoint {
         /**
          * Destroy the poller.
          */
+        @Override
         public void destroy() {
             // Close all sockets in the add queue
             for (int i = 0; i < addCount; i++) {
@@ -1015,6 +1116,7 @@ public class AprEndpoint extends AbstractEndpoint {
          * The background thread that listens for incoming TCP/IP connections and
          * hands them off to an appropriate processor.
          */
+        @Override
         public void run() {
 
             long maintainTime = 0;
@@ -1212,6 +1314,7 @@ public class AprEndpoint extends AbstractEndpoint {
         /**
          * Destroy the poller.
          */
+        @Override
         public void destroy() {
             // Close any socket remaining in the add queue
             addCount = 0;
@@ -1314,6 +1417,7 @@ public class AprEndpoint extends AbstractEndpoint {
          * The background thread that listens for incoming TCP/IP connections and
          * hands them off to an appropriate processor.
          */
+        @Override
         public void run() {
 
             long maintainTime = 0;
@@ -1478,15 +1582,16 @@ public class AprEndpoint extends AbstractEndpoint {
      * thread local fields.
      */
     public interface Handler extends AbstractEndpoint.Handler {
-        public SocketState process(long socket);
-        public SocketState event(long socket, SocketStatus status);
-        public SocketState asyncDispatch(long socket, SocketStatus status);
+        public SocketState process(SocketWrapper<Long> socket);
+        public SocketState event(SocketWrapper<Long> socket,
+                SocketStatus status);
+        public SocketState asyncDispatch(SocketWrapper<Long> socket,
+                SocketStatus status);
     }
 
 
     // ---------------------------------------------- SocketProcessor Inner Class
 
-
     /**
      * This class is the equivalent of the Worker, but will simply use in an
      * external Executor thread pool. This will also set the socket options
@@ -1494,29 +1599,32 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected class SocketWithOptionsProcessor implements Runnable {
 
-        protected long socket = 0;
+        protected SocketWrapper<Long> socket = null;
 
-        public SocketWithOptionsProcessor(long socket) {
+        public SocketWithOptionsProcessor(SocketWrapper<Long> socket) {
             this.socket = socket;
         }
 
+        @Override
         public void run() {
 
-            if (!deferAccept) {
-                if (setSocketOptions(socket)) {
-                    getPoller().add(socket);
+            synchronized (socket) {
+                if (!deferAccept) {
+                    if (setSocketOptions(socket.getSocket().longValue())) {
+                        getPoller().add(socket.getSocket().longValue());
+                    } else {
+                        // Close socket and pool
+                        destroySocket(socket.getSocket().longValue());
+                        socket = null;
+                    }
                 } else {
-                    // Close socket and pool
-                    destroySocket(socket);
-                    socket = 0;
-                }
-            } else {
-                // Process the request from this socket
-                if (!setSocketOptions(socket)
-                        || handler.process(socket) == Handler.SocketState.CLOSED) {
-                    // Close socket and pool
-                    destroySocket(socket);
-                    socket = 0;
+                    // Process the request from this socket
+                    if (!setSocketOptions(socket.getSocket().longValue())
+                            || handler.process(socket) == Handler.SocketState.CLOSED) {
+                        // Close socket and pool
+                        destroySocket(socket.getSocket().longValue());
+                        socket = null;
+                    }
                 }
             }
 
@@ -1534,27 +1642,34 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected class SocketProcessor implements Runnable {
 
-        protected long socket = 0;
-        protected boolean async = false;
-        protected SocketStatus status = SocketStatus.ERROR;
+        protected SocketWrapper<Long> socket = null;
+        protected SocketStatus status = null;
 
-        public SocketProcessor(long socket) {
+        public SocketProcessor(SocketWrapper<Long> socket,
+                SocketStatus status) {
             this.socket = socket;
-            this.async = false;
+            this.status = status;
         }
 
+        @Override
         public void run() {
-
-            // Process the request from this socket
-            Handler.SocketState state = async?handler.asyncDispatch(socket, status):handler.process(socket);
-            if (state == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                destroySocket(socket);
-                socket = 0;
+            synchronized (socket) {
+                // Process the request from this socket
+                Handler.SocketState state = (status==null)?handler.process(socket):handler.asyncDispatch(socket, status);
+                if (state == Handler.SocketState.CLOSED) {
+                    // Close socket and pool
+                    destroySocket(socket.getSocket().longValue());
+                    socket = null;
+                } else if (state == Handler.SocketState.LONG) {
+                    socket.access();
+                    waitingRequests.add(socket);
+                } else if (state == Handler.SocketState.ASYNC_END) {
+                    socket.access();
+                    SocketProcessor proc = new SocketProcessor(socket, SocketStatus.OPEN);
+                    getExecutor().execute(proc);
+                }
             }
-
         }
-
     }
 
 
@@ -1567,25 +1682,27 @@ public class AprEndpoint extends AbstractEndpoint {
      */
     protected class SocketEventProcessor implements Runnable {
 
-        protected long socket = 0;
+        protected SocketWrapper<Long> socket = null;
         protected SocketStatus status = null;
 
-        public SocketEventProcessor(long socket, SocketStatus status) {
+        public SocketEventProcessor(SocketWrapper<Long> socket,
+                SocketStatus status) {
             this.socket = socket;
             this.status = status;
         }
 
+        @Override
         public void run() {
-
-            // Process the request from this socket
-            if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                destroySocket(socket);
-                socket = 0;
+            synchronized (socket) {
+                // Process the request from this socket
+                Handler.SocketState state = handler.event(socket, status);
+                if (state == Handler.SocketState.CLOSED) {
+                    // Close socket and pool
+                    destroySocket(socket.getSocket().longValue());
+                    socket = null;
+                }
             }
-
         }
-
     }
 
     private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
@@ -1596,6 +1713,7 @@ public class AprEndpoint extends AbstractEndpoint {
             this.cl = cl;
         }
 
+        @Override
         public Void run() {
             Thread.currentThread().setContextClassLoader(cl);
             return null;
index 4e5d299..0c489ad 100644 (file)
@@ -122,8 +122,6 @@ public class JIoEndpoint extends AbstractEndpoint {
     }
 
 
-    // --------------------------------------------------- Acceptor Inner Class
-
     /**
      * Async timeout thread
      */
@@ -165,6 +163,9 @@ public class JIoEndpoint extends AbstractEndpoint {
             }
         }
     }
+
+    
+    // --------------------------------------------------- Acceptor Inner Class
     /**
      * Server socket acceptor thread.
      */
@@ -199,15 +200,15 @@ public class JIoEndpoint extends AbstractEndpoint {
                     
                     // Configure the socket
                     if (setSocketOptions(socket)) {
-                        // Hand this socket off to an appropriate processor
-                        if (!processSocket(socket)) {
-                            // Close socket right away
-                            try {
-                                socket.close();
-                            } catch (IOException e) {
-                                // Ignore
-                            }
+                    // Hand this socket off to an appropriate processor
+                    if (!processSocket(socket)) {
+                        // Close socket right away
+                        try {
+                            socket.close();
+                        } catch (IOException e) {
+                            // Ignore
                         }
+                    }
                     } else {
                         // Close socket right away
                         try {
@@ -258,56 +259,58 @@ public class JIoEndpoint extends AbstractEndpoint {
         @Override
         public void run() {
             boolean launch = false;
-            try {
-                
-                if (!socket.processing.compareAndSet(false, true)) {
-                    log.error("Unable to process socket. Invalid state.");
-                    return;
-                }
-                
-                SocketState state = SocketState.OPEN;
-
+            synchronized (socket) {
                 try {
-                    // SSL handshake
-                    serverSocketFactory.handshake(socket.getSocket());
-                } catch (Throwable t) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(sm.getString("endpoint.err.handshake"), t);
-                    }
-                    // Tell to close the socket
-                    state = SocketState.CLOSED;
-                }
+                    SocketState state = SocketState.OPEN;
 
-                if ( (state != SocketState.CLOSED) ) {
-                    state = (status==null)?handler.process(socket):handler.process(socket,status);
-                }
-                if (state == SocketState.CLOSED) {
-                    // Close socket
-                    if (log.isTraceEnabled()) {
-                        log.trace("Closing socket:"+socket);
-                    }
                     try {
-                        socket.getSocket().close();
-                    } catch (IOException e) {
-                        // Ignore
+                        // SSL handshake
+                        serverSocketFactory.handshake(socket.getSocket());
+                    } catch (Throwable t) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(sm.getString("endpoint.err.handshake"), t);
+                        }
+                        // Tell to close the socket
+                        state = SocketState.CLOSED;
+                    }
+                        
+                    if ( (state != SocketState.CLOSED) ) {
+                        state = (status==null)?handler.process(socket):handler.process(socket,status);
+                    }
+                    if (state == SocketState.CLOSED) {
+                        // Close socket
+                        if (log.isTraceEnabled()) {
+                            log.trace("Closing socket:"+socket);
+                        }
+                        try {
+                            socket.getSocket().close();
+                        } catch (IOException e) {
+                            // Ignore
+                        }
+                    } else if (state == SocketState.ASYNC_END ||
+                            state == SocketState.OPEN){
+                        socket.setKeptAlive(true);
+                        socket.access();
+                        launch = true;
+                    } else if (state == SocketState.LONG) {
+                        socket.access();
+                        waitingRequests.add(socket);
+                    }
+                } finally {
+                    if (launch) {
+                        try {
+                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
+                        } catch (NullPointerException npe) {
+                            if (running) {
+                                log.error(sm.getString("endpoint.launch.fail"),
+                                        npe);
+                            }
+                        }
                     }
-                } else if (state == SocketState.OPEN){
-                    socket.setKeptAlive(true);
-                    socket.access();
-                    //keepalive connection
-                    //TODO - servlet3 check async status, we may just be in a hold pattern
-                    launch = true;
-                } else if (state == SocketState.LONG) {
-                    socket.access();
-                    waitingRequests.add(socket);
                 }
-            } finally {
-                socket.processing.set(false);
-                if (launch) getExecutor().execute(new SocketProcessor(socket));
-                socket = null;
             }
+            socket = null;
             // Finish up this request
-            
         }
         
     }
@@ -469,7 +472,7 @@ public class JIoEndpoint extends AbstractEndpoint {
      */
     protected boolean setSocketOptions(Socket socket) {
         serverSocketFactory.initSocket(socket);
-
+        
         try {
             // 1: Set socket options: timeout, linger, etc
             socketProperties.setProperties(socket);
@@ -488,7 +491,7 @@ public class JIoEndpoint extends AbstractEndpoint {
         return true;
     }
 
-
+    
     /**
      * Process a new connection from a new client. Wraps the socket so
      * keep-alive and other attributes can be tracked and then passes the socket
index 9047a79..9d81150 100644 (file)
@@ -811,10 +811,12 @@ public class NioEndpoint extends AbstractEndpoint {
                             }
                         } 
                     }
-                }catch (SocketTimeoutException sx) {
+                } catch (SocketTimeoutException sx) {
                     //normal condition
-                }catch ( IOException x ) {
-                    if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
+                } catch (IOException x) {
+                    if (running) {
+                        log.error(sm.getString("endpoint.accept.fail"), x);
+                    }
                 } catch (OutOfMemoryError oom) {
                     try {
                         oomParachuteData = null;
@@ -1352,7 +1354,6 @@ public class NioEndpoint extends AbstractEndpoint {
             this.socket = channel;
             this.poller = poller;
             lastAccess = System.currentTimeMillis();
-            currentAccess = false;
             comet = false;
             timeout = soTimeout;
             error = false;
@@ -1489,82 +1490,96 @@ public class NioEndpoint extends AbstractEndpoint {
          
         @Override
         public void run() {
-            SelectionKey key = null;
-            try {
-                key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                int handshake = -1;
-                
+            boolean launch = false;
+            synchronized (socket) {
+                SelectionKey key = null;
                 try {
-                    if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
-                }catch ( IOException x ) {
-                    handshake = -1;
-                    if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
-                }catch ( CancelledKeyException ckx ) {
-                    handshake = -1;
-                }
-                if ( handshake == 0 ) {
-                    SocketState state = SocketState.OPEN;
-                    // Process the request from this socket
-                    state = (status==null)?handler.process(socket):handler.event(socket,status);
-
-                    if (state == SocketState.CLOSED) {
-                        // Close socket and pool
+                    key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+                    int handshake = -1;
+                    
+                    try {
+                        if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
+                    }catch ( IOException x ) {
+                        handshake = -1;
+                        if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+                    }catch ( CancelledKeyException ckx ) {
+                        handshake = -1;
+                    }
+                    if ( handshake == 0 ) {
+                        SocketState state = SocketState.OPEN;
+                        // Process the request from this socket
+                        state = (status==null)?handler.process(socket):handler.event(socket,status);
+    
+                        if (state == SocketState.CLOSED) {
+                            // Close socket and pool
+                            try {
+                                KeyAttachment ka = null;
+                                if (key!=null) {
+                                    ka = (KeyAttachment) key.attachment();
+                                    if (ka!=null) ka.setComet(false);
+                                    socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+                                }
+                                if (socket!=null) nioChannels.offer(socket);
+                                socket = null;
+                                if ( ka!=null ) keyCache.offer(ka);
+                                ka = null;
+                            }catch ( Exception x ) {
+                                log.error("",x);
+                            }
+                        } else if (state == SocketState.ASYNC_END) {
+                            launch = true;
+                        }
+                    } else if (handshake == -1 ) {
+                        KeyAttachment ka = null;
+                        if (key!=null) {
+                            ka = (KeyAttachment) key.attachment();
+                            socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
+                        }
+                        if (socket!=null) nioChannels.offer(socket);
+                        socket = null;
+                        if ( ka!=null ) keyCache.offer(ka);
+                        ka = null;
+                    } else {
+                        final SelectionKey fk = key;
+                        final int intops = handshake;
+                        final KeyAttachment ka = (KeyAttachment)fk.attachment();
+                        ka.getPoller().add(socket,intops);
+                    }
+                }catch(CancelledKeyException cx) {
+                    socket.getPoller().cancelledKey(key,null,false);
+                } catch (OutOfMemoryError oom) {
+                    try {
+                        oomParachuteData = null;
+                        socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
+                        releaseCaches();
+                        log.error("", oom);
+                    }catch ( Throwable oomt ) {
+                        try {
+                            System.err.println(oomParachuteMsg);
+                            oomt.printStackTrace();
+                        }catch (Throwable letsHopeWeDontGetHere){}
+                    }
+                }catch ( Throwable t ) {
+                    log.error("",t);
+                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
+                } finally {
+                    if (launch) {
                         try {
-                            KeyAttachment ka = null;
-                            if (key!=null) {
-                                ka = (KeyAttachment) key.attachment();
-                                if (ka!=null) ka.setComet(false);
-                                socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
+                        } catch (NullPointerException npe) {
+                            if (running) {
+                                log.error(sm.getString("endpoint.launch.fail"),
+                                        npe);
                             }
-                            if (socket!=null) nioChannels.offer(socket);
-                            socket = null;
-                            if ( ka!=null ) keyCache.offer(ka);
-                            ka = null;
-                        }catch ( Exception x ) {
-                            log.error("",x);
                         }
-                    } 
-                } else if (handshake == -1 ) {
-                    KeyAttachment ka = null;
-                    if (key!=null) {
-                        ka = (KeyAttachment) key.attachment();
-                        socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
                     }
-                    if (socket!=null) nioChannels.offer(socket);
                     socket = null;
-                    if ( ka!=null ) keyCache.offer(ka);
-                    ka = null;
-                } else {
-                    final SelectionKey fk = key;
-                    final int intops = handshake;
-                    final KeyAttachment ka = (KeyAttachment)fk.attachment();
-                    ka.getPoller().add(socket,intops);
+                    status = null;
+                    //return to cache
+                    processorCache.offer(this);
                 }
-            }catch(CancelledKeyException cx) {
-                socket.getPoller().cancelledKey(key,null,false);
-            } catch (OutOfMemoryError oom) {
-                try {
-                    oomParachuteData = null;
-                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
-                    releaseCaches();
-                    log.error("", oom);
-                }catch ( Throwable oomt ) {
-                    try {
-                        System.err.println(oomParachuteMsg);
-                        oomt.printStackTrace();
-                    }catch (Throwable letsHopeWeDontGetHere){}
-                }
-            }catch ( Throwable t ) {
-                log.error("",t);
-                socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
-            } finally {
-                socket = null;
-                status = null;
-                //return to cache
-                processorCache.offer(this);
             }
         }
-
     }
 
     // ----------------------------------------------- SendfileData Inner Class
index 80df590..a0d11c1 100644 (file)
  */
 package org.apache.tomcat.util.net;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
 public class SocketWrapper<E> {
     
     protected volatile E socket;
     
     protected volatile long lastAccess = -1;
-    protected volatile boolean currentAccess = false;
     protected long timeout = -1;
     protected boolean error = false;
     protected long lastRegistered = 0;
     protected volatile int keepAliveLeft = 100;
     protected boolean async = false;
     protected boolean keptAlive = false;
-    public AtomicBoolean processing = new AtomicBoolean(false);
     
     public SocketWrapper(E socket) {
         this.socket = socket;
@@ -46,8 +41,6 @@ public class SocketWrapper<E> {
     public long getLastAccess() { return lastAccess; }
     public void access() { access(System.currentTimeMillis()); }
     public void access(long access) { lastAccess = access; }
-    public boolean getCurrentAccess() { return currentAccess; }
-    public void setCurrentAccess(boolean access) { currentAccess = access; }
     public void setTimeout(long timeout) {this.timeout = timeout;}
     public long getTimeout() {return this.timeout;}
     public boolean getError() { return error; }
index 686731c..bb70316 100644 (file)
@@ -32,6 +32,7 @@ endpoint.init.bind=Socket bind failed: [{0}] {1}
 endpoint.init.listen=Socket listen failed: [{0}] {1}
 endpoint.init.notavail=APR not available
 endpoint.accept.fail=Socket accept failed
+endpoint.launch.fail=Failed to launch new runnable
 endpoint.poll.limitedpollsize=Failed to create poller with specified size of {0}
 endpoint.poll.initfail=Poller creation failed
 endpoint.poll.fail=Critical poller failure (restarting poller): [{0}] {1}
index 000a410..2ba8902 100644 (file)
@@ -24,6 +24,7 @@ import javax.servlet.AsyncContext;
 import javax.servlet.AsyncEvent;
 import javax.servlet.AsyncListener;
 import javax.servlet.ServletException;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -177,7 +178,7 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
             result  = new StringBuilder();
             result.append('1');
             result.append(req.isAsyncStarted());
-            req.startAsync();
+            req.startAsync().setTimeout(10000);
             result.append('2');
             result.append(req.isAsyncStarted());
             
@@ -305,13 +306,22 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
                 throws ServletException, IOException {
             
             AsyncContext actxt = req.startAsync();
+            actxt.setTimeout(3000);
             resp.setContentType("text/plain");
             resp.getWriter().print("OK");
             actxt.complete();
         }
     }
 
-    public void testTimeout() throws Exception {
+    public void testTimeoutListenerComplete() throws Exception {
+        doTestTimeout(true);
+    }
+    
+    public void testTimeoutListenerNoComplete() throws Exception {
+        doTestTimeout(false);
+    }
+    
+    private void doTestTimeout(boolean completeOnTimeout) throws Exception {
         // Setup Tomcat instance
         Tomcat tomcat = getTomcatInstance();
         
@@ -326,7 +336,7 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
         
         Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
 
-        TimeoutServlet timeout = new TimeoutServlet();
+        TimeoutServlet timeout = new TimeoutServlet(completeOnTimeout);
 
         Wrapper wrapper = Tomcat.addServlet(ctx, "time", timeout);
         wrapper.setAsyncSupported(true);
@@ -334,47 +344,383 @@ public class TestAsyncContextImpl extends TomcatBaseTest {
 
         tomcat.start();
         ByteChunk res = getUrl("http://localhost:" + getPort() + "/async");
-        assertEquals("OK", res.toString());
+        StringBuilder expected = new StringBuilder();
+        expected.append("TimeoutServletGet-onTimeout-");
+        if (!completeOnTimeout) {
+            expected.append("onError-");
+        }
+        expected.append("onComplete-");
+        assertEquals(expected.toString(), res.toString());
     }
     
     private static class TimeoutServlet extends HttpServlet {
         private static final long serialVersionUID = 1L;
 
+        private boolean completeOnTimeout;
+        
+        public TimeoutServlet(boolean completeOnTimeout) {
+            this.completeOnTimeout = completeOnTimeout;
+        }
+        
         @Override
         protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
                 throws ServletException, IOException {
             if (req.isAsyncSupported()) {
-                resp.getWriter().print("OK");
+                resp.getWriter().print("TimeoutServletGet-");
                 final AsyncContext ac = req.startAsync();
-                ac.setTimeout(2000);
+                ac.setTimeout(3000);
                 
-                ac.addListener(new TimeoutListener());
+                ac.addListener(new TrackingListener(false, completeOnTimeout));
             } else
                 resp.getWriter().print("FAIL: Async unsupported");
         }
     }
 
-    private static class TimeoutListener implements AsyncListener {
+    public void testDispatchSingle() throws Exception {
+        doTestDispatch(1, false);
+    }
+    
+    public void testDispatchDouble() throws Exception {
+        doTestDispatch(2, false);
+    }
+    
+    public void testDispatchMultiple() throws Exception {
+        doTestDispatch(5, false);
+    }
+    
+    public void testDispatchWithThreadSingle() throws Exception {
+        doTestDispatch(1, true);
+    }
+    
+    public void testDispatchWithThreadDouble() throws Exception {
+        doTestDispatch(2, true);
+    }
+    
+    public void testDispatchWithThreadMultiple() throws Exception {
+        doTestDispatch(5, true);
+    }
+    
+    private void doTestDispatch(int iter, boolean useThread) throws Exception {
+        // Setup Tomcat instance
+        Tomcat tomcat = getTomcatInstance();
+        
+        // Must have a real docBase - just use temp
+        File docBase = new File(System.getProperty("java.io.tmpdir"));
+        
+        Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+        DispatchingServlet dispatch = new DispatchingServlet(false, false);
+        Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch);
+        wrapper.setAsyncSupported(true);
+        ctx.addServletMapping("/stage1", "dispatch");
+
+        NonAsyncServlet nonasync = new NonAsyncServlet();
+        Wrapper wrapper2 = Tomcat.addServlet(ctx, "nonasync", nonasync);
+        wrapper2.setAsyncSupported(true);
+        ctx.addServletMapping("/stage2", "nonasync");
+
+        tomcat.start();
+        
+        StringBuilder url = new StringBuilder(48);
+        url.append("http://localhost:");
+        url.append(getPort());
+        url.append("/stage1?iter=");
+        url.append(iter);
+        if (useThread) {
+            url.append("&useThread=y");
+        }
+        ByteChunk res = getUrl(url.toString());
+        
+        StringBuilder expected = new StringBuilder();
+        int loop = iter;
+        while (loop > 0) {
+            expected.append("DispatchingServletGet-");
+            loop--;
+        }
+        expected.append("NonAsyncServletGet-");
+        assertEquals(expected.toString(), res.toString());
+    }
+    
+    private static class DispatchingServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+        private static final String ITER_PARAM = "iter";
+        private boolean addTrackingListener = false;
+        private boolean completeOnError = false;
+        
+        public DispatchingServlet(boolean addTrackingListener,
+                boolean completeOnError) {
+            this.addTrackingListener = addTrackingListener;
+            this.completeOnError = completeOnError;
+        }
+        
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                throws ServletException, IOException {
+            resp.getWriter().write("DispatchingServletGet-");
+            resp.flushBuffer();
+            final int iter = Integer.parseInt(req.getParameter(ITER_PARAM)) - 1;
+            final AsyncContext ctxt = req.startAsync();
+            if (addTrackingListener) {
+                TrackingListener listener =
+                    new TrackingListener(completeOnError, true); 
+                ctxt.addListener(listener);
+            }
+            Runnable run = new Runnable() {
+                @Override
+                public void run() {
+                    if (iter > 0) {
+                        ctxt.dispatch("/stage1?" + ITER_PARAM + "=" + iter);
+                    } else {
+                        ctxt.dispatch("/stage2");
+                    }
+                }
+            };
+            if ("y".equals(req.getParameter("useThread"))) {
+                new Thread(run).start();
+            } else {
+                run.run();
+            }
+        }
+    }
+
+    private static class NonAsyncServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                throws ServletException, IOException {
+            resp.getWriter().write("NonAsyncServletGet-");
+            resp.flushBuffer();
+        }
+    }
+    
+    public void testListeners() throws Exception {
+        // Setup Tomcat instance
+        Tomcat tomcat = getTomcatInstance();
+        
+        // Must have a real docBase - just use temp
+        File docBase = new File(System.getProperty("java.io.tmpdir"));
+        
+        Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+        TrackingServlet tracking = new TrackingServlet();
+        Wrapper wrapper = Tomcat.addServlet(ctx, "tracking", tracking);
+        wrapper.setAsyncSupported(true);
+        ctx.addServletMapping("/stage1", "tracking");
+
+        TimeoutServlet timeout = new TimeoutServlet(true);
+        Wrapper wrapper2 = Tomcat.addServlet(ctx, "timeout", timeout);
+        wrapper2.setAsyncSupported(true);
+        ctx.addServletMapping("/stage2", "timeout");
+
+        tomcat.start();
+        
+        StringBuilder url = new StringBuilder(48);
+        url.append("http://localhost:");
+        url.append(getPort());
+        url.append("/stage1");
+
+        ByteChunk res = getUrl(url.toString());
+        
+        assertEquals(
+                "DispatchingServletGet-DispatchingServletGet-onStartAsync-" +
+                "TimeoutServletGet-onStartAsync-onTimeout-onComplete-",
+                res.toString());
+    }
+
+    private static class TrackingServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+        
+        private static volatile boolean first = true;
+        
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                throws ServletException, IOException {
+            resp.getWriter().write("DispatchingServletGet-");
+            resp.flushBuffer();
+
+            final boolean first = TrackingServlet.first;
+            TrackingServlet.first = false;
+
+            final AsyncContext ctxt = req.startAsync();
+            TrackingListener listener = new TrackingListener(false, true); 
+            ctxt.addListener(listener);
+            ctxt.setTimeout(3000);
+
+            Runnable run = new Runnable() {
+                @Override
+                public void run() {
+                    if (first) {
+                        ctxt.dispatch("/stage1");
+                    } else {
+                        ctxt.dispatch("/stage2");
+                    }
+                }
+            };
+            if ("y".equals(req.getParameter("useThread"))) {
+                new Thread(run).start();
+            } else {
+                run.run();
+            }
+        }
+    }
+
+    private static class TrackingListener implements AsyncListener {
+        
+        private boolean completeOnError;
+        private boolean completeOnTimeout;
+        
+        public TrackingListener(boolean completeOnError,
+                boolean completeOnTimeout) {
+            this.completeOnError = completeOnError;
+            this.completeOnTimeout = completeOnTimeout;
+        }
 
         @Override
         public void onComplete(AsyncEvent event) throws IOException {
-            // NO-OP
+            ServletResponse resp = event.getAsyncContext().getResponse(); 
+            resp.getWriter().write("onComplete-");
+            resp.flushBuffer();
         }
 
         @Override
         public void onTimeout(AsyncEvent event) throws IOException {
-            event.getAsyncContext().complete();
+            ServletResponse resp = event.getAsyncContext().getResponse(); 
+            resp.getWriter().write("onTimeout-");
+            resp.flushBuffer();
+            if (completeOnTimeout){
+                event.getAsyncContext().complete();
+            }
         }
 
         @Override
         public void onError(AsyncEvent event) throws IOException {
-            // NOOP
+            ServletResponse resp = event.getAsyncContext().getResponse(); 
+            resp.getWriter().write("onError-");
+            resp.flushBuffer();
+            if (completeOnError) {
+                event.getAsyncContext().complete();
+            }
         }
 
         @Override
         public void onStartAsync(AsyncEvent event) throws IOException {
-            // NOOP
+            ServletResponse resp = event.getAsyncContext().getResponse(); 
+            resp.getWriter().write("onStartAsync-");
+            resp.flushBuffer();
+        }
+    }
+    
+    public void testDispatchErrorSingle() throws Exception {
+        doTestDispatchError(1, false, false);
+    }
+    
+    public void testDispatchErrorDouble() throws Exception {
+        doTestDispatchError(2, false, false);
+    }
+    
+    public void testDispatchErrorMultiple() throws Exception {
+        doTestDispatchError(5, false, false);
+    }
+    
+    public void testDispatchErrorWithThreadSingle() throws Exception {
+        doTestDispatchError(1, true, false);
+    }
+    
+    public void testDispatchErrorWithThreadDouble() throws Exception {
+        doTestDispatchError(2, true, false);
+    }
+    
+    public void testDispatchErrorWithThreadMultiple() throws Exception {
+        doTestDispatchError(5, true, false);
+    }
+    
+    public void testDispatchErrorSingleThenComplete() throws Exception {
+        doTestDispatchError(1, false, true);
+    }
+    
+    public void testDispatchErrorDoubleThenComplete() throws Exception {
+        doTestDispatchError(2, false, true);
+    }
+    
+    public void testDispatchErrorMultipleThenComplete() throws Exception {
+        doTestDispatchError(5, false, true);
+    }
+    
+    public void testDispatchErrorWithThreadSingleThenComplete()
+            throws Exception {
+        doTestDispatchError(1, true, true);
+    }
+    
+    public void testDispatchErrorWithThreadDoubleThenComplete()
+            throws Exception {
+        doTestDispatchError(2, true, true);
+    }
+    
+    public void testDispatchErrorWithThreadMultipleThenComplete()
+            throws Exception {
+        doTestDispatchError(5, true, true);
+    }
+    
+    private void doTestDispatchError(int iter, boolean useThread,
+            boolean completeOnError)
+            throws Exception {
+        // Setup Tomcat instance
+        Tomcat tomcat = getTomcatInstance();
+        
+        // Must have a real docBase - just use temp
+        File docBase = new File(System.getProperty("java.io.tmpdir"));
+        
+        Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+        DispatchingServlet dispatch =
+            new DispatchingServlet(true, completeOnError);
+        Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch);
+        wrapper.setAsyncSupported(true);
+        ctx.addServletMapping("/stage1", "dispatch");
+
+        ErrorServlet error = new ErrorServlet();
+        Tomcat.addServlet(ctx, "error", error);
+        ctx.addServletMapping("/stage2", "error");
+
+        tomcat.start();
+        
+        StringBuilder url = new StringBuilder(48);
+        url.append("http://localhost:");
+        url.append(getPort());
+        url.append("/stage1?iter=");
+        url.append(iter);
+        if (useThread) {
+            url.append("&useThread=y");
         }
+        ByteChunk res = getUrl(url.toString());
         
+        StringBuilder expected = new StringBuilder();
+        int loop = iter;
+        while (loop > 0) {
+            expected.append("DispatchingServletGet-");
+            if (loop != iter) {
+                expected.append("onStartAsync-");
+            }
+            loop--;
+        }
+        expected.append("ErrorServletGet-onError-onComplete-");
+        assertEquals(expected.toString(), res.toString());
+    }
+    
+    private static class ErrorServlet extends HttpServlet {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+                throws ServletException, IOException {
+            resp.getWriter().write("ErrorServletGet-");
+            resp.flushBuffer();
+            throw new ServletException("Opps.");
+        }
     }
 }
index eab9ed7..a215599 100644 (file)
         <code>UnsupportedOperationException</code>. (markt)
       </fix>
       <fix>
+        <bug>49884</bug>: Fix occassional NullPointerException on async
+        complete(). This resulted in a major refactoring of the async
+        implementation to address a number of threading issues. (markt)
+      </fix>
+      <fix>
         Update the version numbers in ServerInfo defaults to Tomcat 7.0.x.
         (markt)
       </fix>