From: markt Date: Mon, 27 Sep 2010 12:13:32 +0000 (+0000) Subject: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=49884 X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=e131864f18239299594fe24aa8f91cd0f17b7530;p=tomcat7.0 Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=49884 This required a major re-factoring of the async implementation. In summary: - Moved state management to the Coyote Processor - Added a SocketWrapper to the APR socket - Added syncs to ensure only one async state change at a time - Added syncs to ensure only one thread changing a socket's state at a time A number of new bugs were also uncovered and fixed by this re-factoring: - delay processing complete() and dispatch() until request where startAsync() is called finished processing - onAsyncStart listener event Currently the test case for bug 49884 passes with the security manager enabled using "ab -n 5000 -c 150 -k ..." (it broke with "ab -n 50 -c 10 ..." previously) The unit tests pass for all three HTTP connectors. The AJP connectors have only been modified to ensure the code compiles. The following work remains: - Testing all connectors (HTTP and AJP) with TCK + security manager and fixing whatever is broken - Further clean-up - There is further scope for reducing code duplication between the connectors / aligning the code so it is easier to maintain. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1001698 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/connector/CoyoteAdapter.java b/java/org/apache/catalina/connector/CoyoteAdapter.java index 15409203e..2fddb77ed 100644 --- a/java/org/apache/catalina/connector/CoyoteAdapter.java +++ b/java/org/apache/catalina/connector/CoyoteAdapter.java @@ -260,55 +260,25 @@ public class CoyoteAdapter implements Adapter { "Dispatch may only happen on an existing request."); } boolean comet = false; - boolean async = false; boolean success = true; - + AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext(); try { if (status==SocketStatus.TIMEOUT) { - AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext(); - //TODO SERVLET3 - async - //configure settings for timed out - asyncConImpl.setTimeoutState(); - } - if (status==SocketStatus.ERROR || status==SocketStatus.DISCONNECT) { - AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext(); - //TODO SERVLET3 - async - //configure settings for timed out - asyncConImpl.setErrorState(new IOException("Socket error.")); + success = true; + if (!asyncConImpl.timeout()) { + asyncConImpl.setErrorState(null); + } } - while (success) { - AsyncContextImpl impl = (AsyncContextImpl)request.getAsyncContext(); - // Calling the container - if (impl.getState()==AsyncContextImpl.AsyncState.DISPATCHED) { - // Calling the container - try { - impl.complete(); - connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); - } finally { - success = false; - } - } else if (impl.getState()==AsyncContextImpl.AsyncState.STARTED){ - //TODO SERVLET3 - async - res.action(ActionCode.ASYNC_START, request.getAsyncContext()); - async = true; - break; - } else if (impl.getState()==AsyncContextImpl.AsyncState.NOT_STARTED){ - //TODO SERVLET3 - async - async = false; - break; - } else if (impl.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING) { - async = false; - success = false; - connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); - } else { - try { - connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); - } catch (RuntimeException x) { - impl.setErrorState(x); - } + if (request.isAsyncDispatching()) { + success = true; + connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); + Throwable t = (Throwable) request.getAttribute( + Globals.EXCEPTION_ATTR); + if (t != null) { + asyncConImpl.setErrorState(t); } } - + if (request.isComet()) { if (!response.isClosed() && !response.isError()) { if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) { @@ -327,7 +297,7 @@ public class CoyoteAdapter implements Adapter { request.setFilterChain(null); } } - if (!async && !comet) { + if (!request.isAsync() && !comet) { response.finishResponse(); req.action(ActionCode.POST_REQUEST , null); } @@ -341,7 +311,7 @@ public class CoyoteAdapter implements Adapter { } finally { req.getRequestProcessor().setWorkerThreadName(null); // Recycle the wrapper request and response - if (!success || (!comet && !async)) { + if (!success || (!comet && !request.isAsync())) { request.recycle(); response.recycle(); } else { @@ -426,15 +396,8 @@ public class CoyoteAdapter implements Adapter { } AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext(); - if (asyncConImpl!=null && asyncConImpl.getState()==AsyncContextImpl.AsyncState.STARTED) { - res.action(ActionCode.ASYNC_START, request.getAsyncContext()); + if (asyncConImpl != null) { async = true; - } else if (request.isAsyncDispatching()) { - asyncDispatch(req, res, SocketStatus.OPEN); - if (request.isAsyncStarted()) { - async = true; - res.action(ActionCode.ASYNC_START, request.getAsyncContext()); - } } else if (!comet) { response.finishResponse(); req.action(ActionCode.POST_REQUEST , null); diff --git a/java/org/apache/catalina/connector/Request.java b/java/org/apache/catalina/connector/Request.java index 175f5746d..0b65980d3 100644 --- a/java/org/apache/catalina/connector/Request.java +++ b/java/org/apache/catalina/connector/Request.java @@ -37,6 +37,7 @@ import java.util.Locale; import java.util.Map; import java.util.TimeZone; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; import javax.servlet.AsyncContext; @@ -1613,12 +1614,20 @@ public class Request if (asyncContext == null) { return false; } - - return (asyncContext.getState()==AsyncContextImpl.AsyncState.DISPATCHING || - asyncContext.getState()==AsyncContextImpl.AsyncState.TIMING_OUT || - asyncContext.getState()==AsyncContextImpl.AsyncState.STARTED || - asyncContext.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING || - asyncContext.getState()==AsyncContextImpl.AsyncState.COMPLETING); + + AtomicBoolean result = new AtomicBoolean(false); + coyoteRequest.action(ActionCode.ASYNC_IS_DISPATCHING, result); + return result.get(); + } + + public boolean isAsync() { + if (asyncContext == null) { + return false; + } + + AtomicBoolean result = new AtomicBoolean(false); + coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC, result); + return result.get(); } @Override diff --git a/java/org/apache/catalina/core/AsyncContextImpl.java b/java/org/apache/catalina/core/AsyncContextImpl.java index bbb2f8864..a5fad2f01 100644 --- a/java/org/apache/catalina/core/AsyncContextImpl.java +++ b/java/org/apache/catalina/core/AsyncContextImpl.java @@ -22,7 +22,6 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; @@ -50,11 +49,6 @@ import org.apache.juli.logging.LogFactory; */ public class AsyncContextImpl implements AsyncContext { - public static enum AsyncState { - NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT, - TIMING_OUT_NEED_COMPLETE, ERROR_DISPATCHING - } - private static final Log log = LogFactory.getLog(AsyncContextImpl.class); private ServletRequest servletRequest = null; @@ -63,7 +57,6 @@ public class AsyncContextImpl implements AsyncContext { private boolean hasOriginalRequestAndResponse = true; private volatile Runnable dispatch = null; private Context context = null; - private AtomicReference state = new AtomicReference(AsyncState.NOT_STARTED); private long timeout = -1; private AsyncEvent event = null; @@ -81,23 +74,46 @@ public class AsyncContextImpl implements AsyncContext { if (log.isDebugEnabled()) { logDebug("complete "); } - if (state.get()==AsyncState.COMPLETING) { - //do nothing - } else if (state.compareAndSet(AsyncState.DISPATCHED, - AsyncState.COMPLETING) || - state.compareAndSet(AsyncState.STARTED, - AsyncState.COMPLETING) || - state.compareAndSet(AsyncState.TIMING_OUT_NEED_COMPLETE, - AsyncState.COMPLETING)) { - AtomicBoolean dispatched = new AtomicBoolean(false); - request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, - dispatched); - if (!dispatched.get()) doInternalComplete(false); - } else { - throw new IllegalStateException( - "Complete not allowed. Invalid state:"+state.get()); + request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null); + } + + public void fireOnComplete() { + List listenersCopy = + new ArrayList(); + listenersCopy.addAll(listeners); + for (AsyncListenerWrapper listener : listenersCopy) { + try { + listener.fireOnComplete(event); + } catch (IOException ioe) { + log.warn("onComplete() failed for listener of type [" + + listener.getClass().getName() + "]", ioe); + } + } + } + + public boolean timeout() throws IOException { + AtomicBoolean result = new AtomicBoolean(); + request.getCoyoteRequest().action(ActionCode.ASYNC_TIMEOUT, result); + + if (result.get()) { + boolean listenerInvoked = false; + List listenersCopy = + new ArrayList(); + listenersCopy.addAll(listeners); + for (AsyncListenerWrapper listener : listenersCopy) { + listener.fireOnTimeout(event); + listenerInvoked = true; + } + if (listenerInvoked) { + request.getCoyoteRequest().action( + ActionCode.ASYNC_IS_TIMINGOUT, result); + return !result.get(); + } else { + // No listeners, container calls complete + complete(); + } } - + return true; } @Override @@ -119,55 +135,37 @@ public class AsyncContextImpl implements AsyncContext { if (log.isDebugEnabled()) { logDebug("dispatch "); } - - if (state.compareAndSet(AsyncState.STARTED, AsyncState.DISPATCHING) || - state.compareAndSet(AsyncState.DISPATCHED, AsyncState.DISPATCHING)) { - - if (request.getAttribute(ASYNC_REQUEST_URI)==null) { - request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString()); - request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath()); - request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath()); - request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString()); - } - final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path); - final HttpServletRequest servletRequest = (HttpServletRequest)getRequest(); - final HttpServletResponse servletResponse = (HttpServletResponse)getResponse(); - Runnable run = new Runnable() { - @Override - public void run() { - DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR); - try { - //piggy back on the request dispatcher to ensure that filters etc get called. - //TODO SERVLET3 - async should this be include/forward or a new dispatch type - //javadoc suggests include with the type of DispatcherType.ASYNC - request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC); - requestDispatcher.include(servletRequest, servletResponse); - }catch (Exception x) { - //log.error("Async.dispatch",x); - throw new RuntimeException(x); - }finally { - request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type); - } - } - }; - this.dispatch = run; - AtomicBoolean dispatched = new AtomicBoolean(false); - request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, dispatched ); - if (!dispatched.get()) { + if (request.getAttribute(ASYNC_REQUEST_URI)==null) { + request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString()); + request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath()); + request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath()); + request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString()); + } + final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path); + final HttpServletRequest servletRequest = (HttpServletRequest)getRequest(); + final HttpServletResponse servletResponse = (HttpServletResponse)getResponse(); + Runnable run = new Runnable() { + @Override + public void run() { + request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCHED, null); + DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR); try { - doInternalDispatch(); - }catch (ServletException sx) { - throw new RuntimeException(sx); - }catch (IOException ix) { - throw new RuntimeException(ix); + //piggy back on the request dispatcher to ensure that filters etc get called. + //TODO SERVLET3 - async should this be include/forward or a new dispatch type + //javadoc suggests include with the type of DispatcherType.ASYNC + request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC); + requestDispatcher.include(servletRequest, servletResponse); + }catch (Exception x) { + //log.error("Async.dispatch",x); + throw new RuntimeException(x); + }finally { + request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type); } } - if (state.get().equals(AsyncState.DISPATCHED)) { - complete(); - } - } else { - throw new IllegalStateException("Dispatch not allowed. Invalid state:"+state.get()); - } + }; + + this.dispatch = run; + this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null); } @Override @@ -186,40 +184,8 @@ public class AsyncContextImpl implements AsyncContext { logDebug("start "); } - if (state.get() == AsyncState.STARTED) { - // Execute the runnable using a container thread from the - // Connector's thread pool. Use a wrapper to prevent a memory leak - Runnable wrapper = new RunnableWrapper(run, context); - ClassLoader oldCL; - if (Globals.IS_SECURITY_ENABLED) { - PrivilegedAction pa = new PrivilegedGetTccl(); - oldCL = AccessController.doPrivileged(pa); - } else { - oldCL = Thread.currentThread().getContextClassLoader(); - } - try { - if (Globals.IS_SECURITY_ENABLED) { - PrivilegedAction pa = new PrivilegedSetTccl( - this.getClass().getClassLoader()); - AccessController.doPrivileged(pa); - } else { - Thread.currentThread().setContextClassLoader( - this.getClass().getClassLoader()); - } - request.getConnector().getProtocolHandler().getExecutor( - ).execute(wrapper); - } finally { - if (Globals.IS_SECURITY_ENABLED) { - PrivilegedAction pa = new PrivilegedSetTccl( - oldCL); - AccessController.doPrivileged(pa); - } else { - Thread.currentThread().setContextClassLoader(oldCL); - } - } - } else { - throw new IllegalStateException("Start not allowed. Invalid state:"+state.get()); - } + Runnable wrapper = new RunnableWrapper(run, context); + this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper); } @Override @@ -259,31 +225,43 @@ public class AsyncContextImpl implements AsyncContext { } servletRequest = null; servletResponse = null; - listeners.clear(); hasOriginalRequestAndResponse = true; - state.set(AsyncState.NOT_STARTED); context = null; timeout = -1; event = null; } public boolean isStarted() { - return (state.get() == AsyncState.STARTED || - state.get() == AsyncState.DISPATCHING); + AtomicBoolean result = new AtomicBoolean(false); + request.getCoyoteRequest().action( + ActionCode.ASYNC_IS_STARTED, result); + return result.get(); } public void setStarted(Context context, ServletRequest request, - ServletResponse response, boolean hasOriginalRequestAndResponse) { - if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED) || - state.compareAndSet(AsyncState.DISPATCHED, AsyncState.STARTED)) { - this.context = context; - this.servletRequest = request; - this.servletResponse = response; - this.hasOriginalRequestAndResponse = hasOriginalRequestAndResponse; - this.event = new AsyncEvent(this, request, response); - } else { - throw new IllegalStateException("Start illegal. Invalid state: "+state.get()); + ServletResponse response, boolean originalRequestResponse) { + + this.request.getCoyoteRequest().action( + ActionCode.ASYNC_START, this); + + this.context = context; + this.servletRequest = request; + this.servletResponse = response; + this.hasOriginalRequestAndResponse = originalRequestResponse; + this.event = new AsyncEvent(this, request, response); + + List listenersCopy = + new ArrayList(); + listenersCopy.addAll(listeners); + for (AsyncListenerWrapper listener : listenersCopy) { + try { + listener.fireOnStartAsync(event); + } catch (IOException ioe) { + log.warn("onStartAsync() failed for listener of type [" + + listener.getClass().getName() + "]", ioe); + } } + listeners.clear(); } @Override @@ -295,122 +273,53 @@ public class AsyncContextImpl implements AsyncContext { if (log.isDebugEnabled()) { logDebug("intDispatch"); } - if (this.state.compareAndSet(AsyncState.TIMING_OUT, - AsyncState.TIMING_OUT_NEED_COMPLETE)) { - log.debug("TIMING OUT!"); - boolean listenerInvoked = false; - List listenersCopy = - new ArrayList(); - listenersCopy.addAll(listeners); - for (AsyncListenerWrapper listener : listenersCopy) { - listener.fireOnTimeout(event); - listenerInvoked = true; - } - if (listenerInvoked) { - // Listener should have called complete - if (state.get() != AsyncState.NOT_STARTED) { - ((HttpServletResponse)servletResponse).setStatus(500); - state.set(AsyncState.COMPLETING); - doInternalComplete(true); - } - } else { - // No listeners, container calls complete - state.set(AsyncState.COMPLETING); - doInternalComplete(false); - } - } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.COMPLETING)) { - log.debug("ON ERROR!"); - boolean listenerInvoked = false; - for (AsyncListenerWrapper listener : listeners) { - try { - listener.fireOnError(event); - }catch (IllegalStateException x) { - log.debug("Listener invoked invalid state.",x); - }catch (Exception x) { - log.debug("Exception during onError.",x); - } - listenerInvoked = true; - } - if (!listenerInvoked) { - ((HttpServletResponse)servletResponse).setStatus(500); - } - doInternalComplete(true); - - } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) { - if (this.dispatch!=null) { - try { - dispatch.run(); - } catch (RuntimeException x) { - doInternalComplete(true); - if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause(); - if (x.getCause() instanceof IOException) throw (IOException)x.getCause(); - throw new ServletException(x); - } finally { - dispatch = null; - } - } - } else if (this.state.get()==AsyncState.COMPLETING) { - doInternalComplete(false); - } else { - throw new IllegalStateException("Dispatch illegal. Invalid state: "+state.get()); - } - } - - private void doInternalComplete(boolean error) { - if (log.isDebugEnabled()) { - logDebug("intComplete"); - } - if (state.get()==AsyncState.NOT_STARTED) return; - if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) { - //this is the same as - //request.startAsync().complete(); - recycle(); - } else if (state.compareAndSet(AsyncState.COMPLETING, AsyncState.NOT_STARTED)) { - for (AsyncListenerWrapper wrapper : listeners) { - try { - wrapper.fireOnComplete(event); - }catch (IOException x) { - //how does this propagate, or should it? - //TODO SERVLET3 - async - log.error("",x); - } + try { + dispatch.run(); + } catch (RuntimeException x) { + // doInternalComplete(true); + if (x.getCause() instanceof ServletException) { + throw (ServletException)x.getCause(); } - try { - if (!error) getResponse().flushBuffer(); - }catch (Exception x) { - log.error("",x); + if (x.getCause() instanceof IOException) { + throw (IOException)x.getCause(); } - recycle(); - - } else { - throw new IllegalStateException("Complete illegal. Invalid state:"+state.get()); + throw new ServletException(x); } } - - public AsyncState getState() { - return state.get(); - } + @Override public long getTimeout() { return timeout; } - + + @Override public void setTimeout(long timeout) { this.timeout = timeout; request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT, Long.valueOf(timeout)); } - - public void setTimeoutState() { - state.set(AsyncState.TIMING_OUT); - } - + + public void setErrorState(Throwable t) { if (t!=null) request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t); - state.set(AsyncState.ERROR_DISPATCHING); + request.getCoyoteRequest().action(ActionCode.ASYNC_ERROR, null); + AsyncEvent errorEvent = new AsyncEvent(event.getAsyncContext(), + event.getSuppliedRequest(), event.getSuppliedResponse(), t); + List listenersCopy = + new ArrayList(); + listenersCopy.addAll(listeners); + for (AsyncListenerWrapper listener : listenersCopy) { + try { + listener.fireOnError(errorEvent); + } catch (IOException ioe) { + log.warn("onStartAsync() failed for listener of type [" + + listener.getClass().getName() + "]", ioe); + } + } } + private void logDebug(String method) { String rHashCode; @@ -457,7 +366,7 @@ public class AsyncContextImpl implements AsyncContext { "Req: %1$8s CReq: %2$8s RP: %3$8s Stage: %4$s " + "Thread: %5$20s State: %6$20s Method: %7$11s URI: %8$s", rHashCode, crHashCode, rpHashCode, stage, - Thread.currentThread().getName(), state, method, uri); + Thread.currentThread().getName(), "N/A", method, uri); if (log.isTraceEnabled()) { log.trace(msg, new DebugException()); } else { diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java index 385bae2ba..d5654427d 100644 --- a/java/org/apache/catalina/security/SecurityClassLoad.java +++ b/java/org/apache/catalina/security/SecurityClassLoad.java @@ -38,6 +38,7 @@ public final class SecurityClassLoad { } loadCorePackage(loader); + loadCoyotePackage(loader); loadLoaderPackage(loader); loadSessionPackage(loader); loadUtilPackage(loader); @@ -64,9 +65,6 @@ public final class SecurityClassLoad { "AsyncContextImpl"); loader.loadClass (basePackage + - "AsyncContextImpl$AsyncState"); - loader.loadClass - (basePackage + "AsyncContextImpl$DebugException"); loader.loadClass (basePackage + @@ -129,6 +127,13 @@ public final class SecurityClassLoad { } + private final static void loadCoyotePackage(ClassLoader loader) + throws Exception { + String basePackage = "org.apache.coyote."; + loader.loadClass(basePackage + "http11.AbstractOutputBuffer$1"); + } + + private final static void loadJavaxPackage(ClassLoader loader) throws Exception { loader.loadClass("javax.servlet.http.Cookie"); @@ -221,13 +226,16 @@ public final class SecurityClassLoad { private final static void loadTomcatPackage(ClassLoader loader) throws Exception { String basePackage = "org.apache.tomcat."; - loader.loadClass(basePackage + "util.net.SSLSupport$CipherData"); - loader.loadClass - (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl"); // Make sure system property is read at this point Class clazz = loader.loadClass( basePackage + "util.http.FastHttpDateFormat"); clazz.newInstance(); + loader.loadClass(basePackage + "util.http.HttpMessages"); + loader.loadClass(basePackage + "util.net.SSLSupport$CipherData"); + loader.loadClass + (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl"); + loader.loadClass + (basePackage + "util.net.AprEndpoint$PrivilegedSetTccl"); } } diff --git a/java/org/apache/coyote/ActionCode.java b/java/org/apache/coyote/ActionCode.java index 043b4ddcf..5e811f1ea 100644 --- a/java/org/apache/coyote/ActionCode.java +++ b/java/org/apache/coyote/ActionCode.java @@ -133,18 +133,61 @@ public enum ActionCode { /** * Callback for an async call to + * {@link javax.servlet.AsyncContext#dispatch()} + */ + ASYNC_DISPATCH, + + /** + * Callback to indicate the the actual dispatch has started and that the + * async state needs change. + */ + ASYNC_DISPATCHED, + + /** + * Callback for an async call to + * {@link javax.servlet.AsyncContext#start()} + */ + ASYNC_RUN, + + /** + * Callback for an async call to * {@link javax.servlet.AsyncContext#complete()} */ ASYNC_COMPLETE, + + /** + * Callback to trigger the processing of an async timeout + */ + ASYNC_TIMEOUT, + + /** + * Callback to trigger the error processing + */ + ASYNC_ERROR, + /** * Callback for an async call to * {@link javax.servlet.AsyncContext#setTimeout(long)} */ ASYNC_SETTIMEOUT, + + /** + * Callback to determine if async processing is in progress + */ + ASYNC_IS_ASYNC, + + /** + * Callback to determine if async dispatch is in progress + */ + ASYNC_IS_STARTED, /** - * Callback for an async call to - * {@link javax.servlet.AsyncContext#dispatch()} + * Callback to determine if async dispatch is in progress */ - ASYNC_DISPATCH, + ASYNC_IS_DISPATCHING, + + /** + * Callback to determine if async is timing out + */ + ASYNC_IS_TIMINGOUT } diff --git a/java/org/apache/coyote/ajp/AjpAprProcessor.java b/java/org/apache/coyote/ajp/AjpAprProcessor.java index c0c3c0750..19fd595e3 100644 --- a/java/org/apache/coyote/ajp/AjpAprProcessor.java +++ b/java/org/apache/coyote/ajp/AjpAprProcessor.java @@ -47,6 +47,7 @@ import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; @@ -188,7 +189,7 @@ public class AjpAprProcessor implements ActionHook { /** * Socket associated with the current connection. */ - protected long socket; + protected SocketWrapper socket; /** @@ -355,15 +356,16 @@ public class AjpAprProcessor implements ActionHook { * * @throws IOException error during an I/O operation */ - public boolean process(long socket) + public boolean process(SocketWrapper socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the socket this.socket = socket; - Socket.setrbb(this.socket, inputBuffer); - Socket.setsbb(this.socket, outputBuffer); + long socketRef = socket.getSocket().longValue(); + Socket.setrbb(socketRef, inputBuffer); + Socket.setsbb(socketRef, outputBuffer); // Error flag error = false; @@ -388,7 +390,7 @@ public class AjpAprProcessor implements ActionHook { // not regular request processing int type = requestHeaderMessage.getByte(); if (type == Constants.JK_AJP13_CPING_REQUEST) { - if (Socket.sendb(socket, pongMessageBuffer, 0, + if (Socket.sendb(socketRef, pongMessageBuffer, 0, pongMessageBuffer.position()) < 0) { error = true; } @@ -469,7 +471,7 @@ public class AjpAprProcessor implements ActionHook { // Add the socket to the poller if (!error && !endpoint.isPaused()) { - endpoint.getPoller().add(socket); + endpoint.getPoller().add(socketRef); } else { openSocket = false; } @@ -483,7 +485,8 @@ public class AjpAprProcessor implements ActionHook { } /* Copied from the AjpProcessor.java */ - public SocketState asyncDispatch(long socket, SocketStatus status) throws IOException { + public SocketState asyncDispatch(SocketWrapper socket, + SocketStatus status) throws IOException { // Setting up the socket this.socket = socket; @@ -535,6 +538,8 @@ public class AjpAprProcessor implements ActionHook { */ public void action(ActionCode actionCode, Object param) { + long socketRef = socket.getSocket().longValue(); + if (actionCode == ActionCode.COMMIT) { if (response.isCommitted()) @@ -564,7 +569,7 @@ public class AjpAprProcessor implements ActionHook { try { flush(); // Send explicit flush message - if (Socket.sendb(socket, flushMessageBuffer, 0, + if (Socket.sendb(socketRef, flushMessageBuffer, 0, flushMessageBuffer.position()) < 0) { error = true; } @@ -661,14 +666,14 @@ public class AjpAprProcessor implements ActionHook { } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { if (param==null) return; - if (socket==0) return; + if (socketRef==0) return; long timeout = ((Long)param).longValue(); - Socket.timeoutSet(socket, timeout * 1000); + Socket.timeoutSet(socketRef, timeout * 1000); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { RequestInfo rp = request.getRequestProcessor(); AtomicBoolean dispatch = (AtomicBoolean)param; if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling - endpoint.getPoller().add(this.socket); + endpoint.getPoller().add(socketRef); dispatch.set(true); } else { dispatch.set(true); @@ -1127,7 +1132,7 @@ public class AjpAprProcessor implements ActionHook { int nRead; while (inputBuffer.remaining() < n) { nRead = Socket.recvbb - (socket, inputBuffer.limit(), + (socket.getSocket().longValue(), inputBuffer.limit(), inputBuffer.capacity() - inputBuffer.limit()); if (nRead > 0) { inputBuffer.limit(inputBuffer.limit() + nRead); @@ -1160,7 +1165,7 @@ public class AjpAprProcessor implements ActionHook { int nRead; while (inputBuffer.remaining() < n) { nRead = Socket.recvbb - (socket, inputBuffer.limit(), + (socket.getSocket().longValue(), inputBuffer.limit(), inputBuffer.capacity() - inputBuffer.limit()); if (nRead > 0) { inputBuffer.limit(inputBuffer.limit() + nRead); @@ -1224,7 +1229,7 @@ public class AjpAprProcessor implements ActionHook { } // Request more data immediately - Socket.sendb(socket, getBodyMessageBuffer, 0, + Socket.sendb(socket.getSocket().longValue(), getBodyMessageBuffer, 0, getBodyMessageBuffer.position()); boolean moreData = receive(); @@ -1305,7 +1310,7 @@ public class AjpAprProcessor implements ActionHook { protected void flush() throws IOException { if (outputBuffer.position() > 0) { - if (Socket.sendbb(socket, 0, outputBuffer.position()) < 0) { + if (Socket.sendbb(socket.getSocket().longValue(), 0, outputBuffer.position()) < 0) { throw new IOException(sm.getString("ajpprocessor.failedsend")); } outputBuffer.clear(); diff --git a/java/org/apache/coyote/ajp/AjpAprProtocol.java b/java/org/apache/coyote/ajp/AjpAprProtocol.java index 84396cc7e..b0395cde2 100644 --- a/java/org/apache/coyote/ajp/AjpAprProtocol.java +++ b/java/org/apache/coyote/ajp/AjpAprProtocol.java @@ -42,6 +42,7 @@ import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.AprEndpoint.Handler; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; @@ -337,8 +338,8 @@ public class AjpAprProtocol protected AtomicLong registerCount = new AtomicLong(0); protected RequestGroupInfo global = new RequestGroupInfo(); - protected ConcurrentHashMap connections = - new ConcurrentHashMap(); + protected ConcurrentHashMap, AjpAprProcessor> connections = + new ConcurrentHashMap, AjpAprProcessor>(); protected ConcurrentLinkedQueue recycledProcessors = new ConcurrentLinkedQueue() { @@ -384,11 +385,11 @@ public class AjpAprProtocol } // FIXME: Support for this could be added in AJP as well - public SocketState event(long socket, SocketStatus status) { + public SocketState event(SocketWrapper socket, SocketStatus status) { return SocketState.CLOSED; } - public SocketState process(long socket) { + public SocketState process(SocketWrapper socket) { AjpAprProcessor processor = recycledProcessors.poll(); try { @@ -397,7 +398,7 @@ public class AjpAprProtocol } if (processor.process(socket)) { - connections.put(Long.valueOf(socket), processor); + connections.put(socket, processor); return SocketState.OPEN; } else { // recycledProcessors.offer(processor); @@ -431,9 +432,9 @@ public class AjpAprProtocol } // FIXME: Support for this could be added in AJP as well - public SocketState asyncDispatch(long socket, SocketStatus status) { + public SocketState asyncDispatch(SocketWrapper socket, SocketStatus status) { - AjpAprProcessor result = connections.get(Long.valueOf(socket)); + AjpAprProcessor result = connections.get(socket); SocketState state = SocketState.CLOSED; if (result != null) { @@ -462,10 +463,10 @@ public class AjpAprProtocol (sm.getString("ajpprotocol.proto.error"), e); } finally { if (state != SocketState.LONG) { - connections.remove(Long.valueOf(socket)); + connections.remove(socket); recycledProcessors.offer(result); if (state == SocketState.OPEN) { - proto.endpoint.getPoller().add(socket); + proto.endpoint.getPoller().add(socket.getSocket().longValue()); } } } diff --git a/java/org/apache/coyote/http11/AbstractHttp11Processor.java b/java/org/apache/coyote/http11/AbstractHttp11Processor.java index 9226fd226..4a400c173 100644 --- a/java/org/apache/coyote/http11/AbstractHttp11Processor.java +++ b/java/org/apache/coyote/http11/AbstractHttp11Processor.java @@ -17,11 +17,15 @@ package org.apache.coyote.http11; import java.io.IOException; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.StringTokenizer; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import org.apache.catalina.core.AsyncContextImpl; import org.apache.coyote.ActionCode; import org.apache.coyote.Adapter; import org.apache.coyote.Request; @@ -43,6 +47,7 @@ import org.apache.tomcat.util.buf.MessageBytes; import org.apache.tomcat.util.http.FastHttpDateFormat; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AbstractEndpoint; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; public abstract class AbstractHttp11Processor { @@ -236,12 +241,6 @@ public abstract class AbstractHttp11Processor { /** - * Async used - */ - protected boolean async = false; - - - /** * Set compression level. */ public void setCompression(String compression) { @@ -904,7 +903,24 @@ public abstract class AbstractHttp11Processor { request.getInputBuffer(); internalBuffer.addActiveFilter(savedBody); } else if (actionCode == ActionCode.ASYNC_START) { - async = true; + asyncStart((AsyncContextImpl) param); + } else if (actionCode == ActionCode.ASYNC_DISPATCHED) { + asyncDispatched(); + } else if (actionCode == ActionCode.ASYNC_TIMEOUT) { + AtomicBoolean result = (AtomicBoolean) param; + result.set(asyncTimeout()); + } else if (actionCode == ActionCode.ASYNC_RUN) { + asyncRun((Runnable) param); + } else if (actionCode == ActionCode.ASYNC_ERROR) { + asyncError(); + } else if (actionCode == ActionCode.ASYNC_IS_STARTED) { + ((AtomicBoolean) param).set(isAsyncStarted()); + } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) { + ((AtomicBoolean) param).set(isAsyncDispatching()); + } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) { + ((AtomicBoolean) param).set(isAsync()); + } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) { + ((AtomicBoolean) param).set(isAsyncTimingOut()); } else { actionInternal(actionCode, param); } @@ -1086,10 +1102,276 @@ public abstract class AbstractHttp11Processor { public final void recycle() { getInputBuffer().recycle(); getOutputBuffer().recycle(); + asyncCtxt = null; recycleInternal(); } protected abstract void recycleInternal(); protected abstract Executor getExecutor(); + + // -------------------------------------------------- Async state management + + /* + * DISPATCHED - Standard request. Not in Async mode. + * STARTING - ServletRequest.startAsync() has been called but the + * request in which that call was made has not finished + * processing. + * STARTED - ServletRequest.startAsync() has been called and the + * request in which that call was made has finished + * processing. + * MUST_COMPLETE - complete() has been called before the request in which + * ServletRequest.startAsync() has finished. As soon as that + * request finishes, the complete() will be processed. + * COMPLETING - The call to complete() was made once the request was in + * the STARTED state. May or may not be triggered by a + * container thread - depends if start(Runnable) was used + * + * TODO - markt - Move this to a separate class + */ + private static enum AsyncState { + DISPATCHED(false, false, false), + STARTING(true, true, false), + STARTED(true, true, false), + MUST_COMPLETE(true, false, false), + COMPLETING(true, false, false), + TIMING_OUT(true, false, false), + MUST_DISPATCH(true, false, true), + DISPATCHING(true, false, true), + ERROR(true,false,false); + + private boolean isAsync; + private boolean isStarted; + private boolean isDispatching; + + private AsyncState(boolean isAsync, boolean isStarted, + boolean isDispatching) { + this.isAsync = isAsync; + this.isStarted = isStarted; + this.isDispatching = isDispatching; + } + + public boolean isAsync() { + return this.isAsync; + } + + public boolean isStarted() { + return this.isStarted; + } + + public boolean isDispatching() { + return this.isDispatching; + } + } + + private volatile AsyncState state = AsyncState.DISPATCHED; + // Need this to fire listener on complete + private AsyncContextImpl asyncCtxt = null; + + protected boolean isAsync() { + return state.isAsync(); + } + + protected boolean isAsyncDispatching() { + return state.isDispatching(); + } + + protected boolean isAsyncStarted() { + return state.isStarted(); + } + + protected boolean isAsyncTimingOut() { + return state == AsyncState.TIMING_OUT; + } + + + private synchronized void asyncStart(AsyncContextImpl asyncCtxt) { + if (state == AsyncState.DISPATCHED) { + state = AsyncState.STARTING; + this.asyncCtxt = asyncCtxt; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "startAsync()", state)); + } + } + + /* + * Async has been processed. Whether or not to enter a long poll depends on + * current state. For example, as per SRV.2.3.3.3 can now process calls to + * complete() or dispatch(). + */ + protected synchronized SocketState asyncPostProcess() { + + if (state == AsyncState.STARTING) { + state = AsyncState.STARTED; + return SocketState.LONG; + } else if (state == AsyncState.MUST_COMPLETE) { + asyncCtxt.fireOnComplete(); + state = AsyncState.DISPATCHED; + return SocketState.ASYNC_END; + } else if (state == AsyncState.COMPLETING) { + state = AsyncState.DISPATCHED; + return SocketState.ASYNC_END; + } else if (state == AsyncState.MUST_DISPATCH) { + state = AsyncState.DISPATCHING; + return SocketState.ASYNC_END; + } else if (state == AsyncState.DISPATCHING) { + state = AsyncState.DISPATCHED; + return SocketState.ASYNC_END; + } else if (state == AsyncState.ERROR) { + asyncCtxt.fireOnComplete(); + state = AsyncState.DISPATCHED; + return SocketState.ASYNC_END; + //} else if (state == AsyncState.DISPATCHED) { + // // No state change + // return SocketState.OPEN; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "asyncLongPoll()", state)); + } + } + + + protected synchronized boolean asyncComplete() { + boolean doComplete = false; + + if (state == AsyncState.STARTING) { + state = AsyncState.MUST_COMPLETE; + } else if (state == AsyncState.STARTED) { + state = AsyncState.COMPLETING; + doComplete = true; + } else if (state == AsyncState.TIMING_OUT || + state == AsyncState.ERROR) { + state = AsyncState.MUST_COMPLETE; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "asyncComplete()", state)); + + } + return doComplete; + } + + + private synchronized boolean asyncTimeout() { + if (state == AsyncState.STARTED) { + state = AsyncState.TIMING_OUT; + return true; + } else if (state == AsyncState.COMPLETING || + state == AsyncState.DISPATCHED) { + // NOOP - App called complete between the the timeout firing and + // execution reaching this point + return false; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "timeoutAsync()", state)); + } + } + + + protected synchronized boolean asyncDispatch() { + boolean doDispatch = false; + if (state == AsyncState.STARTING) { + state = AsyncState.MUST_DISPATCH; + } else if (state == AsyncState.STARTED) { + state = AsyncState.DISPATCHING; + doDispatch = true; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "dispatchAsync()", state)); + } + return doDispatch; + } + + + private synchronized void asyncDispatched() { + if (state == AsyncState.DISPATCHING) { + state = AsyncState.DISPATCHED; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "dispatchAsync()", state)); + } + } + + + private synchronized boolean asyncError() { + boolean doDispatch = false; + if (state == AsyncState.DISPATCHED || + state == AsyncState.TIMING_OUT) { + state = AsyncState.ERROR; + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "dispatchAsync()", state)); + } + return doDispatch; + } + + private synchronized void asyncRun(Runnable runnable) { + if (state == AsyncState.STARTING || state == AsyncState.STARTED) { + // Execute the runnable using a container thread from the + // Connector's thread pool. Use a wrapper to prevent a memory leak + ClassLoader oldCL; + if (Constants.IS_SECURITY_ENABLED) { + PrivilegedAction pa = new PrivilegedGetTccl(); + oldCL = AccessController.doPrivileged(pa); + } else { + oldCL = Thread.currentThread().getContextClassLoader(); + } + try { + if (Constants.IS_SECURITY_ENABLED) { + PrivilegedAction pa = new PrivilegedSetTccl( + this.getClass().getClassLoader()); + AccessController.doPrivileged(pa); + } else { + Thread.currentThread().setContextClassLoader( + this.getClass().getClassLoader()); + } + + getExecutor().execute(runnable); + } finally { + if (Constants.IS_SECURITY_ENABLED) { + PrivilegedAction pa = new PrivilegedSetTccl( + oldCL); + AccessController.doPrivileged(pa); + } else { + Thread.currentThread().setContextClassLoader(oldCL); + } + } + } else { + throw new IllegalStateException( + sm.getString("abstractHttp11Protocol.invalidAsyncState", + "runAsync()", state)); + } + + } + + private static class PrivilegedSetTccl implements PrivilegedAction { + + private ClassLoader cl; + + PrivilegedSetTccl(ClassLoader cl) { + this.cl = cl; + } + + @Override + public Void run() { + Thread.currentThread().setContextClassLoader(cl); + return null; + } + } + + private static class PrivilegedGetTccl + implements PrivilegedAction { + + @Override + public ClassLoader run() { + return Thread.currentThread().getContextClassLoader(); + } + } } diff --git a/java/org/apache/coyote/http11/Constants.java b/java/org/apache/coyote/http11/Constants.java index ff07a60d1..46514b441 100644 --- a/java/org/apache/coyote/http11/Constants.java +++ b/java/org/apache/coyote/http11/Constants.java @@ -211,5 +211,9 @@ public final class Constants { */ public static final String POST = "POST"; - + /** + * Has security been turned on? + */ + public static final boolean IS_SECURITY_ENABLED = + (System.getSecurityManager() != null); } diff --git a/java/org/apache/coyote/http11/Http11AprProcessor.java b/java/org/apache/coyote/http11/Http11AprProcessor.java index df05c7408..ab2ed8c2b 100644 --- a/java/org/apache/coyote/http11/Http11AprProcessor.java +++ b/java/org/apache/coyote/http11/Http11AprProcessor.java @@ -24,7 +24,6 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.Locale; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -47,6 +46,7 @@ import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; /** @@ -124,7 +124,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio /** * Socket associated with the current connection. */ - protected long socket = 0; + protected SocketWrapper socket = null; /** @@ -186,7 +186,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio * * @throws IOException error during an I/O operation */ - public SocketState process(long socket) + public SocketState process(SocketWrapper socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); @@ -201,13 +201,13 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio // Setting up the socket this.socket = socket; - inputBuffer.setSocket(socket); - outputBuffer.setSocket(socket); + long socketRef = socket.getSocket().longValue(); + inputBuffer.setSocket(socketRef); + outputBuffer.setSocket(socketRef); // Error flag error = false; comet = false; - async = false; keepAlive = true; int keepAliveLeft = maxKeepAliveRequests; @@ -216,12 +216,12 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio boolean keptAlive = false; boolean openSocket = false; - while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) { + while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) { // Parsing the request header try { if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { - Socket.timeoutSet(socket, soTimeout * 1000); + Socket.timeoutSet(socketRef, soTimeout * 1000); } if (!inputBuffer.parseRequestLine(keptAlive)) { // This means that no data is available right now @@ -229,13 +229,13 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio // and the method should return true openSocket = true; // Add the socket to the poller - endpoint.getPoller().add(socket); + endpoint.getPoller().add(socketRef); break; } request.setStartTime(System.currentTimeMillis()); keptAlive = true; if (!disableUploadTimeout) { - Socket.timeoutSet(socket, timeout * 1000); + Socket.timeoutSet(socketRef, timeout * 1000); } inputBuffer.parseHeaders(); } catch (IOException e) { @@ -296,7 +296,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } // Finish the handling of the request - if (!comet && !async) { + if (!comet && !isAsync()) { // If we know we are closing the connection, don't drain input. // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. @@ -312,7 +312,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } request.updateCounters(); - if (!comet && !async) { + if (!comet && !isAsync()) { // Next request inputBuffer.nextRequest(); outputBuffer.nextRequest(); @@ -320,7 +320,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio // Do sendfile as needed: add socket to sendfile and end if (sendfileData != null && !error) { - sendfileData.socket = socket; + sendfileData.socket = socketRef; sendfileData.keepAlive = keepAlive; if (!endpoint.getSendfile().add(sendfileData)) { openSocket = true; @@ -335,11 +335,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); if (error || endpoint.isPaused()) { - inputBuffer.nextRequest(); - outputBuffer.nextRequest(); recycle(); return SocketState.CLOSED; - } else if (comet || async) { + } else if (comet || isAsync()) { return SocketState.LONG; } else { recycle(); @@ -349,12 +347,14 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } /* Copied from the AjpProcessor.java */ - public SocketState asyncDispatch(long socket, SocketStatus status) { + public SocketState asyncDispatch(SocketWrapper socket, + SocketStatus status) { // Setting up the socket this.socket = socket; - inputBuffer.setSocket(socket); - outputBuffer.setSocket(socket); + long socketRef = socket.getSocket().longValue(); + inputBuffer.setSocket(socketRef); + outputBuffer.setSocket(socketRef); RequestInfo rp = request.getRequestProcessor(); try { @@ -372,30 +372,25 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - if (async) { - if (error) { - response.setStatus(500); - request.updateCounters(); - recycle(); + if (error) { + recycle(); + return SocketState.CLOSED; + } else if (isAsync()) { + return SocketState.LONG; + } else { + recycle(); + if (!keepAlive) { return SocketState.CLOSED; } else { - return SocketState.LONG; + return SocketState.OPEN; } - } else { - if (error) { - response.setStatus(500); - } - request.updateCounters(); - recycle(); - return SocketState.CLOSED; } - } @Override public void recycleInternal() { - this.socket = 0; + this.socket = null; } @@ -411,6 +406,8 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio @Override public void actionInternal(ActionCode actionCode, Object param) { + long socketRef = socket.getSocket().longValue(); + if (actionCode == ActionCode.CLOSE) { // Close @@ -418,7 +415,6 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio // transactions with the client comet = false; - async = false; try { outputBuffer.endRequest(); } catch (IOException e) { @@ -429,9 +425,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) { // Get remote host address - if (remoteAddr == null && (socket != 0)) { + if (remoteAddr == null && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_REMOTE, socket); + long sa = Address.get(Socket.APR_REMOTE, socketRef); remoteAddr = Address.getip(sa); } catch (Exception e) { log.warn(sm.getString("http11processor.socket.info"), e); @@ -442,9 +438,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_LOCAL_NAME_ATTRIBUTE) { // Get local host name - if (localName == null && (socket != 0)) { + if (localName == null && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_LOCAL, socket); + long sa = Address.get(Socket.APR_LOCAL, socketRef); localName = Address.getnameinfo(sa, 0); } catch (Exception e) { log.warn(sm.getString("http11processor.socket.info"), e); @@ -455,9 +451,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) { // Get remote host name - if (remoteHost == null && (socket != 0)) { + if (remoteHost == null && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_REMOTE, socket); + long sa = Address.get(Socket.APR_REMOTE, socketRef); remoteHost = Address.getnameinfo(sa, 0); } catch (Exception e) { log.warn(sm.getString("http11processor.socket.info"), e); @@ -468,9 +464,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) { // Get local host address - if (localAddr == null && (socket != 0)) { + if (localAddr == null && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_LOCAL, socket); + long sa = Address.get(Socket.APR_LOCAL, socketRef); localAddr = Address.getip(sa); } catch (Exception e) { log.warn(sm.getString("http11processor.socket.info"), e); @@ -482,9 +478,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_REMOTEPORT_ATTRIBUTE) { // Get remote port - if (remotePort == -1 && (socket != 0)) { + if (remotePort == -1 && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_REMOTE, socket); + long sa = Address.get(Socket.APR_REMOTE, socketRef); Sockaddr addr = Address.getInfo(sa); remotePort = addr.port; } catch (Exception e) { @@ -496,9 +492,9 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) { // Get local port - if (localPort == -1 && (socket != 0)) { + if (localPort == -1 && (socketRef != 0)) { try { - long sa = Address.get(Socket.APR_LOCAL, socket); + long sa = Address.get(Socket.APR_LOCAL, socketRef); Sockaddr addr = Address.getInfo(sa); localPort = addr.port; } catch (Exception e) { @@ -509,24 +505,24 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE ) { - if (ssl && (socket != 0)) { + if (ssl && (socketRef != 0)) { try { // Cipher suite - Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER); + Object sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_CIPHER); if (sslO != null) { request.setAttribute(AbstractEndpoint.CIPHER_SUITE_KEY, sslO); } // Get client certificate and the certificate chain if present // certLength == -1 indicates an error - int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN); - byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT); + int certLength = SSLSocket.getInfoI(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN); + byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT); X509Certificate[] certs = null; if (clientCert != null && certLength > -1) { certs = new X509Certificate[certLength + 1]; CertificateFactory cf = CertificateFactory.getInstance("X.509"); certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert)); for (int i = 0; i < certLength; i++) { - byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); + byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data)); } } @@ -534,12 +530,12 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio request.setAttribute(AbstractEndpoint.CERTIFICATE_KEY, certs); } // User key size - sslO = Integer.valueOf(SSLSocket.getInfoI(socket, + sslO = Integer.valueOf(SSLSocket.getInfoI(socketRef, SSL.SSL_INFO_CIPHER_USEKEYSIZE)); request.setAttribute(AbstractEndpoint.KEY_SIZE_KEY, sslO); // SSL session ID - sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID); + sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_SESSION_ID); if (sslO != null) { request.setAttribute(AbstractEndpoint.SESSION_ID_KEY, sslO); } @@ -552,7 +548,7 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.REQ_SSL_CERTIFICATE) { - if (ssl && (socket != 0)) { + if (ssl && (socketRef != 0)) { // Consume and buffer the request body, so that it does not // interfere with the client's handshake messages InputFilter[] inputFilters = inputBuffer.getFilters(); @@ -560,22 +556,22 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio inputBuffer.addActiveFilter(inputFilters[Constants.BUFFERED_FILTER]); try { // Configure connection to require a certificate - SSLSocket.setVerify(socket, SSL.SSL_CVERIFY_REQUIRE, + SSLSocket.setVerify(socketRef, SSL.SSL_CVERIFY_REQUIRE, endpoint.getSSLVerifyDepth()); // Renegotiate certificates - if (SSLSocket.renegotiate(socket) == 0) { + if (SSLSocket.renegotiate(socketRef) == 0) { // Don't look for certs unless we know renegotiation worked. // Get client certificate and the certificate chain if present // certLength == -1 indicates an error - int certLength = SSLSocket.getInfoI(socket,SSL.SSL_INFO_CLIENT_CERT_CHAIN); - byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT); + int certLength = SSLSocket.getInfoI(socketRef,SSL.SSL_INFO_CLIENT_CERT_CHAIN); + byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT); X509Certificate[] certs = null; if (clientCert != null && certLength > -1) { certs = new X509Certificate[certLength + 1]; CertificateFactory cf = CertificateFactory.getInstance("X.509"); certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert)); for (int i = 0; i < certLength; i++) { - byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); + byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data)); } } @@ -599,29 +595,16 @@ public class Http11AprProcessor extends AbstractHttp11Processor implements Actio } else if (actionCode == ActionCode.COMET_SETTIMEOUT) { //no op } else if (actionCode == ActionCode.ASYNC_COMPLETE) { - //TODO SERVLET3 - async - that is bit hacky - - AtomicBoolean dispatch = (AtomicBoolean)param; - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling - dispatch.set(true); - endpoint.getHandler().asyncDispatch(this.socket, SocketStatus.STOP); - } else { - dispatch.set(false); + if (asyncComplete()) { + endpoint.processSocketAsync(this.socket, SocketStatus.OPEN); } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { - //TODO SERVLET3 - async if (param==null) return; - if (socket==0) return; long timeout = ((Long)param).longValue(); - Socket.timeoutSet(socket, timeout * 1000); + socket.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { - RequestInfo rp = request.getRequestProcessor(); - AtomicBoolean dispatch = (AtomicBoolean)param; - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling - endpoint.getPoller().add(this.socket); - dispatch.set(true); - } else { - dispatch.set(true); + if (asyncDispatch()) { + endpoint.processSocketAsync(this.socket, SocketStatus.OPEN); } } diff --git a/java/org/apache/coyote/http11/Http11AprProtocol.java b/java/org/apache/coyote/http11/Http11AprProtocol.java index 583af68cb..4c4f67b67 100644 --- a/java/org/apache/coyote/http11/Http11AprProtocol.java +++ b/java/org/apache/coyote/http11/Http11AprProtocol.java @@ -17,6 +17,8 @@ package org.apache.coyote.http11; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +34,7 @@ import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.AprEndpoint.Handler; import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; @@ -246,8 +249,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { protected AtomicLong registerCount = new AtomicLong(0); protected RequestGroupInfo global = new RequestGroupInfo(); - protected ConcurrentHashMap connections = - new ConcurrentHashMap(); + protected ConcurrentHashMap, Http11AprProcessor> connections = + new ConcurrentHashMap, Http11AprProcessor>(); protected ConcurrentLinkedQueue recycledProcessors = new ConcurrentLinkedQueue() { private static final long serialVersionUID = 1L; @@ -294,8 +297,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { } @Override - public SocketState event(long socket, SocketStatus status) { - Http11AprProcessor result = connections.get(Long.valueOf(socket)); + public SocketState event(SocketWrapper socket, SocketStatus status) { + Http11AprProcessor result = connections.get(socket); SocketState state = SocketState.CLOSED; if (result != null) { @@ -324,16 +327,16 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { "http11protocol.proto.error"), e); } finally { if (state != SocketState.LONG) { - connections.remove(Long.valueOf(socket)); + connections.remove(socket); recycledProcessors.offer(result); if (state == SocketState.OPEN) { - ((AprEndpoint)proto.endpoint).getPoller().add(socket); + ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue()); } } else { - ((AprEndpoint)proto.endpoint).getCometPoller().add(socket); + ((AprEndpoint)proto.endpoint).getCometPoller().add(socket.getSocket().longValue()); } } - } else if (result.async) { + } else if (result.isAsync()) { state = asyncDispatch(socket, status); } } @@ -341,7 +344,7 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { } @Override - public SocketState process(long socket) { + public SocketState process(SocketWrapper socket) { Http11AprProcessor processor = recycledProcessors.poll(); try { if (processor == null) { @@ -350,11 +353,13 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { SocketState state = processor.process(socket); if (state == SocketState.LONG) { - // Associate the connection with the processor. The next request - // processed by this thread will use either a new or a recycled - // processor. - connections.put(Long.valueOf(socket), processor); - ((AprEndpoint)proto.endpoint).getCometPoller().add(socket); + // Check if the post processing is going to change the state + state = processor.asyncPostProcess(); + } + if (state == SocketState.LONG || state == SocketState.ASYNC_END) { + // Need to make socket available for next processing cycle + // but no need for the poller + connections.put(socket, processor); } else { recycledProcessors.offer(processor); } @@ -386,8 +391,8 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { } @Override - public SocketState asyncDispatch(long socket, SocketStatus status) { - Http11AprProcessor result = connections.get(Long.valueOf(socket)); + public SocketState asyncDispatch(SocketWrapper socket, SocketStatus status) { + Http11AprProcessor result = connections.get(socket); SocketState state = SocketState.CLOSED; if (result != null) { @@ -404,11 +409,14 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { Http11AprProtocol.log.error (sm.getString("http11protocol.proto.error"), e); } finally { - if (state != SocketState.LONG) { - connections.remove(Long.valueOf(socket)); + if (state == SocketState.LONG && result.isAsync()) { + state = result.asyncPostProcess(); + } + if (state != SocketState.LONG && state != SocketState.ASYNC_END) { + connections.remove(socket); recycledProcessors.offer(result); if (state == SocketState.OPEN) { - ((AprEndpoint)proto.endpoint).getPoller().add(socket); + ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue()); } } } @@ -440,15 +448,29 @@ public class Http11AprProtocol extends AbstractHttp11Protocol { synchronized (this) { try { long count = registerCount.incrementAndGet(); - RequestInfo rp = processor.getRequest().getRequestProcessor(); + final RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); - ObjectName rpName = new ObjectName + final ObjectName rpName = new ObjectName (proto.getDomain() + ":type=RequestProcessor,worker=" + proto.getName() + ",name=HttpRequest" + count); if (log.isDebugEnabled()) { log.debug("Register " + rpName); } - Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + if (Constants.IS_SECURITY_ENABLED) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + try { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } catch (Exception e) { + log.warn("Error registering request"); + } + return null; + } + }); + } else { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } rp.setRpName(rpName); } catch (Exception e) { log.warn("Error registering request"); diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index 02ed55760..ded215304 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -22,7 +22,6 @@ import java.net.InetAddress; import java.nio.channels.SelectionKey; import java.util.Locale; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -236,11 +235,13 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); } else { - //reset the timeout - if (keepAlive && keepAliveTimeout>0) { - attach.setTimeout(keepAliveTimeout); - } else { - attach.setTimeout(soTimeout); + if (isAsyncDispatching()) { + //reset the timeout + if (keepAlive && keepAliveTimeout>0) { + attach.setTimeout(keepAliveTimeout); + } else { + attach.setTimeout(soTimeout); + } } } @@ -261,7 +262,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio if (error) { recycle(); return SocketState.CLOSED; - } else if (!comet) { + } else if (!comet && !isAsync()) { recycle(); return (keepAlive)?SocketState.OPEN:SocketState.CLOSED; } else { @@ -291,7 +292,6 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio error = false; keepAlive = true; comet = false; - async = false; long soTimeout = endpoint.getSoTimeout(); int keepAliveTimeout = endpoint.getKeepAliveTimeout(); @@ -301,7 +301,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio boolean recycle = true; final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); - while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) { + while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) { //always default to our soTimeout ka.setTimeout(soTimeout); // Parsing the request header @@ -412,7 +412,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio } // Finish the handling of the request - if (!comet && !async) { + if (!comet && !isAsync()) { // If we know we are closing the connection, don't drain input. // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. @@ -428,7 +428,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio } request.updateCounters(); - if (!comet && !async) { + if (!comet && !isAsync()) { // Next request inputBuffer.nextRequest(); outputBuffer.nextRequest(); @@ -453,7 +453,7 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio if (error || endpoint.isPaused()) { recycle(); return SocketState.CLOSED; - } else if (comet || async) { + } else if (comet || isAsync()) { return SocketState.LONG; } else { if (recycle) { @@ -494,13 +494,11 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio if (actionCode == ActionCode.CLOSE) { // Close - // End the processing of the current request, and stop any further // transactions with the client comet = false; cometClose = true; - async = false; SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); @@ -654,17 +652,10 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling attach.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_COMPLETE) { - //TODO SERVLET3 - async - AtomicBoolean dispatch = (AtomicBoolean)param; - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling - dispatch.set(true); - endpoint.processSocket(this.socket, SocketStatus.STOP, true); - } else { - dispatch.set(false); + if (asyncComplete()) { + endpoint.processSocket(this.socket, SocketStatus.OPEN, true); } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { - //TODO SERVLET3 - async if (param==null) return; if (socket==null || socket.getAttachment(false)==null) return; NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); @@ -672,13 +663,8 @@ public class Http11NioProcessor extends AbstractHttp11Processor implements Actio //if we are not piggy backing on a worker thread, set the timeout attach.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { - RequestInfo rp = request.getRequestProcessor(); - AtomicBoolean dispatch = (AtomicBoolean)param; - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + if (asyncDispatch()) { endpoint.processSocket(this.socket, SocketStatus.OPEN, true); - dispatch.set(true); - } else { - dispatch.set(true); } } } diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index bf1f60e1b..5acc5ae7e 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -18,6 +18,8 @@ package org.apache.coyote.http11; import java.nio.channels.SocketChannel; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -288,18 +290,18 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol { @Override public SocketState event(NioChannel socket, SocketStatus status) { - Http11NioProcessor result = connections.get(socket); + Http11NioProcessor processor = connections.get(socket); NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); att.setAsync(false); //no longer check for timeout SocketState state = SocketState.CLOSED; - if (result != null) { - if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error); + if (processor != null) { + if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+processor.error); // Call the appropriate event try { - if (result.async) { - state = result.asyncDispatch(status); + if (processor.comet) { + state = processor.event(status); } else { - state = result.event(status); + state = processor.asyncDispatch(status); } } catch (java.net.SocketException e) { // SocketExceptions are normal @@ -322,14 +324,21 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol { Http11NioProtocol.log.error (sm.getString("http11protocol.proto.error"), e); } finally { - if (state != SocketState.LONG) { + if (processor.isAsync()) { + state = processor.asyncPostProcess(); + } + if (state != SocketState.LONG && state != SocketState.ASYNC_END) { connections.remove(socket); - recycledProcessors.offer(result); + recycledProcessors.offer(processor); if (state == SocketState.OPEN) { socket.getPoller().add(socket); } + } else if (state == SocketState.ASYNC_END) { + // No further work required + } else if (state == SocketState.LONG) { + att.setAsync(true); // Re-enable timeouts } else { - if (log.isDebugEnabled()) log.debug("Keeping processor["+result); + if (log.isDebugEnabled()) log.debug("Keeping processor["+processor); //add correct poller events here based on Comet stuff socket.getPoller().add(socket,att.getCometOps()); } @@ -369,12 +378,18 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol { if (processor.comet) { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); socket.getPoller().add(socket,att.getCometOps()); - } else if (processor.async) { + } else if (processor.isAsync()) { NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); att.setAsync(true); + // longPoll may change socket state (e.g. to trigger a + // complete or dispatch) + state = processor.asyncPostProcess(); } else { socket.getPoller().add(socket); } + } + if (state == SocketState.LONG || state == SocketState.ASYNC_END) { + // Already done all we need to do. } else if (state == SocketState.OPEN){ // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. @@ -434,12 +449,26 @@ public class Http11NioProtocol extends AbstractHttp11JsseProtocol { try { registerCount.addAndGet(1); if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get()); - RequestInfo rp = processor.getRequest().getRequestProcessor(); + final RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); - ObjectName rpName = new ObjectName + final ObjectName rpName = new ObjectName (proto.getDomain() + ":type=RequestProcessor,worker=" + proto.getName() + ",name=HttpRequest" + count++); - Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + if (Constants.IS_SECURITY_ENABLED) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + try { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } catch (Exception e) { + log.warn("Error registering request"); + } + return null; + } + }); + } else { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } rp.setRpName(rpName); } catch (Exception e) { log.warn("Error registering request"); diff --git a/java/org/apache/coyote/http11/Http11Processor.java b/java/org/apache/coyote/http11/Http11Processor.java index 6e59e1a35..97583960f 100644 --- a/java/org/apache/coyote/http11/Http11Processor.java +++ b/java/org/apache/coyote/http11/Http11Processor.java @@ -23,7 +23,6 @@ import java.net.InetAddress; import java.net.Socket; import java.util.Locale; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -265,9 +264,9 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo // This way uploading a 100GB file doesn't tie up the thread // if the servlet has rejected it. - if(error && !async) + if(error && !isAsync()) inputBuffer.setSwallowInput(false); - if (!async) + if (!isAsync()) endRequest(); } catch (Throwable t) { log.error(sm.getString("http11processor.request.finish"), t); @@ -296,7 +295,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo // will reset it // thrA.setParam(null); // Next request - if (!async || error) { + if (!isAsync() || error) { inputBuffer.nextRequest(); outputBuffer.nextRequest(); } @@ -309,7 +308,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo if (error || endpoint.isPaused()) { recycle(); return SocketState.CLOSED; - } else if (async) { + } else if (isAsync()) { return SocketState.LONG; } else { if (!keepAlive) { @@ -343,7 +342,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo if (error) { recycle(); return SocketState.CLOSED; - } else if (async) { + } else if (isAsync()) { return SocketState.LONG; } else { recycle(); @@ -360,7 +359,6 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo protected void recycleInternal() { // Recycle this.socket = null; - async = false; // Recycle ssl info sslSupport = null; } @@ -380,7 +378,6 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo if (actionCode == ActionCode.CLOSE) { // Close - async = false; // End the processing of the current request, and stop any further // transactions with the client @@ -497,33 +494,19 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo } } } else if (actionCode == ActionCode.ASYNC_COMPLETE) { - //TODO SERVLET3 - async - AtomicBoolean dispatch = (AtomicBoolean)param; - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling - dispatch.set(true); + if (asyncComplete()) { endpoint.processSocketAsync(this.socket, SocketStatus.OPEN); - } else { - dispatch.set(false); } } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { - //TODO SERVLET3 - async - if (param==null) return; + if (param == null) return; long timeout = ((Long)param).longValue(); - //if we are not piggy backing on a worker thread, set the timeout + // if we are not piggy backing on a worker thread, set the timeout socket.setTimeout(timeout); } else if (actionCode == ActionCode.ASYNC_DISPATCH) { - RequestInfo rp = request.getRequestProcessor(); - AtomicBoolean dispatch = (AtomicBoolean)param; - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + if (asyncDispatch()) { endpoint.processSocketAsync(this.socket, SocketStatus.OPEN); - dispatch.set(true); - } else { - dispatch.set(true); } } - - } diff --git a/java/org/apache/coyote/http11/Http11Protocol.java b/java/org/apache/coyote/http11/Http11Protocol.java index cf2fba3d6..e99eccdbb 100644 --- a/java/org/apache/coyote/http11/Http11Protocol.java +++ b/java/org/apache/coyote/http11/Http11Protocol.java @@ -17,7 +17,10 @@ package org.apache.coyote.http11; +import java.io.IOException; import java.net.Socket; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -257,6 +260,9 @@ public class Http11Protocol extends AbstractHttp11JsseProtocol { if (state == SocketState.LONG) { connections.put(socket, processor); socket.setAsync(true); + // longPoll may change socket state (e.g. to trigger a + // complete or dispatch) + return processor.asyncPostProcess(); } else { connections.remove(socket); socket.setAsync(false); @@ -310,15 +316,29 @@ public class Http11Protocol extends AbstractHttp11JsseProtocol { synchronized (this) { try { long count = registerCount.incrementAndGet(); - RequestInfo rp = processor.getRequest().getRequestProcessor(); + final RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); - ObjectName rpName = new ObjectName + final ObjectName rpName = new ObjectName (proto.getDomain() + ":type=RequestProcessor,worker=" + proto.getName() + ",name=HttpRequest" + count); if (log.isDebugEnabled()) { log.debug("Register " + rpName); } - Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + if (Constants.IS_SECURITY_ENABLED) { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + try { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } catch (Exception e) { + log.warn("Error registering request"); + } + return null; + } + }); + } else { + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } rp.setRpName(rpName); } catch (Exception e) { log.warn("Error registering request"); diff --git a/java/org/apache/coyote/http11/LocalStrings.properties b/java/org/apache/coyote/http11/LocalStrings.properties index 298c66ea0..22b2a03c4 100644 --- a/java/org/apache/coyote/http11/LocalStrings.properties +++ b/java/org/apache/coyote/http11/LocalStrings.properties @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +abstractHttp11Protocol.invalidAsyncState=Calling [{0}] is not valid for a request with Async state [{1}] + http11protocol.destroy=Destroying Coyote HTTP/1.1 on {0} http11protocol.endpoint.initerror=Error initializing endpoint http11protocol.endpoint.starterror=Error starting endpoint diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index 917a6d4a7..8b936a919 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -78,7 +78,7 @@ public abstract class AbstractEndpoint { */ public static interface Handler { public enum SocketState { - OPEN, CLOSED, LONG + OPEN, CLOSED, LONG, ASYNC_END } } diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index fcb9dde7b..93a445866 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -21,6 +21,8 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; import org.apache.juli.logging.Log; @@ -93,6 +95,9 @@ public class AprEndpoint extends AbstractEndpoint { private Acceptor acceptors[] = null; + protected ConcurrentLinkedQueue> waitingRequests = + new ConcurrentLinkedQueue>(); + // ------------------------------------------------------------- Properties @@ -101,6 +106,7 @@ public class AprEndpoint extends AbstractEndpoint { */ protected boolean deferAccept = true; public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; } + @Override public boolean getDeferAccept() { return deferAccept; } @@ -142,6 +148,7 @@ public class AprEndpoint extends AbstractEndpoint { */ protected boolean useSendfile = Library.APR_HAS_SENDFILE; public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; } + @Override public boolean getUseSendfile() { return useSendfile; } @@ -580,6 +587,12 @@ public class AprEndpoint extends AbstractEndpoint { acceptors[i].start(); } + // Start async timeout thread + Thread timeoutThread = new Thread(new AsyncTimeout(), + getName() + "-AsyncTimeout"); + timeoutThread.setPriority(threadPriority); + timeoutThread.setDaemon(true); + timeoutThread.start(); } } @@ -587,6 +600,7 @@ public class AprEndpoint extends AbstractEndpoint { /** * Stop the endpoint. This will cause all processing threads to stop. */ + @Override public void stop() { if (!paused) { pause(); @@ -748,7 +762,9 @@ public class AprEndpoint extends AbstractEndpoint { try { // During shutdown, executor may be null - avoid NPE if (running) { - getExecutor().execute(new SocketWithOptionsProcessor(socket)); + SocketWrapper wrapper = + new SocketWrapper(Long.valueOf(socket)); + getExecutor().execute(new SocketWithOptionsProcessor(wrapper)); } } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); @@ -768,7 +784,9 @@ public class AprEndpoint extends AbstractEndpoint { */ protected boolean processSocket(long socket) { try { - getExecutor().execute(new SocketProcessor(socket)); + SocketWrapper wrapper = + new SocketWrapper(Long.valueOf(socket)); + getExecutor().execute(new SocketProcessor(wrapper, null)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; @@ -789,8 +807,10 @@ public class AprEndpoint extends AbstractEndpoint { try { if (status == SocketStatus.OPEN || status == SocketStatus.STOP || status == SocketStatus.TIMEOUT) { + SocketWrapper wrapper = + new SocketWrapper(Long.valueOf(socket)); SocketEventProcessor proc = - new SocketEventProcessor(socket, status); + new SocketEventProcessor(wrapper, status); ClassLoader loader = Thread.currentThread().getContextClassLoader(); try { if (IS_SECURITY_ENABLED) { @@ -822,6 +842,45 @@ public class AprEndpoint extends AbstractEndpoint { return true; } + public boolean processSocketAsync(SocketWrapper socket, + SocketStatus status) { + try { + synchronized (socket) { + if (waitingRequests.remove(socket)) { + SocketProcessor proc = new SocketProcessor(socket, status); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + try { + if (IS_SECURITY_ENABLED) { + PrivilegedAction pa = new PrivilegedSetTccl( + getClass().getClassLoader()); + AccessController.doPrivileged(pa); + } else { + Thread.currentThread().setContextClassLoader( + getClass().getClassLoader()); + } + getExecutor().execute(proc); + } finally { + if (IS_SECURITY_ENABLED) { + PrivilegedAction pa = new PrivilegedSetTccl(loader); + AccessController.doPrivileged(pa); + } else { + Thread.currentThread().setContextClassLoader(loader); + } + } + } + } + } catch (RejectedExecutionException x) { + log.warn("Socket processing request was rejected for:"+socket,x); + return false; + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + private void destroySocket(long socket) { if (running && socket != 0) { @@ -898,9 +957,50 @@ public class AprEndpoint extends AbstractEndpoint { } - // ----------------------------------------------------- Poller Inner Class - + /** + * Async timeout thread + */ + protected class AsyncTimeout implements Runnable { + /** + * The background thread that checks async requests and fires the + * timeout if there has been no activity. + */ + @Override + public void run() { + // Loop until we receive a shutdown command + while (running) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + long now = System.currentTimeMillis(); + Iterator> sockets = + waitingRequests.iterator(); + while (sockets.hasNext()) { + SocketWrapper socket = sockets.next(); + long access = socket.getLastAccess(); + if ((now-access)>socket.getTimeout()) { + processSocketAsync(socket,SocketStatus.TIMEOUT); + } + } + + // Loop if endpoint is paused + while (paused && running) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + } + } + } + + + // ----------------------------------------------------- Poller Inner Class /** * Poller class. */ @@ -951,6 +1051,7 @@ public class AprEndpoint extends AbstractEndpoint { /** * Destroy the poller. */ + @Override public void destroy() { // Close all sockets in the add queue for (int i = 0; i < addCount; i++) { @@ -1015,6 +1116,7 @@ public class AprEndpoint extends AbstractEndpoint { * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ + @Override public void run() { long maintainTime = 0; @@ -1212,6 +1314,7 @@ public class AprEndpoint extends AbstractEndpoint { /** * Destroy the poller. */ + @Override public void destroy() { // Close any socket remaining in the add queue addCount = 0; @@ -1314,6 +1417,7 @@ public class AprEndpoint extends AbstractEndpoint { * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ + @Override public void run() { long maintainTime = 0; @@ -1478,15 +1582,16 @@ public class AprEndpoint extends AbstractEndpoint { * thread local fields. */ public interface Handler extends AbstractEndpoint.Handler { - public SocketState process(long socket); - public SocketState event(long socket, SocketStatus status); - public SocketState asyncDispatch(long socket, SocketStatus status); + public SocketState process(SocketWrapper socket); + public SocketState event(SocketWrapper socket, + SocketStatus status); + public SocketState asyncDispatch(SocketWrapper socket, + SocketStatus status); } // ---------------------------------------------- SocketProcessor Inner Class - /** * This class is the equivalent of the Worker, but will simply use in an * external Executor thread pool. This will also set the socket options @@ -1494,29 +1599,32 @@ public class AprEndpoint extends AbstractEndpoint { */ protected class SocketWithOptionsProcessor implements Runnable { - protected long socket = 0; + protected SocketWrapper socket = null; - public SocketWithOptionsProcessor(long socket) { + public SocketWithOptionsProcessor(SocketWrapper socket) { this.socket = socket; } + @Override public void run() { - if (!deferAccept) { - if (setSocketOptions(socket)) { - getPoller().add(socket); + synchronized (socket) { + if (!deferAccept) { + if (setSocketOptions(socket.getSocket().longValue())) { + getPoller().add(socket.getSocket().longValue()); + } else { + // Close socket and pool + destroySocket(socket.getSocket().longValue()); + socket = null; + } } else { - // Close socket and pool - destroySocket(socket); - socket = 0; - } - } else { - // Process the request from this socket - if (!setSocketOptions(socket) - || handler.process(socket) == Handler.SocketState.CLOSED) { - // Close socket and pool - destroySocket(socket); - socket = 0; + // Process the request from this socket + if (!setSocketOptions(socket.getSocket().longValue()) + || handler.process(socket) == Handler.SocketState.CLOSED) { + // Close socket and pool + destroySocket(socket.getSocket().longValue()); + socket = null; + } } } @@ -1534,27 +1642,34 @@ public class AprEndpoint extends AbstractEndpoint { */ protected class SocketProcessor implements Runnable { - protected long socket = 0; - protected boolean async = false; - protected SocketStatus status = SocketStatus.ERROR; + protected SocketWrapper socket = null; + protected SocketStatus status = null; - public SocketProcessor(long socket) { + public SocketProcessor(SocketWrapper socket, + SocketStatus status) { this.socket = socket; - this.async = false; + this.status = status; } + @Override public void run() { - - // Process the request from this socket - Handler.SocketState state = async?handler.asyncDispatch(socket, status):handler.process(socket); - if (state == Handler.SocketState.CLOSED) { - // Close socket and pool - destroySocket(socket); - socket = 0; + synchronized (socket) { + // Process the request from this socket + Handler.SocketState state = (status==null)?handler.process(socket):handler.asyncDispatch(socket, status); + if (state == Handler.SocketState.CLOSED) { + // Close socket and pool + destroySocket(socket.getSocket().longValue()); + socket = null; + } else if (state == Handler.SocketState.LONG) { + socket.access(); + waitingRequests.add(socket); + } else if (state == Handler.SocketState.ASYNC_END) { + socket.access(); + SocketProcessor proc = new SocketProcessor(socket, SocketStatus.OPEN); + getExecutor().execute(proc); + } } - } - } @@ -1567,25 +1682,27 @@ public class AprEndpoint extends AbstractEndpoint { */ protected class SocketEventProcessor implements Runnable { - protected long socket = 0; + protected SocketWrapper socket = null; protected SocketStatus status = null; - public SocketEventProcessor(long socket, SocketStatus status) { + public SocketEventProcessor(SocketWrapper socket, + SocketStatus status) { this.socket = socket; this.status = status; } + @Override public void run() { - - // Process the request from this socket - if (handler.event(socket, status) == Handler.SocketState.CLOSED) { - // Close socket and pool - destroySocket(socket); - socket = 0; + synchronized (socket) { + // Process the request from this socket + Handler.SocketState state = handler.event(socket, status); + if (state == Handler.SocketState.CLOSED) { + // Close socket and pool + destroySocket(socket.getSocket().longValue()); + socket = null; + } } - } - } private static class PrivilegedSetTccl implements PrivilegedAction { @@ -1596,6 +1713,7 @@ public class AprEndpoint extends AbstractEndpoint { this.cl = cl; } + @Override public Void run() { Thread.currentThread().setContextClassLoader(cl); return null; diff --git a/java/org/apache/tomcat/util/net/JIoEndpoint.java b/java/org/apache/tomcat/util/net/JIoEndpoint.java index 4e5d29947..0c489adb0 100644 --- a/java/org/apache/tomcat/util/net/JIoEndpoint.java +++ b/java/org/apache/tomcat/util/net/JIoEndpoint.java @@ -122,8 +122,6 @@ public class JIoEndpoint extends AbstractEndpoint { } - // --------------------------------------------------- Acceptor Inner Class - /** * Async timeout thread */ @@ -165,6 +163,9 @@ public class JIoEndpoint extends AbstractEndpoint { } } } + + + // --------------------------------------------------- Acceptor Inner Class /** * Server socket acceptor thread. */ @@ -199,15 +200,15 @@ public class JIoEndpoint extends AbstractEndpoint { // Configure the socket if (setSocketOptions(socket)) { - // Hand this socket off to an appropriate processor - if (!processSocket(socket)) { - // Close socket right away - try { - socket.close(); - } catch (IOException e) { - // Ignore - } + // Hand this socket off to an appropriate processor + if (!processSocket(socket)) { + // Close socket right away + try { + socket.close(); + } catch (IOException e) { + // Ignore } + } } else { // Close socket right away try { @@ -258,56 +259,58 @@ public class JIoEndpoint extends AbstractEndpoint { @Override public void run() { boolean launch = false; - try { - - if (!socket.processing.compareAndSet(false, true)) { - log.error("Unable to process socket. Invalid state."); - return; - } - - SocketState state = SocketState.OPEN; - + synchronized (socket) { try { - // SSL handshake - serverSocketFactory.handshake(socket.getSocket()); - } catch (Throwable t) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.handshake"), t); - } - // Tell to close the socket - state = SocketState.CLOSED; - } + SocketState state = SocketState.OPEN; - if ( (state != SocketState.CLOSED) ) { - state = (status==null)?handler.process(socket):handler.process(socket,status); - } - if (state == SocketState.CLOSED) { - // Close socket - if (log.isTraceEnabled()) { - log.trace("Closing socket:"+socket); - } try { - socket.getSocket().close(); - } catch (IOException e) { - // Ignore + // SSL handshake + serverSocketFactory.handshake(socket.getSocket()); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.err.handshake"), t); + } + // Tell to close the socket + state = SocketState.CLOSED; + } + + if ( (state != SocketState.CLOSED) ) { + state = (status==null)?handler.process(socket):handler.process(socket,status); + } + if (state == SocketState.CLOSED) { + // Close socket + if (log.isTraceEnabled()) { + log.trace("Closing socket:"+socket); + } + try { + socket.getSocket().close(); + } catch (IOException e) { + // Ignore + } + } else if (state == SocketState.ASYNC_END || + state == SocketState.OPEN){ + socket.setKeptAlive(true); + socket.access(); + launch = true; + } else if (state == SocketState.LONG) { + socket.access(); + waitingRequests.add(socket); + } + } finally { + if (launch) { + try { + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN)); + } catch (NullPointerException npe) { + if (running) { + log.error(sm.getString("endpoint.launch.fail"), + npe); + } + } } - } else if (state == SocketState.OPEN){ - socket.setKeptAlive(true); - socket.access(); - //keepalive connection - //TODO - servlet3 check async status, we may just be in a hold pattern - launch = true; - } else if (state == SocketState.LONG) { - socket.access(); - waitingRequests.add(socket); } - } finally { - socket.processing.set(false); - if (launch) getExecutor().execute(new SocketProcessor(socket)); - socket = null; } + socket = null; // Finish up this request - } } @@ -469,7 +472,7 @@ public class JIoEndpoint extends AbstractEndpoint { */ protected boolean setSocketOptions(Socket socket) { serverSocketFactory.initSocket(socket); - + try { // 1: Set socket options: timeout, linger, etc socketProperties.setProperties(socket); @@ -488,7 +491,7 @@ public class JIoEndpoint extends AbstractEndpoint { return true; } - + /** * Process a new connection from a new client. Wraps the socket so * keep-alive and other attributes can be tracked and then passes the socket diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 9047a79ce..9d811509b 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -811,10 +811,12 @@ public class NioEndpoint extends AbstractEndpoint { } } } - }catch (SocketTimeoutException sx) { + } catch (SocketTimeoutException sx) { //normal condition - }catch ( IOException x ) { - if ( running ) log.error(sm.getString("endpoint.accept.fail"), x); + } catch (IOException x) { + if (running) { + log.error(sm.getString("endpoint.accept.fail"), x); + } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; @@ -1352,7 +1354,6 @@ public class NioEndpoint extends AbstractEndpoint { this.socket = channel; this.poller = poller; lastAccess = System.currentTimeMillis(); - currentAccess = false; comet = false; timeout = soTimeout; error = false; @@ -1489,82 +1490,96 @@ public class NioEndpoint extends AbstractEndpoint { @Override public void run() { - SelectionKey key = null; - try { - key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - int handshake = -1; - + boolean launch = false; + synchronized (socket) { + SelectionKey key = null; try { - if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable()); - }catch ( IOException x ) { - handshake = -1; - if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); - }catch ( CancelledKeyException ckx ) { - handshake = -1; - } - if ( handshake == 0 ) { - SocketState state = SocketState.OPEN; - // Process the request from this socket - state = (status==null)?handler.process(socket):handler.event(socket,status); - - if (state == SocketState.CLOSED) { - // Close socket and pool + key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + int handshake = -1; + + try { + if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable()); + }catch ( IOException x ) { + handshake = -1; + if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x); + }catch ( CancelledKeyException ckx ) { + handshake = -1; + } + if ( handshake == 0 ) { + SocketState state = SocketState.OPEN; + // Process the request from this socket + state = (status==null)?handler.process(socket):handler.event(socket,status); + + if (state == SocketState.CLOSED) { + // Close socket and pool + try { + KeyAttachment ka = null; + if (key!=null) { + ka = (KeyAttachment) key.attachment(); + if (ka!=null) ka.setComet(false); + socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false); + } + if (socket!=null) nioChannels.offer(socket); + socket = null; + if ( ka!=null ) keyCache.offer(ka); + ka = null; + }catch ( Exception x ) { + log.error("",x); + } + } else if (state == SocketState.ASYNC_END) { + launch = true; + } + } else if (handshake == -1 ) { + KeyAttachment ka = null; + if (key!=null) { + ka = (KeyAttachment) key.attachment(); + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); + } + if (socket!=null) nioChannels.offer(socket); + socket = null; + if ( ka!=null ) keyCache.offer(ka); + ka = null; + } else { + final SelectionKey fk = key; + final int intops = handshake; + final KeyAttachment ka = (KeyAttachment)fk.attachment(); + ka.getPoller().add(socket,intops); + } + }catch(CancelledKeyException cx) { + socket.getPoller().cancelledKey(key,null,false); + } catch (OutOfMemoryError oom) { + try { + oomParachuteData = null; + socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); + releaseCaches(); + log.error("", oom); + }catch ( Throwable oomt ) { + try { + System.err.println(oomParachuteMsg); + oomt.printStackTrace(); + }catch (Throwable letsHopeWeDontGetHere){} + } + }catch ( Throwable t ) { + log.error("",t); + socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); + } finally { + if (launch) { try { - KeyAttachment ka = null; - if (key!=null) { - ka = (KeyAttachment) key.attachment(); - if (ka!=null) ka.setComet(false); - socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false); + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN)); + } catch (NullPointerException npe) { + if (running) { + log.error(sm.getString("endpoint.launch.fail"), + npe); } - if (socket!=null) nioChannels.offer(socket); - socket = null; - if ( ka!=null ) keyCache.offer(ka); - ka = null; - }catch ( Exception x ) { - log.error("",x); } - } - } else if (handshake == -1 ) { - KeyAttachment ka = null; - if (key!=null) { - ka = (KeyAttachment) key.attachment(); - socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); } - if (socket!=null) nioChannels.offer(socket); socket = null; - if ( ka!=null ) keyCache.offer(ka); - ka = null; - } else { - final SelectionKey fk = key; - final int intops = handshake; - final KeyAttachment ka = (KeyAttachment)fk.attachment(); - ka.getPoller().add(socket,intops); + status = null; + //return to cache + processorCache.offer(this); } - }catch(CancelledKeyException cx) { - socket.getPoller().cancelledKey(key,null,false); - } catch (OutOfMemoryError oom) { - try { - oomParachuteData = null; - socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); - releaseCaches(); - log.error("", oom); - }catch ( Throwable oomt ) { - try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - }catch (Throwable letsHopeWeDontGetHere){} - } - }catch ( Throwable t ) { - log.error("",t); - socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false); - } finally { - socket = null; - status = null; - //return to cache - processorCache.offer(this); } } - } // ----------------------------------------------- SendfileData Inner Class diff --git a/java/org/apache/tomcat/util/net/SocketWrapper.java b/java/org/apache/tomcat/util/net/SocketWrapper.java index 80df5905a..a0d11c1b4 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapper.java +++ b/java/org/apache/tomcat/util/net/SocketWrapper.java @@ -16,22 +16,17 @@ */ package org.apache.tomcat.util.net; -import java.util.concurrent.atomic.AtomicBoolean; - - public class SocketWrapper { protected volatile E socket; protected volatile long lastAccess = -1; - protected volatile boolean currentAccess = false; protected long timeout = -1; protected boolean error = false; protected long lastRegistered = 0; protected volatile int keepAliveLeft = 100; protected boolean async = false; protected boolean keptAlive = false; - public AtomicBoolean processing = new AtomicBoolean(false); public SocketWrapper(E socket) { this.socket = socket; @@ -46,8 +41,6 @@ public class SocketWrapper { public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } - public boolean getCurrentAccess() { return currentAccess; } - public void setCurrentAccess(boolean access) { currentAccess = access; } public void setTimeout(long timeout) {this.timeout = timeout;} public long getTimeout() {return this.timeout;} public boolean getError() { return error; } diff --git a/java/org/apache/tomcat/util/net/res/LocalStrings.properties b/java/org/apache/tomcat/util/net/res/LocalStrings.properties index 686731c78..bb7031616 100644 --- a/java/org/apache/tomcat/util/net/res/LocalStrings.properties +++ b/java/org/apache/tomcat/util/net/res/LocalStrings.properties @@ -32,6 +32,7 @@ endpoint.init.bind=Socket bind failed: [{0}] {1} endpoint.init.listen=Socket listen failed: [{0}] {1} endpoint.init.notavail=APR not available endpoint.accept.fail=Socket accept failed +endpoint.launch.fail=Failed to launch new runnable endpoint.poll.limitedpollsize=Failed to create poller with specified size of {0} endpoint.poll.initfail=Poller creation failed endpoint.poll.fail=Critical poller failure (restarting poller): [{0}] {1} diff --git a/test/org/apache/catalina/core/TestAsyncContextImpl.java b/test/org/apache/catalina/core/TestAsyncContextImpl.java index 000a41016..2ba8902cc 100644 --- a/test/org/apache/catalina/core/TestAsyncContextImpl.java +++ b/test/org/apache/catalina/core/TestAsyncContextImpl.java @@ -24,6 +24,7 @@ import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.ServletException; +import javax.servlet.ServletResponse; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -177,7 +178,7 @@ public class TestAsyncContextImpl extends TomcatBaseTest { result = new StringBuilder(); result.append('1'); result.append(req.isAsyncStarted()); - req.startAsync(); + req.startAsync().setTimeout(10000); result.append('2'); result.append(req.isAsyncStarted()); @@ -305,13 +306,22 @@ public class TestAsyncContextImpl extends TomcatBaseTest { throws ServletException, IOException { AsyncContext actxt = req.startAsync(); + actxt.setTimeout(3000); resp.setContentType("text/plain"); resp.getWriter().print("OK"); actxt.complete(); } } - public void testTimeout() throws Exception { + public void testTimeoutListenerComplete() throws Exception { + doTestTimeout(true); + } + + public void testTimeoutListenerNoComplete() throws Exception { + doTestTimeout(false); + } + + private void doTestTimeout(boolean completeOnTimeout) throws Exception { // Setup Tomcat instance Tomcat tomcat = getTomcatInstance(); @@ -326,7 +336,7 @@ public class TestAsyncContextImpl extends TomcatBaseTest { Context ctx = tomcat.addContext("", docBase.getAbsolutePath()); - TimeoutServlet timeout = new TimeoutServlet(); + TimeoutServlet timeout = new TimeoutServlet(completeOnTimeout); Wrapper wrapper = Tomcat.addServlet(ctx, "time", timeout); wrapper.setAsyncSupported(true); @@ -334,47 +344,383 @@ public class TestAsyncContextImpl extends TomcatBaseTest { tomcat.start(); ByteChunk res = getUrl("http://localhost:" + getPort() + "/async"); - assertEquals("OK", res.toString()); + StringBuilder expected = new StringBuilder(); + expected.append("TimeoutServletGet-onTimeout-"); + if (!completeOnTimeout) { + expected.append("onError-"); + } + expected.append("onComplete-"); + assertEquals(expected.toString(), res.toString()); } private static class TimeoutServlet extends HttpServlet { private static final long serialVersionUID = 1L; + private boolean completeOnTimeout; + + public TimeoutServlet(boolean completeOnTimeout) { + this.completeOnTimeout = completeOnTimeout; + } + @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (req.isAsyncSupported()) { - resp.getWriter().print("OK"); + resp.getWriter().print("TimeoutServletGet-"); final AsyncContext ac = req.startAsync(); - ac.setTimeout(2000); + ac.setTimeout(3000); - ac.addListener(new TimeoutListener()); + ac.addListener(new TrackingListener(false, completeOnTimeout)); } else resp.getWriter().print("FAIL: Async unsupported"); } } - private static class TimeoutListener implements AsyncListener { + public void testDispatchSingle() throws Exception { + doTestDispatch(1, false); + } + + public void testDispatchDouble() throws Exception { + doTestDispatch(2, false); + } + + public void testDispatchMultiple() throws Exception { + doTestDispatch(5, false); + } + + public void testDispatchWithThreadSingle() throws Exception { + doTestDispatch(1, true); + } + + public void testDispatchWithThreadDouble() throws Exception { + doTestDispatch(2, true); + } + + public void testDispatchWithThreadMultiple() throws Exception { + doTestDispatch(5, true); + } + + private void doTestDispatch(int iter, boolean useThread) throws Exception { + // Setup Tomcat instance + Tomcat tomcat = getTomcatInstance(); + + // Must have a real docBase - just use temp + File docBase = new File(System.getProperty("java.io.tmpdir")); + + Context ctx = tomcat.addContext("", docBase.getAbsolutePath()); + + DispatchingServlet dispatch = new DispatchingServlet(false, false); + Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch); + wrapper.setAsyncSupported(true); + ctx.addServletMapping("/stage1", "dispatch"); + + NonAsyncServlet nonasync = new NonAsyncServlet(); + Wrapper wrapper2 = Tomcat.addServlet(ctx, "nonasync", nonasync); + wrapper2.setAsyncSupported(true); + ctx.addServletMapping("/stage2", "nonasync"); + + tomcat.start(); + + StringBuilder url = new StringBuilder(48); + url.append("http://localhost:"); + url.append(getPort()); + url.append("/stage1?iter="); + url.append(iter); + if (useThread) { + url.append("&useThread=y"); + } + ByteChunk res = getUrl(url.toString()); + + StringBuilder expected = new StringBuilder(); + int loop = iter; + while (loop > 0) { + expected.append("DispatchingServletGet-"); + loop--; + } + expected.append("NonAsyncServletGet-"); + assertEquals(expected.toString(), res.toString()); + } + + private static class DispatchingServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + private static final String ITER_PARAM = "iter"; + private boolean addTrackingListener = false; + private boolean completeOnError = false; + + public DispatchingServlet(boolean addTrackingListener, + boolean completeOnError) { + this.addTrackingListener = addTrackingListener; + this.completeOnError = completeOnError; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + resp.getWriter().write("DispatchingServletGet-"); + resp.flushBuffer(); + final int iter = Integer.parseInt(req.getParameter(ITER_PARAM)) - 1; + final AsyncContext ctxt = req.startAsync(); + if (addTrackingListener) { + TrackingListener listener = + new TrackingListener(completeOnError, true); + ctxt.addListener(listener); + } + Runnable run = new Runnable() { + @Override + public void run() { + if (iter > 0) { + ctxt.dispatch("/stage1?" + ITER_PARAM + "=" + iter); + } else { + ctxt.dispatch("/stage2"); + } + } + }; + if ("y".equals(req.getParameter("useThread"))) { + new Thread(run).start(); + } else { + run.run(); + } + } + } + + private static class NonAsyncServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + resp.getWriter().write("NonAsyncServletGet-"); + resp.flushBuffer(); + } + } + + public void testListeners() throws Exception { + // Setup Tomcat instance + Tomcat tomcat = getTomcatInstance(); + + // Must have a real docBase - just use temp + File docBase = new File(System.getProperty("java.io.tmpdir")); + + Context ctx = tomcat.addContext("", docBase.getAbsolutePath()); + + TrackingServlet tracking = new TrackingServlet(); + Wrapper wrapper = Tomcat.addServlet(ctx, "tracking", tracking); + wrapper.setAsyncSupported(true); + ctx.addServletMapping("/stage1", "tracking"); + + TimeoutServlet timeout = new TimeoutServlet(true); + Wrapper wrapper2 = Tomcat.addServlet(ctx, "timeout", timeout); + wrapper2.setAsyncSupported(true); + ctx.addServletMapping("/stage2", "timeout"); + + tomcat.start(); + + StringBuilder url = new StringBuilder(48); + url.append("http://localhost:"); + url.append(getPort()); + url.append("/stage1"); + + ByteChunk res = getUrl(url.toString()); + + assertEquals( + "DispatchingServletGet-DispatchingServletGet-onStartAsync-" + + "TimeoutServletGet-onStartAsync-onTimeout-onComplete-", + res.toString()); + } + + private static class TrackingServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + private static volatile boolean first = true; + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + resp.getWriter().write("DispatchingServletGet-"); + resp.flushBuffer(); + + final boolean first = TrackingServlet.first; + TrackingServlet.first = false; + + final AsyncContext ctxt = req.startAsync(); + TrackingListener listener = new TrackingListener(false, true); + ctxt.addListener(listener); + ctxt.setTimeout(3000); + + Runnable run = new Runnable() { + @Override + public void run() { + if (first) { + ctxt.dispatch("/stage1"); + } else { + ctxt.dispatch("/stage2"); + } + } + }; + if ("y".equals(req.getParameter("useThread"))) { + new Thread(run).start(); + } else { + run.run(); + } + } + } + + private static class TrackingListener implements AsyncListener { + + private boolean completeOnError; + private boolean completeOnTimeout; + + public TrackingListener(boolean completeOnError, + boolean completeOnTimeout) { + this.completeOnError = completeOnError; + this.completeOnTimeout = completeOnTimeout; + } @Override public void onComplete(AsyncEvent event) throws IOException { - // NO-OP + ServletResponse resp = event.getAsyncContext().getResponse(); + resp.getWriter().write("onComplete-"); + resp.flushBuffer(); } @Override public void onTimeout(AsyncEvent event) throws IOException { - event.getAsyncContext().complete(); + ServletResponse resp = event.getAsyncContext().getResponse(); + resp.getWriter().write("onTimeout-"); + resp.flushBuffer(); + if (completeOnTimeout){ + event.getAsyncContext().complete(); + } } @Override public void onError(AsyncEvent event) throws IOException { - // NOOP + ServletResponse resp = event.getAsyncContext().getResponse(); + resp.getWriter().write("onError-"); + resp.flushBuffer(); + if (completeOnError) { + event.getAsyncContext().complete(); + } } @Override public void onStartAsync(AsyncEvent event) throws IOException { - // NOOP + ServletResponse resp = event.getAsyncContext().getResponse(); + resp.getWriter().write("onStartAsync-"); + resp.flushBuffer(); + } + } + + public void testDispatchErrorSingle() throws Exception { + doTestDispatchError(1, false, false); + } + + public void testDispatchErrorDouble() throws Exception { + doTestDispatchError(2, false, false); + } + + public void testDispatchErrorMultiple() throws Exception { + doTestDispatchError(5, false, false); + } + + public void testDispatchErrorWithThreadSingle() throws Exception { + doTestDispatchError(1, true, false); + } + + public void testDispatchErrorWithThreadDouble() throws Exception { + doTestDispatchError(2, true, false); + } + + public void testDispatchErrorWithThreadMultiple() throws Exception { + doTestDispatchError(5, true, false); + } + + public void testDispatchErrorSingleThenComplete() throws Exception { + doTestDispatchError(1, false, true); + } + + public void testDispatchErrorDoubleThenComplete() throws Exception { + doTestDispatchError(2, false, true); + } + + public void testDispatchErrorMultipleThenComplete() throws Exception { + doTestDispatchError(5, false, true); + } + + public void testDispatchErrorWithThreadSingleThenComplete() + throws Exception { + doTestDispatchError(1, true, true); + } + + public void testDispatchErrorWithThreadDoubleThenComplete() + throws Exception { + doTestDispatchError(2, true, true); + } + + public void testDispatchErrorWithThreadMultipleThenComplete() + throws Exception { + doTestDispatchError(5, true, true); + } + + private void doTestDispatchError(int iter, boolean useThread, + boolean completeOnError) + throws Exception { + // Setup Tomcat instance + Tomcat tomcat = getTomcatInstance(); + + // Must have a real docBase - just use temp + File docBase = new File(System.getProperty("java.io.tmpdir")); + + Context ctx = tomcat.addContext("", docBase.getAbsolutePath()); + + DispatchingServlet dispatch = + new DispatchingServlet(true, completeOnError); + Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch); + wrapper.setAsyncSupported(true); + ctx.addServletMapping("/stage1", "dispatch"); + + ErrorServlet error = new ErrorServlet(); + Tomcat.addServlet(ctx, "error", error); + ctx.addServletMapping("/stage2", "error"); + + tomcat.start(); + + StringBuilder url = new StringBuilder(48); + url.append("http://localhost:"); + url.append(getPort()); + url.append("/stage1?iter="); + url.append(iter); + if (useThread) { + url.append("&useThread=y"); } + ByteChunk res = getUrl(url.toString()); + StringBuilder expected = new StringBuilder(); + int loop = iter; + while (loop > 0) { + expected.append("DispatchingServletGet-"); + if (loop != iter) { + expected.append("onStartAsync-"); + } + loop--; + } + expected.append("ErrorServletGet-onError-onComplete-"); + assertEquals(expected.toString(), res.toString()); + } + + private static class ErrorServlet extends HttpServlet { + + private static final long serialVersionUID = 1L; + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + resp.getWriter().write("ErrorServletGet-"); + resp.flushBuffer(); + throw new ServletException("Opps."); + } } } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index eab9ed7ba..a21559981 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -125,6 +125,11 @@ UnsupportedOperationException. (markt) + 49884: Fix occassional NullPointerException on async + complete(). This resulted in a major refactoring of the async + implementation to address a number of threading issues. (markt) + + Update the version numbers in ServerInfo defaults to Tomcat 7.0.x. (markt)