"Dispatch may only happen on an existing request.");
}
boolean comet = false;
- boolean async = false;
boolean success = true;
-
+ AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
try {
if (status==SocketStatus.TIMEOUT) {
- AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
- //TODO SERVLET3 - async
- //configure settings for timed out
- asyncConImpl.setTimeoutState();
- }
- if (status==SocketStatus.ERROR || status==SocketStatus.DISCONNECT) {
- AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
- //TODO SERVLET3 - async
- //configure settings for timed out
- asyncConImpl.setErrorState(new IOException("Socket error."));
+ success = true;
+ if (!asyncConImpl.timeout()) {
+ asyncConImpl.setErrorState(null);
+ }
}
- while (success) {
- AsyncContextImpl impl = (AsyncContextImpl)request.getAsyncContext();
- // Calling the container
- if (impl.getState()==AsyncContextImpl.AsyncState.DISPATCHED) {
- // Calling the container
- try {
- impl.complete();
- connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
- } finally {
- success = false;
- }
- } else if (impl.getState()==AsyncContextImpl.AsyncState.STARTED){
- //TODO SERVLET3 - async
- res.action(ActionCode.ASYNC_START, request.getAsyncContext());
- async = true;
- break;
- } else if (impl.getState()==AsyncContextImpl.AsyncState.NOT_STARTED){
- //TODO SERVLET3 - async
- async = false;
- break;
- } else if (impl.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING) {
- async = false;
- success = false;
- connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
- } else {
- try {
- connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
- } catch (RuntimeException x) {
- impl.setErrorState(x);
- }
+ if (request.isAsyncDispatching()) {
+ success = true;
+ connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
+ Throwable t = (Throwable) request.getAttribute(
+ Globals.EXCEPTION_ATTR);
+ if (t != null) {
+ asyncConImpl.setErrorState(t);
}
}
-
+
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
request.setFilterChain(null);
}
}
- if (!async && !comet) {
+ if (!request.isAsync() && !comet) {
response.finishResponse();
req.action(ActionCode.POST_REQUEST , null);
}
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
- if (!success || (!comet && !async)) {
+ if (!success || (!comet && !request.isAsync())) {
request.recycle();
response.recycle();
} else {
}
AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
- if (asyncConImpl!=null && asyncConImpl.getState()==AsyncContextImpl.AsyncState.STARTED) {
- res.action(ActionCode.ASYNC_START, request.getAsyncContext());
+ if (asyncConImpl != null) {
async = true;
- } else if (request.isAsyncDispatching()) {
- asyncDispatch(req, res, SocketStatus.OPEN);
- if (request.isAsyncStarted()) {
- async = true;
- res.action(ActionCode.ASYNC_START, request.getAsyncContext());
- }
} else if (!comet) {
response.finishResponse();
req.action(ActionCode.POST_REQUEST , null);
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import javax.servlet.AsyncContext;
if (asyncContext == null) {
return false;
}
-
- return (asyncContext.getState()==AsyncContextImpl.AsyncState.DISPATCHING ||
- asyncContext.getState()==AsyncContextImpl.AsyncState.TIMING_OUT ||
- asyncContext.getState()==AsyncContextImpl.AsyncState.STARTED ||
- asyncContext.getState()==AsyncContextImpl.AsyncState.ERROR_DISPATCHING ||
- asyncContext.getState()==AsyncContextImpl.AsyncState.COMPLETING);
+
+ AtomicBoolean result = new AtomicBoolean(false);
+ coyoteRequest.action(ActionCode.ASYNC_IS_DISPATCHING, result);
+ return result.get();
+ }
+
+ public boolean isAsync() {
+ if (asyncContext == null) {
+ return false;
+ }
+
+ AtomicBoolean result = new AtomicBoolean(false);
+ coyoteRequest.action(ActionCode.ASYNC_IS_ASYNC, result);
+ return result.get();
}
@Override
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
*/
public class AsyncContextImpl implements AsyncContext {
- public static enum AsyncState {
- NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT,
- TIMING_OUT_NEED_COMPLETE, ERROR_DISPATCHING
- }
-
private static final Log log = LogFactory.getLog(AsyncContextImpl.class);
private ServletRequest servletRequest = null;
private boolean hasOriginalRequestAndResponse = true;
private volatile Runnable dispatch = null;
private Context context = null;
- private AtomicReference<AsyncState> state = new AtomicReference<AsyncState>(AsyncState.NOT_STARTED);
private long timeout = -1;
private AsyncEvent event = null;
if (log.isDebugEnabled()) {
logDebug("complete ");
}
- if (state.get()==AsyncState.COMPLETING) {
- //do nothing
- } else if (state.compareAndSet(AsyncState.DISPATCHED,
- AsyncState.COMPLETING) ||
- state.compareAndSet(AsyncState.STARTED,
- AsyncState.COMPLETING) ||
- state.compareAndSet(AsyncState.TIMING_OUT_NEED_COMPLETE,
- AsyncState.COMPLETING)) {
- AtomicBoolean dispatched = new AtomicBoolean(false);
- request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE,
- dispatched);
- if (!dispatched.get()) doInternalComplete(false);
- } else {
- throw new IllegalStateException(
- "Complete not allowed. Invalid state:"+state.get());
+ request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
+ }
+
+ public void fireOnComplete() {
+ List<AsyncListenerWrapper> listenersCopy =
+ new ArrayList<AsyncListenerWrapper>();
+ listenersCopy.addAll(listeners);
+ for (AsyncListenerWrapper listener : listenersCopy) {
+ try {
+ listener.fireOnComplete(event);
+ } catch (IOException ioe) {
+ log.warn("onComplete() failed for listener of type [" +
+ listener.getClass().getName() + "]", ioe);
+ }
+ }
+ }
+
+ public boolean timeout() throws IOException {
+ AtomicBoolean result = new AtomicBoolean();
+ request.getCoyoteRequest().action(ActionCode.ASYNC_TIMEOUT, result);
+
+ if (result.get()) {
+ boolean listenerInvoked = false;
+ List<AsyncListenerWrapper> listenersCopy =
+ new ArrayList<AsyncListenerWrapper>();
+ listenersCopy.addAll(listeners);
+ for (AsyncListenerWrapper listener : listenersCopy) {
+ listener.fireOnTimeout(event);
+ listenerInvoked = true;
+ }
+ if (listenerInvoked) {
+ request.getCoyoteRequest().action(
+ ActionCode.ASYNC_IS_TIMINGOUT, result);
+ return !result.get();
+ } else {
+ // No listeners, container calls complete
+ complete();
+ }
}
-
+ return true;
}
@Override
if (log.isDebugEnabled()) {
logDebug("dispatch ");
}
-
- if (state.compareAndSet(AsyncState.STARTED, AsyncState.DISPATCHING) ||
- state.compareAndSet(AsyncState.DISPATCHED, AsyncState.DISPATCHING)) {
-
- if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
- request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
- request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
- request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
- request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
- }
- final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
- final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
- final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
- Runnable run = new Runnable() {
- @Override
- public void run() {
- DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
- try {
- //piggy back on the request dispatcher to ensure that filters etc get called.
- //TODO SERVLET3 - async should this be include/forward or a new dispatch type
- //javadoc suggests include with the type of DispatcherType.ASYNC
- request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
- requestDispatcher.include(servletRequest, servletResponse);
- }catch (Exception x) {
- //log.error("Async.dispatch",x);
- throw new RuntimeException(x);
- }finally {
- request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
- }
- }
- };
- this.dispatch = run;
- AtomicBoolean dispatched = new AtomicBoolean(false);
- request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, dispatched );
- if (!dispatched.get()) {
+ if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+ request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI()+"?"+request.getQueryString());
+ request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
+ request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
+ request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
+ }
+ final RequestDispatcher requestDispatcher = context.getRequestDispatcher(path);
+ final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
+ final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCHED, null);
+ DispatcherType type = (DispatcherType)request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);
try {
- doInternalDispatch();
- }catch (ServletException sx) {
- throw new RuntimeException(sx);
- }catch (IOException ix) {
- throw new RuntimeException(ix);
+ //piggy back on the request dispatcher to ensure that filters etc get called.
+ //TODO SERVLET3 - async should this be include/forward or a new dispatch type
+ //javadoc suggests include with the type of DispatcherType.ASYNC
+ request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, DispatcherType.ASYNC);
+ requestDispatcher.include(servletRequest, servletResponse);
+ }catch (Exception x) {
+ //log.error("Async.dispatch",x);
+ throw new RuntimeException(x);
+ }finally {
+ request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, type);
}
}
- if (state.get().equals(AsyncState.DISPATCHED)) {
- complete();
- }
- } else {
- throw new IllegalStateException("Dispatch not allowed. Invalid state:"+state.get());
- }
+ };
+
+ this.dispatch = run;
+ this.request.getCoyoteRequest().action(ActionCode.ASYNC_DISPATCH, null);
}
@Override
logDebug("start ");
}
- if (state.get() == AsyncState.STARTED) {
- // Execute the runnable using a container thread from the
- // Connector's thread pool. Use a wrapper to prevent a memory leak
- Runnable wrapper = new RunnableWrapper(run, context);
- ClassLoader oldCL;
- if (Globals.IS_SECURITY_ENABLED) {
- PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
- oldCL = AccessController.doPrivileged(pa);
- } else {
- oldCL = Thread.currentThread().getContextClassLoader();
- }
- try {
- if (Globals.IS_SECURITY_ENABLED) {
- PrivilegedAction<Void> pa = new PrivilegedSetTccl(
- this.getClass().getClassLoader());
- AccessController.doPrivileged(pa);
- } else {
- Thread.currentThread().setContextClassLoader(
- this.getClass().getClassLoader());
- }
- request.getConnector().getProtocolHandler().getExecutor(
- ).execute(wrapper);
- } finally {
- if (Globals.IS_SECURITY_ENABLED) {
- PrivilegedAction<Void> pa = new PrivilegedSetTccl(
- oldCL);
- AccessController.doPrivileged(pa);
- } else {
- Thread.currentThread().setContextClassLoader(oldCL);
- }
- }
- } else {
- throw new IllegalStateException("Start not allowed. Invalid state:"+state.get());
- }
+ Runnable wrapper = new RunnableWrapper(run, context);
+ this.request.getCoyoteRequest().action(ActionCode.ASYNC_RUN, wrapper);
}
@Override
}
servletRequest = null;
servletResponse = null;
- listeners.clear();
hasOriginalRequestAndResponse = true;
- state.set(AsyncState.NOT_STARTED);
context = null;
timeout = -1;
event = null;
}
public boolean isStarted() {
- return (state.get() == AsyncState.STARTED ||
- state.get() == AsyncState.DISPATCHING);
+ AtomicBoolean result = new AtomicBoolean(false);
+ request.getCoyoteRequest().action(
+ ActionCode.ASYNC_IS_STARTED, result);
+ return result.get();
}
public void setStarted(Context context, ServletRequest request,
- ServletResponse response, boolean hasOriginalRequestAndResponse) {
- if (state.compareAndSet(AsyncState.NOT_STARTED, AsyncState.STARTED) ||
- state.compareAndSet(AsyncState.DISPATCHED, AsyncState.STARTED)) {
- this.context = context;
- this.servletRequest = request;
- this.servletResponse = response;
- this.hasOriginalRequestAndResponse = hasOriginalRequestAndResponse;
- this.event = new AsyncEvent(this, request, response);
- } else {
- throw new IllegalStateException("Start illegal. Invalid state: "+state.get());
+ ServletResponse response, boolean originalRequestResponse) {
+
+ this.request.getCoyoteRequest().action(
+ ActionCode.ASYNC_START, this);
+
+ this.context = context;
+ this.servletRequest = request;
+ this.servletResponse = response;
+ this.hasOriginalRequestAndResponse = originalRequestResponse;
+ this.event = new AsyncEvent(this, request, response);
+
+ List<AsyncListenerWrapper> listenersCopy =
+ new ArrayList<AsyncListenerWrapper>();
+ listenersCopy.addAll(listeners);
+ for (AsyncListenerWrapper listener : listenersCopy) {
+ try {
+ listener.fireOnStartAsync(event);
+ } catch (IOException ioe) {
+ log.warn("onStartAsync() failed for listener of type [" +
+ listener.getClass().getName() + "]", ioe);
+ }
}
+ listeners.clear();
}
@Override
if (log.isDebugEnabled()) {
logDebug("intDispatch");
}
- if (this.state.compareAndSet(AsyncState.TIMING_OUT,
- AsyncState.TIMING_OUT_NEED_COMPLETE)) {
- log.debug("TIMING OUT!");
- boolean listenerInvoked = false;
- List<AsyncListenerWrapper> listenersCopy =
- new ArrayList<AsyncListenerWrapper>();
- listenersCopy.addAll(listeners);
- for (AsyncListenerWrapper listener : listenersCopy) {
- listener.fireOnTimeout(event);
- listenerInvoked = true;
- }
- if (listenerInvoked) {
- // Listener should have called complete
- if (state.get() != AsyncState.NOT_STARTED) {
- ((HttpServletResponse)servletResponse).setStatus(500);
- state.set(AsyncState.COMPLETING);
- doInternalComplete(true);
- }
- } else {
- // No listeners, container calls complete
- state.set(AsyncState.COMPLETING);
- doInternalComplete(false);
- }
- } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.COMPLETING)) {
- log.debug("ON ERROR!");
- boolean listenerInvoked = false;
- for (AsyncListenerWrapper listener : listeners) {
- try {
- listener.fireOnError(event);
- }catch (IllegalStateException x) {
- log.debug("Listener invoked invalid state.",x);
- }catch (Exception x) {
- log.debug("Exception during onError.",x);
- }
- listenerInvoked = true;
- }
- if (!listenerInvoked) {
- ((HttpServletResponse)servletResponse).setStatus(500);
- }
- doInternalComplete(true);
-
- } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) {
- if (this.dispatch!=null) {
- try {
- dispatch.run();
- } catch (RuntimeException x) {
- doInternalComplete(true);
- if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause();
- if (x.getCause() instanceof IOException) throw (IOException)x.getCause();
- throw new ServletException(x);
- } finally {
- dispatch = null;
- }
- }
- } else if (this.state.get()==AsyncState.COMPLETING) {
- doInternalComplete(false);
- } else {
- throw new IllegalStateException("Dispatch illegal. Invalid state: "+state.get());
- }
- }
-
- private void doInternalComplete(boolean error) {
- if (log.isDebugEnabled()) {
- logDebug("intComplete");
- }
- if (state.get()==AsyncState.NOT_STARTED) return;
- if (state.compareAndSet(AsyncState.STARTED, AsyncState.NOT_STARTED)) {
- //this is the same as
- //request.startAsync().complete();
- recycle();
- } else if (state.compareAndSet(AsyncState.COMPLETING, AsyncState.NOT_STARTED)) {
- for (AsyncListenerWrapper wrapper : listeners) {
- try {
- wrapper.fireOnComplete(event);
- }catch (IOException x) {
- //how does this propagate, or should it?
- //TODO SERVLET3 - async
- log.error("",x);
- }
+ try {
+ dispatch.run();
+ } catch (RuntimeException x) {
+ // doInternalComplete(true);
+ if (x.getCause() instanceof ServletException) {
+ throw (ServletException)x.getCause();
}
- try {
- if (!error) getResponse().flushBuffer();
- }catch (Exception x) {
- log.error("",x);
+ if (x.getCause() instanceof IOException) {
+ throw (IOException)x.getCause();
}
- recycle();
-
- } else {
- throw new IllegalStateException("Complete illegal. Invalid state:"+state.get());
+ throw new ServletException(x);
}
}
-
- public AsyncState getState() {
- return state.get();
- }
+
@Override
public long getTimeout() {
return timeout;
}
-
+
+
@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
request.getCoyoteRequest().action(ActionCode.ASYNC_SETTIMEOUT,
Long.valueOf(timeout));
}
-
- public void setTimeoutState() {
- state.set(AsyncState.TIMING_OUT);
- }
-
+
+
public void setErrorState(Throwable t) {
if (t!=null) request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
- state.set(AsyncState.ERROR_DISPATCHING);
+ request.getCoyoteRequest().action(ActionCode.ASYNC_ERROR, null);
+ AsyncEvent errorEvent = new AsyncEvent(event.getAsyncContext(),
+ event.getSuppliedRequest(), event.getSuppliedResponse(), t);
+ List<AsyncListenerWrapper> listenersCopy =
+ new ArrayList<AsyncListenerWrapper>();
+ listenersCopy.addAll(listeners);
+ for (AsyncListenerWrapper listener : listenersCopy) {
+ try {
+ listener.fireOnError(errorEvent);
+ } catch (IOException ioe) {
+ log.warn("onStartAsync() failed for listener of type [" +
+ listener.getClass().getName() + "]", ioe);
+ }
+ }
}
+
private void logDebug(String method) {
String rHashCode;
"Req: %1$8s CReq: %2$8s RP: %3$8s Stage: %4$s " +
"Thread: %5$20s State: %6$20s Method: %7$11s URI: %8$s",
rHashCode, crHashCode, rpHashCode, stage,
- Thread.currentThread().getName(), state, method, uri);
+ Thread.currentThread().getName(), "N/A", method, uri);
if (log.isTraceEnabled()) {
log.trace(msg, new DebugException());
} else {
}
loadCorePackage(loader);
+ loadCoyotePackage(loader);
loadLoaderPackage(loader);
loadSessionPackage(loader);
loadUtilPackage(loader);
"AsyncContextImpl");
loader.loadClass
(basePackage +
- "AsyncContextImpl$AsyncState");
- loader.loadClass
- (basePackage +
"AsyncContextImpl$DebugException");
loader.loadClass
(basePackage +
}
+ private final static void loadCoyotePackage(ClassLoader loader)
+ throws Exception {
+ String basePackage = "org.apache.coyote.";
+ loader.loadClass(basePackage + "http11.AbstractOutputBuffer$1");
+ }
+
+
private final static void loadJavaxPackage(ClassLoader loader)
throws Exception {
loader.loadClass("javax.servlet.http.Cookie");
private final static void loadTomcatPackage(ClassLoader loader)
throws Exception {
String basePackage = "org.apache.tomcat.";
- loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
- loader.loadClass
- (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
// Make sure system property is read at this point
Class<?> clazz = loader.loadClass(
basePackage + "util.http.FastHttpDateFormat");
clazz.newInstance();
+ loader.loadClass(basePackage + "util.http.HttpMessages");
+ loader.loadClass(basePackage + "util.net.SSLSupport$CipherData");
+ loader.loadClass
+ (basePackage + "util.net.JIoEndpoint$PrivilegedSetTccl");
+ loader.loadClass
+ (basePackage + "util.net.AprEndpoint$PrivilegedSetTccl");
}
}
/**
* Callback for an async call to
+ * {@link javax.servlet.AsyncContext#dispatch()}
+ */
+ ASYNC_DISPATCH,
+
+ /**
+ * Callback to indicate the the actual dispatch has started and that the
+ * async state needs change.
+ */
+ ASYNC_DISPATCHED,
+
+ /**
+ * Callback for an async call to
+ * {@link javax.servlet.AsyncContext#start()}
+ */
+ ASYNC_RUN,
+
+ /**
+ * Callback for an async call to
* {@link javax.servlet.AsyncContext#complete()}
*/
ASYNC_COMPLETE,
+
+ /**
+ * Callback to trigger the processing of an async timeout
+ */
+ ASYNC_TIMEOUT,
+
+ /**
+ * Callback to trigger the error processing
+ */
+ ASYNC_ERROR,
+
/**
* Callback for an async call to
* {@link javax.servlet.AsyncContext#setTimeout(long)}
*/
ASYNC_SETTIMEOUT,
+
+ /**
+ * Callback to determine if async processing is in progress
+ */
+ ASYNC_IS_ASYNC,
+
+ /**
+ * Callback to determine if async dispatch is in progress
+ */
+ ASYNC_IS_STARTED,
/**
- * Callback for an async call to
- * {@link javax.servlet.AsyncContext#dispatch()}
+ * Callback to determine if async dispatch is in progress
*/
- ASYNC_DISPATCH,
+ ASYNC_IS_DISPATCHING,
+
+ /**
+ * Callback to determine if async is timing out
+ */
+ ASYNC_IS_TIMINGOUT
}
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
/**
* Socket associated with the current connection.
*/
- protected long socket;
+ protected SocketWrapper<Long> socket;
/**
*
* @throws IOException error during an I/O operation
*/
- public boolean process(long socket)
+ public boolean process(SocketWrapper<Long> socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the socket
this.socket = socket;
- Socket.setrbb(this.socket, inputBuffer);
- Socket.setsbb(this.socket, outputBuffer);
+ long socketRef = socket.getSocket().longValue();
+ Socket.setrbb(socketRef, inputBuffer);
+ Socket.setsbb(socketRef, outputBuffer);
// Error flag
error = false;
// not regular request processing
int type = requestHeaderMessage.getByte();
if (type == Constants.JK_AJP13_CPING_REQUEST) {
- if (Socket.sendb(socket, pongMessageBuffer, 0,
+ if (Socket.sendb(socketRef, pongMessageBuffer, 0,
pongMessageBuffer.position()) < 0) {
error = true;
}
// Add the socket to the poller
if (!error && !endpoint.isPaused()) {
- endpoint.getPoller().add(socket);
+ endpoint.getPoller().add(socketRef);
} else {
openSocket = false;
}
}
/* Copied from the AjpProcessor.java */
- public SocketState asyncDispatch(long socket, SocketStatus status) throws IOException {
+ public SocketState asyncDispatch(SocketWrapper<Long> socket,
+ SocketStatus status) throws IOException {
// Setting up the socket
this.socket = socket;
*/
public void action(ActionCode actionCode, Object param) {
+ long socketRef = socket.getSocket().longValue();
+
if (actionCode == ActionCode.COMMIT) {
if (response.isCommitted())
try {
flush();
// Send explicit flush message
- if (Socket.sendb(socket, flushMessageBuffer, 0,
+ if (Socket.sendb(socketRef, flushMessageBuffer, 0,
flushMessageBuffer.position()) < 0) {
error = true;
}
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
if (param==null) return;
- if (socket==0) return;
+ if (socketRef==0) return;
long timeout = ((Long)param).longValue();
- Socket.timeoutSet(socket, timeout * 1000);
+ Socket.timeoutSet(socketRef, timeout * 1000);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
RequestInfo rp = request.getRequestProcessor();
AtomicBoolean dispatch = (AtomicBoolean)param;
if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
- endpoint.getPoller().add(this.socket);
+ endpoint.getPoller().add(socketRef);
dispatch.set(true);
} else {
dispatch.set(true);
int nRead;
while (inputBuffer.remaining() < n) {
nRead = Socket.recvbb
- (socket, inputBuffer.limit(),
+ (socket.getSocket().longValue(), inputBuffer.limit(),
inputBuffer.capacity() - inputBuffer.limit());
if (nRead > 0) {
inputBuffer.limit(inputBuffer.limit() + nRead);
int nRead;
while (inputBuffer.remaining() < n) {
nRead = Socket.recvbb
- (socket, inputBuffer.limit(),
+ (socket.getSocket().longValue(), inputBuffer.limit(),
inputBuffer.capacity() - inputBuffer.limit());
if (nRead > 0) {
inputBuffer.limit(inputBuffer.limit() + nRead);
}
// Request more data immediately
- Socket.sendb(socket, getBodyMessageBuffer, 0,
+ Socket.sendb(socket.getSocket().longValue(), getBodyMessageBuffer, 0,
getBodyMessageBuffer.position());
boolean moreData = receive();
protected void flush()
throws IOException {
if (outputBuffer.position() > 0) {
- if (Socket.sendbb(socket, 0, outputBuffer.position()) < 0) {
+ if (Socket.sendbb(socket.getSocket().longValue(), 0, outputBuffer.position()) < 0) {
throw new IOException(sm.getString("ajpprocessor.failedsend"));
}
outputBuffer.clear();
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.AprEndpoint.Handler;
import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
- protected ConcurrentHashMap<Long, AjpAprProcessor> connections =
- new ConcurrentHashMap<Long, AjpAprProcessor>();
+ protected ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor> connections =
+ new ConcurrentHashMap<SocketWrapper<Long>, AjpAprProcessor>();
protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpAprProcessor>() {
}
// FIXME: Support for this could be added in AJP as well
- public SocketState event(long socket, SocketStatus status) {
+ public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
return SocketState.CLOSED;
}
- public SocketState process(long socket) {
+ public SocketState process(SocketWrapper<Long> socket) {
AjpAprProcessor processor = recycledProcessors.poll();
try {
}
if (processor.process(socket)) {
- connections.put(Long.valueOf(socket), processor);
+ connections.put(socket, processor);
return SocketState.OPEN;
} else {
// recycledProcessors.offer(processor);
}
// FIXME: Support for this could be added in AJP as well
- public SocketState asyncDispatch(long socket, SocketStatus status) {
+ public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
- AjpAprProcessor result = connections.get(Long.valueOf(socket));
+ AjpAprProcessor result = connections.get(socket);
SocketState state = SocketState.CLOSED;
if (result != null) {
(sm.getString("ajpprotocol.proto.error"), e);
} finally {
if (state != SocketState.LONG) {
- connections.remove(Long.valueOf(socket));
+ connections.remove(socket);
recycledProcessors.offer(result);
if (state == SocketState.OPEN) {
- proto.endpoint.getPoller().add(socket);
+ proto.endpoint.getPoller().add(socket.getSocket().longValue());
}
}
}
package org.apache.coyote.http11;
import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.StringTokenizer;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.apache.catalina.core.AsyncContextImpl;
import org.apache.coyote.ActionCode;
import org.apache.coyote.Adapter;
import org.apache.coyote.Request;
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
public abstract class AbstractHttp11Processor {
/**
- * Async used
- */
- protected boolean async = false;
-
-
- /**
* Set compression level.
*/
public void setCompression(String compression) {
request.getInputBuffer();
internalBuffer.addActiveFilter(savedBody);
} else if (actionCode == ActionCode.ASYNC_START) {
- async = true;
+ asyncStart((AsyncContextImpl) param);
+ } else if (actionCode == ActionCode.ASYNC_DISPATCHED) {
+ asyncDispatched();
+ } else if (actionCode == ActionCode.ASYNC_TIMEOUT) {
+ AtomicBoolean result = (AtomicBoolean) param;
+ result.set(asyncTimeout());
+ } else if (actionCode == ActionCode.ASYNC_RUN) {
+ asyncRun((Runnable) param);
+ } else if (actionCode == ActionCode.ASYNC_ERROR) {
+ asyncError();
+ } else if (actionCode == ActionCode.ASYNC_IS_STARTED) {
+ ((AtomicBoolean) param).set(isAsyncStarted());
+ } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) {
+ ((AtomicBoolean) param).set(isAsyncDispatching());
+ } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
+ ((AtomicBoolean) param).set(isAsync());
+ } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
+ ((AtomicBoolean) param).set(isAsyncTimingOut());
} else {
actionInternal(actionCode, param);
}
public final void recycle() {
getInputBuffer().recycle();
getOutputBuffer().recycle();
+ asyncCtxt = null;
recycleInternal();
}
protected abstract void recycleInternal();
protected abstract Executor getExecutor();
+
+ // -------------------------------------------------- Async state management
+
+ /*
+ * DISPATCHED - Standard request. Not in Async mode.
+ * STARTING - ServletRequest.startAsync() has been called but the
+ * request in which that call was made has not finished
+ * processing.
+ * STARTED - ServletRequest.startAsync() has been called and the
+ * request in which that call was made has finished
+ * processing.
+ * MUST_COMPLETE - complete() has been called before the request in which
+ * ServletRequest.startAsync() has finished. As soon as that
+ * request finishes, the complete() will be processed.
+ * COMPLETING - The call to complete() was made once the request was in
+ * the STARTED state. May or may not be triggered by a
+ * container thread - depends if start(Runnable) was used
+ *
+ * TODO - markt - Move this to a separate class
+ */
+ private static enum AsyncState {
+ DISPATCHED(false, false, false),
+ STARTING(true, true, false),
+ STARTED(true, true, false),
+ MUST_COMPLETE(true, false, false),
+ COMPLETING(true, false, false),
+ TIMING_OUT(true, false, false),
+ MUST_DISPATCH(true, false, true),
+ DISPATCHING(true, false, true),
+ ERROR(true,false,false);
+
+ private boolean isAsync;
+ private boolean isStarted;
+ private boolean isDispatching;
+
+ private AsyncState(boolean isAsync, boolean isStarted,
+ boolean isDispatching) {
+ this.isAsync = isAsync;
+ this.isStarted = isStarted;
+ this.isDispatching = isDispatching;
+ }
+
+ public boolean isAsync() {
+ return this.isAsync;
+ }
+
+ public boolean isStarted() {
+ return this.isStarted;
+ }
+
+ public boolean isDispatching() {
+ return this.isDispatching;
+ }
+ }
+
+ private volatile AsyncState state = AsyncState.DISPATCHED;
+ // Need this to fire listener on complete
+ private AsyncContextImpl asyncCtxt = null;
+
+ protected boolean isAsync() {
+ return state.isAsync();
+ }
+
+ protected boolean isAsyncDispatching() {
+ return state.isDispatching();
+ }
+
+ protected boolean isAsyncStarted() {
+ return state.isStarted();
+ }
+
+ protected boolean isAsyncTimingOut() {
+ return state == AsyncState.TIMING_OUT;
+ }
+
+
+ private synchronized void asyncStart(AsyncContextImpl asyncCtxt) {
+ if (state == AsyncState.DISPATCHED) {
+ state = AsyncState.STARTING;
+ this.asyncCtxt = asyncCtxt;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "startAsync()", state));
+ }
+ }
+
+ /*
+ * Async has been processed. Whether or not to enter a long poll depends on
+ * current state. For example, as per SRV.2.3.3.3 can now process calls to
+ * complete() or dispatch().
+ */
+ protected synchronized SocketState asyncPostProcess() {
+
+ if (state == AsyncState.STARTING) {
+ state = AsyncState.STARTED;
+ return SocketState.LONG;
+ } else if (state == AsyncState.MUST_COMPLETE) {
+ asyncCtxt.fireOnComplete();
+ state = AsyncState.DISPATCHED;
+ return SocketState.ASYNC_END;
+ } else if (state == AsyncState.COMPLETING) {
+ state = AsyncState.DISPATCHED;
+ return SocketState.ASYNC_END;
+ } else if (state == AsyncState.MUST_DISPATCH) {
+ state = AsyncState.DISPATCHING;
+ return SocketState.ASYNC_END;
+ } else if (state == AsyncState.DISPATCHING) {
+ state = AsyncState.DISPATCHED;
+ return SocketState.ASYNC_END;
+ } else if (state == AsyncState.ERROR) {
+ asyncCtxt.fireOnComplete();
+ state = AsyncState.DISPATCHED;
+ return SocketState.ASYNC_END;
+ //} else if (state == AsyncState.DISPATCHED) {
+ // // No state change
+ // return SocketState.OPEN;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "asyncLongPoll()", state));
+ }
+ }
+
+
+ protected synchronized boolean asyncComplete() {
+ boolean doComplete = false;
+
+ if (state == AsyncState.STARTING) {
+ state = AsyncState.MUST_COMPLETE;
+ } else if (state == AsyncState.STARTED) {
+ state = AsyncState.COMPLETING;
+ doComplete = true;
+ } else if (state == AsyncState.TIMING_OUT ||
+ state == AsyncState.ERROR) {
+ state = AsyncState.MUST_COMPLETE;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "asyncComplete()", state));
+
+ }
+ return doComplete;
+ }
+
+
+ private synchronized boolean asyncTimeout() {
+ if (state == AsyncState.STARTED) {
+ state = AsyncState.TIMING_OUT;
+ return true;
+ } else if (state == AsyncState.COMPLETING ||
+ state == AsyncState.DISPATCHED) {
+ // NOOP - App called complete between the the timeout firing and
+ // execution reaching this point
+ return false;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "timeoutAsync()", state));
+ }
+ }
+
+
+ protected synchronized boolean asyncDispatch() {
+ boolean doDispatch = false;
+ if (state == AsyncState.STARTING) {
+ state = AsyncState.MUST_DISPATCH;
+ } else if (state == AsyncState.STARTED) {
+ state = AsyncState.DISPATCHING;
+ doDispatch = true;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "dispatchAsync()", state));
+ }
+ return doDispatch;
+ }
+
+
+ private synchronized void asyncDispatched() {
+ if (state == AsyncState.DISPATCHING) {
+ state = AsyncState.DISPATCHED;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "dispatchAsync()", state));
+ }
+ }
+
+
+ private synchronized boolean asyncError() {
+ boolean doDispatch = false;
+ if (state == AsyncState.DISPATCHED ||
+ state == AsyncState.TIMING_OUT) {
+ state = AsyncState.ERROR;
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "dispatchAsync()", state));
+ }
+ return doDispatch;
+ }
+
+ private synchronized void asyncRun(Runnable runnable) {
+ if (state == AsyncState.STARTING || state == AsyncState.STARTED) {
+ // Execute the runnable using a container thread from the
+ // Connector's thread pool. Use a wrapper to prevent a memory leak
+ ClassLoader oldCL;
+ if (Constants.IS_SECURITY_ENABLED) {
+ PrivilegedAction<ClassLoader> pa = new PrivilegedGetTccl();
+ oldCL = AccessController.doPrivileged(pa);
+ } else {
+ oldCL = Thread.currentThread().getContextClassLoader();
+ }
+ try {
+ if (Constants.IS_SECURITY_ENABLED) {
+ PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+ this.getClass().getClassLoader());
+ AccessController.doPrivileged(pa);
+ } else {
+ Thread.currentThread().setContextClassLoader(
+ this.getClass().getClassLoader());
+ }
+
+ getExecutor().execute(runnable);
+ } finally {
+ if (Constants.IS_SECURITY_ENABLED) {
+ PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+ oldCL);
+ AccessController.doPrivileged(pa);
+ } else {
+ Thread.currentThread().setContextClassLoader(oldCL);
+ }
+ }
+ } else {
+ throw new IllegalStateException(
+ sm.getString("abstractHttp11Protocol.invalidAsyncState",
+ "runAsync()", state));
+ }
+
+ }
+
+ private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
+
+ private ClassLoader cl;
+
+ PrivilegedSetTccl(ClassLoader cl) {
+ this.cl = cl;
+ }
+
+ @Override
+ public Void run() {
+ Thread.currentThread().setContextClassLoader(cl);
+ return null;
+ }
+ }
+
+ private static class PrivilegedGetTccl
+ implements PrivilegedAction<ClassLoader> {
+
+ @Override
+ public ClassLoader run() {
+ return Thread.currentThread().getContextClassLoader();
+ }
+ }
}
*/
public static final String POST = "POST";
-
+ /**
+ * Has security been turned on?
+ */
+ public static final boolean IS_SECURITY_ENABLED =
+ (System.getSecurityManager() != null);
}
import java.security.cert.X509Certificate;
import java.util.Locale;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
/**
/**
* Socket associated with the current connection.
*/
- protected long socket = 0;
+ protected SocketWrapper<Long> socket = null;
/**
*
* @throws IOException error during an I/O operation
*/
- public SocketState process(long socket)
+ public SocketState process(SocketWrapper<Long> socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the socket
this.socket = socket;
- inputBuffer.setSocket(socket);
- outputBuffer.setSocket(socket);
+ long socketRef = socket.getSocket().longValue();
+ inputBuffer.setSocket(socketRef);
+ outputBuffer.setSocket(socketRef);
// Error flag
error = false;
comet = false;
- async = false;
keepAlive = true;
int keepAliveLeft = maxKeepAliveRequests;
boolean keptAlive = false;
boolean openSocket = false;
- while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) {
+ while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) {
// Parsing the request header
try {
if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
- Socket.timeoutSet(socket, soTimeout * 1000);
+ Socket.timeoutSet(socketRef, soTimeout * 1000);
}
if (!inputBuffer.parseRequestLine(keptAlive)) {
// This means that no data is available right now
// and the method should return true
openSocket = true;
// Add the socket to the poller
- endpoint.getPoller().add(socket);
+ endpoint.getPoller().add(socketRef);
break;
}
request.setStartTime(System.currentTimeMillis());
keptAlive = true;
if (!disableUploadTimeout) {
- Socket.timeoutSet(socket, timeout * 1000);
+ Socket.timeoutSet(socketRef, timeout * 1000);
}
inputBuffer.parseHeaders();
} catch (IOException e) {
}
// Finish the handling of the request
- if (!comet && !async) {
+ if (!comet && !isAsync()) {
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
}
request.updateCounters();
- if (!comet && !async) {
+ if (!comet && !isAsync()) {
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
// Do sendfile as needed: add socket to sendfile and end
if (sendfileData != null && !error) {
- sendfileData.socket = socket;
+ sendfileData.socket = socketRef;
sendfileData.keepAlive = keepAlive;
if (!endpoint.getSendfile().add(sendfileData)) {
openSocket = true;
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (error || endpoint.isPaused()) {
- inputBuffer.nextRequest();
- outputBuffer.nextRequest();
recycle();
return SocketState.CLOSED;
- } else if (comet || async) {
+ } else if (comet || isAsync()) {
return SocketState.LONG;
} else {
recycle();
}
/* Copied from the AjpProcessor.java */
- public SocketState asyncDispatch(long socket, SocketStatus status) {
+ public SocketState asyncDispatch(SocketWrapper<Long> socket,
+ SocketStatus status) {
// Setting up the socket
this.socket = socket;
- inputBuffer.setSocket(socket);
- outputBuffer.setSocket(socket);
+ long socketRef = socket.getSocket().longValue();
+ inputBuffer.setSocket(socketRef);
+ outputBuffer.setSocket(socketRef);
RequestInfo rp = request.getRequestProcessor();
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (async) {
- if (error) {
- response.setStatus(500);
- request.updateCounters();
- recycle();
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (isAsync()) {
+ return SocketState.LONG;
+ } else {
+ recycle();
+ if (!keepAlive) {
return SocketState.CLOSED;
} else {
- return SocketState.LONG;
+ return SocketState.OPEN;
}
- } else {
- if (error) {
- response.setStatus(500);
- }
- request.updateCounters();
- recycle();
- return SocketState.CLOSED;
}
-
}
@Override
public void recycleInternal() {
- this.socket = 0;
+ this.socket = null;
}
@Override
public void actionInternal(ActionCode actionCode, Object param) {
+ long socketRef = socket.getSocket().longValue();
+
if (actionCode == ActionCode.CLOSE) {
// Close
// transactions with the client
comet = false;
- async = false;
try {
outputBuffer.endRequest();
} catch (IOException e) {
} else if (actionCode == ActionCode.REQ_HOST_ADDR_ATTRIBUTE) {
// Get remote host address
- if (remoteAddr == null && (socket != 0)) {
+ if (remoteAddr == null && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_REMOTE, socket);
+ long sa = Address.get(Socket.APR_REMOTE, socketRef);
remoteAddr = Address.getip(sa);
} catch (Exception e) {
log.warn(sm.getString("http11processor.socket.info"), e);
} else if (actionCode == ActionCode.REQ_LOCAL_NAME_ATTRIBUTE) {
// Get local host name
- if (localName == null && (socket != 0)) {
+ if (localName == null && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_LOCAL, socket);
+ long sa = Address.get(Socket.APR_LOCAL, socketRef);
localName = Address.getnameinfo(sa, 0);
} catch (Exception e) {
log.warn(sm.getString("http11processor.socket.info"), e);
} else if (actionCode == ActionCode.REQ_HOST_ATTRIBUTE) {
// Get remote host name
- if (remoteHost == null && (socket != 0)) {
+ if (remoteHost == null && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_REMOTE, socket);
+ long sa = Address.get(Socket.APR_REMOTE, socketRef);
remoteHost = Address.getnameinfo(sa, 0);
} catch (Exception e) {
log.warn(sm.getString("http11processor.socket.info"), e);
} else if (actionCode == ActionCode.REQ_LOCAL_ADDR_ATTRIBUTE) {
// Get local host address
- if (localAddr == null && (socket != 0)) {
+ if (localAddr == null && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_LOCAL, socket);
+ long sa = Address.get(Socket.APR_LOCAL, socketRef);
localAddr = Address.getip(sa);
} catch (Exception e) {
log.warn(sm.getString("http11processor.socket.info"), e);
} else if (actionCode == ActionCode.REQ_REMOTEPORT_ATTRIBUTE) {
// Get remote port
- if (remotePort == -1 && (socket != 0)) {
+ if (remotePort == -1 && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_REMOTE, socket);
+ long sa = Address.get(Socket.APR_REMOTE, socketRef);
Sockaddr addr = Address.getInfo(sa);
remotePort = addr.port;
} catch (Exception e) {
} else if (actionCode == ActionCode.REQ_LOCALPORT_ATTRIBUTE) {
// Get local port
- if (localPort == -1 && (socket != 0)) {
+ if (localPort == -1 && (socketRef != 0)) {
try {
- long sa = Address.get(Socket.APR_LOCAL, socket);
+ long sa = Address.get(Socket.APR_LOCAL, socketRef);
Sockaddr addr = Address.getInfo(sa);
localPort = addr.port;
} catch (Exception e) {
} else if (actionCode == ActionCode.REQ_SSL_ATTRIBUTE ) {
- if (ssl && (socket != 0)) {
+ if (ssl && (socketRef != 0)) {
try {
// Cipher suite
- Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER);
+ Object sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_CIPHER);
if (sslO != null) {
request.setAttribute(AbstractEndpoint.CIPHER_SUITE_KEY, sslO);
}
// Get client certificate and the certificate chain if present
// certLength == -1 indicates an error
- int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
- byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+ int certLength = SSLSocket.getInfoI(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+ byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
X509Certificate[] certs = null;
if (clientCert != null && certLength > -1) {
certs = new X509Certificate[certLength + 1];
CertificateFactory cf = CertificateFactory.getInstance("X.509");
certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
for (int i = 0; i < certLength; i++) {
- byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+ byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
}
}
request.setAttribute(AbstractEndpoint.CERTIFICATE_KEY, certs);
}
// User key size
- sslO = Integer.valueOf(SSLSocket.getInfoI(socket,
+ sslO = Integer.valueOf(SSLSocket.getInfoI(socketRef,
SSL.SSL_INFO_CIPHER_USEKEYSIZE));
request.setAttribute(AbstractEndpoint.KEY_SIZE_KEY, sslO);
// SSL session ID
- sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID);
+ sslO = SSLSocket.getInfoS(socketRef, SSL.SSL_INFO_SESSION_ID);
if (sslO != null) {
request.setAttribute(AbstractEndpoint.SESSION_ID_KEY, sslO);
}
} else if (actionCode == ActionCode.REQ_SSL_CERTIFICATE) {
- if (ssl && (socket != 0)) {
+ if (ssl && (socketRef != 0)) {
// Consume and buffer the request body, so that it does not
// interfere with the client's handshake messages
InputFilter[] inputFilters = inputBuffer.getFilters();
inputBuffer.addActiveFilter(inputFilters[Constants.BUFFERED_FILTER]);
try {
// Configure connection to require a certificate
- SSLSocket.setVerify(socket, SSL.SSL_CVERIFY_REQUIRE,
+ SSLSocket.setVerify(socketRef, SSL.SSL_CVERIFY_REQUIRE,
endpoint.getSSLVerifyDepth());
// Renegotiate certificates
- if (SSLSocket.renegotiate(socket) == 0) {
+ if (SSLSocket.renegotiate(socketRef) == 0) {
// Don't look for certs unless we know renegotiation worked.
// Get client certificate and the certificate chain if present
// certLength == -1 indicates an error
- int certLength = SSLSocket.getInfoI(socket,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
- byte[] clientCert = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT);
+ int certLength = SSLSocket.getInfoI(socketRef,SSL.SSL_INFO_CLIENT_CERT_CHAIN);
+ byte[] clientCert = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT);
X509Certificate[] certs = null;
if (clientCert != null && certLength > -1) {
certs = new X509Certificate[certLength + 1];
CertificateFactory cf = CertificateFactory.getInstance("X.509");
certs[0] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(clientCert));
for (int i = 0; i < certLength; i++) {
- byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
+ byte[] data = SSLSocket.getInfoB(socketRef, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i);
certs[i+1] = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(data));
}
}
} else if (actionCode == ActionCode.COMET_SETTIMEOUT) {
//no op
} else if (actionCode == ActionCode.ASYNC_COMPLETE) {
- //TODO SERVLET3 - async - that is bit hacky -
- AtomicBoolean dispatch = (AtomicBoolean)param;
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
- dispatch.set(true);
- endpoint.getHandler().asyncDispatch(this.socket, SocketStatus.STOP);
- } else {
- dispatch.set(false);
+ if (asyncComplete()) {
+ endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
- //TODO SERVLET3 - async
if (param==null) return;
- if (socket==0) return;
long timeout = ((Long)param).longValue();
- Socket.timeoutSet(socket, timeout * 1000);
+ socket.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
- RequestInfo rp = request.getRequestProcessor();
- AtomicBoolean dispatch = (AtomicBoolean)param;
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
- endpoint.getPoller().add(this.socket);
- dispatch.set(true);
- } else {
- dispatch.set(true);
+ if (asyncDispatch()) {
+ endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
}
}
package org.apache.coyote.http11;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.AprEndpoint.Handler;
import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
- protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
- new ConcurrentHashMap<Long, Http11AprProcessor>();
+ protected ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor> connections =
+ new ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor>();
protected ConcurrentLinkedQueue<Http11AprProcessor> recycledProcessors =
new ConcurrentLinkedQueue<Http11AprProcessor>() {
private static final long serialVersionUID = 1L;
}
@Override
- public SocketState event(long socket, SocketStatus status) {
- Http11AprProcessor result = connections.get(Long.valueOf(socket));
+ public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
+ Http11AprProcessor result = connections.get(socket);
SocketState state = SocketState.CLOSED;
if (result != null) {
"http11protocol.proto.error"), e);
} finally {
if (state != SocketState.LONG) {
- connections.remove(Long.valueOf(socket));
+ connections.remove(socket);
recycledProcessors.offer(result);
if (state == SocketState.OPEN) {
- ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+ ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
}
} else {
- ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+ ((AprEndpoint)proto.endpoint).getCometPoller().add(socket.getSocket().longValue());
}
}
- } else if (result.async) {
+ } else if (result.isAsync()) {
state = asyncDispatch(socket, status);
}
}
}
@Override
- public SocketState process(long socket) {
+ public SocketState process(SocketWrapper<Long> socket) {
Http11AprProcessor processor = recycledProcessors.poll();
try {
if (processor == null) {
SocketState state = processor.process(socket);
if (state == SocketState.LONG) {
- // Associate the connection with the processor. The next request
- // processed by this thread will use either a new or a recycled
- // processor.
- connections.put(Long.valueOf(socket), processor);
- ((AprEndpoint)proto.endpoint).getCometPoller().add(socket);
+ // Check if the post processing is going to change the state
+ state = processor.asyncPostProcess();
+ }
+ if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+ // Need to make socket available for next processing cycle
+ // but no need for the poller
+ connections.put(socket, processor);
} else {
recycledProcessors.offer(processor);
}
}
@Override
- public SocketState asyncDispatch(long socket, SocketStatus status) {
- Http11AprProcessor result = connections.get(Long.valueOf(socket));
+ public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
+ Http11AprProcessor result = connections.get(socket);
SocketState state = SocketState.CLOSED;
if (result != null) {
Http11AprProtocol.log.error
(sm.getString("http11protocol.proto.error"), e);
} finally {
- if (state != SocketState.LONG) {
- connections.remove(Long.valueOf(socket));
+ if (state == SocketState.LONG && result.isAsync()) {
+ state = result.asyncPostProcess();
+ }
+ if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
+ connections.remove(socket);
recycledProcessors.offer(result);
if (state == SocketState.OPEN) {
- ((AprEndpoint)proto.endpoint).getPoller().add(socket);
+ ((AprEndpoint)proto.endpoint).getPoller().add(socket.getSocket().longValue());
}
}
}
synchronized (this) {
try {
long count = registerCount.incrementAndGet();
- RequestInfo rp = processor.getRequest().getRequestProcessor();
+ final RequestInfo rp = processor.getRequest().getRequestProcessor();
rp.setGlobalProcessor(global);
- ObjectName rpName = new ObjectName
+ final ObjectName rpName = new ObjectName
(proto.getDomain() + ":type=RequestProcessor,worker="
+ proto.getName() + ",name=HttpRequest" + count);
if (log.isDebugEnabled()) {
log.debug("Register " + rpName);
}
- Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ if (Constants.IS_SECURITY_ENABLED) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ try {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ } catch (Exception e) {
+ log.warn("Error registering request");
+ }
+ return null;
+ }
+ });
+ } else {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ }
rp.setRpName(rpName);
} catch (Exception e) {
log.warn("Error registering request");
import java.nio.channels.SelectionKey;
import java.util.Locale;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
} else {
- //reset the timeout
- if (keepAlive && keepAliveTimeout>0) {
- attach.setTimeout(keepAliveTimeout);
- } else {
- attach.setTimeout(soTimeout);
+ if (isAsyncDispatching()) {
+ //reset the timeout
+ if (keepAlive && keepAliveTimeout>0) {
+ attach.setTimeout(keepAliveTimeout);
+ } else {
+ attach.setTimeout(soTimeout);
+ }
}
}
if (error) {
recycle();
return SocketState.CLOSED;
- } else if (!comet) {
+ } else if (!comet && !isAsync()) {
recycle();
return (keepAlive)?SocketState.OPEN:SocketState.CLOSED;
} else {
error = false;
keepAlive = true;
comet = false;
- async = false;
long soTimeout = endpoint.getSoTimeout();
int keepAliveTimeout = endpoint.getKeepAliveTimeout();
boolean recycle = true;
final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
- while (!error && keepAlive && !comet && !async && !endpoint.isPaused()) {
+ while (!error && keepAlive && !comet && !isAsync() && !endpoint.isPaused()) {
//always default to our soTimeout
ka.setTimeout(soTimeout);
// Parsing the request header
}
// Finish the handling of the request
- if (!comet && !async) {
+ if (!comet && !isAsync()) {
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
}
request.updateCounters();
- if (!comet && !async) {
+ if (!comet && !isAsync()) {
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
if (error || endpoint.isPaused()) {
recycle();
return SocketState.CLOSED;
- } else if (comet || async) {
+ } else if (comet || isAsync()) {
return SocketState.LONG;
} else {
if (recycle) {
if (actionCode == ActionCode.CLOSE) {
// Close
-
// End the processing of the current request, and stop any further
// transactions with the client
comet = false;
cometClose = true;
- async = false;
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
attach.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_COMPLETE) {
- //TODO SERVLET3 - async
- AtomicBoolean dispatch = (AtomicBoolean)param;
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
- dispatch.set(true);
- endpoint.processSocket(this.socket, SocketStatus.STOP, true);
- } else {
- dispatch.set(false);
+ if (asyncComplete()) {
+ endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
- //TODO SERVLET3 - async
if (param==null) return;
if (socket==null || socket.getAttachment(false)==null) return;
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
//if we are not piggy backing on a worker thread, set the timeout
attach.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
- RequestInfo rp = request.getRequestProcessor();
- AtomicBoolean dispatch = (AtomicBoolean)param;
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+ if (asyncDispatch()) {
endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
- dispatch.set(true);
- } else {
- dispatch.set(true);
}
}
}
package org.apache.coyote.http11;
import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@Override
public SocketState event(NioChannel socket, SocketStatus status) {
- Http11NioProcessor result = connections.get(socket);
+ Http11NioProcessor processor = connections.get(socket);
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
att.setAsync(false); //no longer check for timeout
SocketState state = SocketState.CLOSED;
- if (result != null) {
- if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error);
+ if (processor != null) {
+ if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+processor.error);
// Call the appropriate event
try {
- if (result.async) {
- state = result.asyncDispatch(status);
+ if (processor.comet) {
+ state = processor.event(status);
} else {
- state = result.event(status);
+ state = processor.asyncDispatch(status);
}
} catch (java.net.SocketException e) {
// SocketExceptions are normal
Http11NioProtocol.log.error
(sm.getString("http11protocol.proto.error"), e);
} finally {
- if (state != SocketState.LONG) {
+ if (processor.isAsync()) {
+ state = processor.asyncPostProcess();
+ }
+ if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
connections.remove(socket);
- recycledProcessors.offer(result);
+ recycledProcessors.offer(processor);
if (state == SocketState.OPEN) {
socket.getPoller().add(socket);
}
+ } else if (state == SocketState.ASYNC_END) {
+ // No further work required
+ } else if (state == SocketState.LONG) {
+ att.setAsync(true); // Re-enable timeouts
} else {
- if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
+ if (log.isDebugEnabled()) log.debug("Keeping processor["+processor);
//add correct poller events here based on Comet stuff
socket.getPoller().add(socket,att.getCometOps());
}
if (processor.comet) {
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
socket.getPoller().add(socket,att.getCometOps());
- } else if (processor.async) {
+ } else if (processor.isAsync()) {
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
att.setAsync(true);
+ // longPoll may change socket state (e.g. to trigger a
+ // complete or dispatch)
+ state = processor.asyncPostProcess();
} else {
socket.getPoller().add(socket);
}
+ }
+ if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+ // Already done all we need to do.
} else if (state == SocketState.OPEN){
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
try {
registerCount.addAndGet(1);
if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get());
- RequestInfo rp = processor.getRequest().getRequestProcessor();
+ final RequestInfo rp = processor.getRequest().getRequestProcessor();
rp.setGlobalProcessor(global);
- ObjectName rpName = new ObjectName
+ final ObjectName rpName = new ObjectName
(proto.getDomain() + ":type=RequestProcessor,worker="
+ proto.getName() + ",name=HttpRequest" + count++);
- Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ if (Constants.IS_SECURITY_ENABLED) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ try {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ } catch (Exception e) {
+ log.warn("Error registering request");
+ }
+ return null;
+ }
+ });
+ } else {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ }
rp.setRpName(rpName);
} catch (Exception e) {
log.warn("Error registering request");
import java.net.Socket;
import java.util.Locale;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
- if(error && !async)
+ if(error && !isAsync())
inputBuffer.setSwallowInput(false);
- if (!async)
+ if (!isAsync())
endRequest();
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.finish"), t);
// will reset it
// thrA.setParam(null);
// Next request
- if (!async || error) {
+ if (!isAsync() || error) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
if (error || endpoint.isPaused()) {
recycle();
return SocketState.CLOSED;
- } else if (async) {
+ } else if (isAsync()) {
return SocketState.LONG;
} else {
if (!keepAlive) {
if (error) {
recycle();
return SocketState.CLOSED;
- } else if (async) {
+ } else if (isAsync()) {
return SocketState.LONG;
} else {
recycle();
protected void recycleInternal() {
// Recycle
this.socket = null;
- async = false;
// Recycle ssl info
sslSupport = null;
}
if (actionCode == ActionCode.CLOSE) {
// Close
- async = false;
// End the processing of the current request, and stop any further
// transactions with the client
}
}
} else if (actionCode == ActionCode.ASYNC_COMPLETE) {
- //TODO SERVLET3 - async
- AtomicBoolean dispatch = (AtomicBoolean)param;
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
- dispatch.set(true);
+ if (asyncComplete()) {
endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
- } else {
- dispatch.set(false);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
- //TODO SERVLET3 - async
- if (param==null) return;
+ if (param == null) return;
long timeout = ((Long)param).longValue();
- //if we are not piggy backing on a worker thread, set the timeout
+ // if we are not piggy backing on a worker thread, set the timeout
socket.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
- RequestInfo rp = request.getRequestProcessor();
- AtomicBoolean dispatch = (AtomicBoolean)param;
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+ if (asyncDispatch()) {
endpoint.processSocketAsync(this.socket, SocketStatus.OPEN);
- dispatch.set(true);
- } else {
- dispatch.set(true);
}
}
-
-
}
package org.apache.coyote.http11;
+import java.io.IOException;
import java.net.Socket;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
if (state == SocketState.LONG) {
connections.put(socket, processor);
socket.setAsync(true);
+ // longPoll may change socket state (e.g. to trigger a
+ // complete or dispatch)
+ return processor.asyncPostProcess();
} else {
connections.remove(socket);
socket.setAsync(false);
synchronized (this) {
try {
long count = registerCount.incrementAndGet();
- RequestInfo rp = processor.getRequest().getRequestProcessor();
+ final RequestInfo rp = processor.getRequest().getRequestProcessor();
rp.setGlobalProcessor(global);
- ObjectName rpName = new ObjectName
+ final ObjectName rpName = new ObjectName
(proto.getDomain() + ":type=RequestProcessor,worker="
+ proto.getName() + ",name=HttpRequest" + count);
if (log.isDebugEnabled()) {
log.debug("Register " + rpName);
}
- Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ if (Constants.IS_SECURITY_ENABLED) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ try {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ } catch (Exception e) {
+ log.warn("Error registering request");
+ }
+ return null;
+ }
+ });
+ } else {
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ }
rp.setRpName(rpName);
} catch (Exception e) {
log.warn("Error registering request");
# See the License for the specific language governing permissions and
# limitations under the License.
+abstractHttp11Protocol.invalidAsyncState=Calling [{0}] is not valid for a request with Async state [{1}]
+
http11protocol.destroy=Destroying Coyote HTTP/1.1 on {0}
http11protocol.endpoint.initerror=Error initializing endpoint
http11protocol.endpoint.starterror=Error starting endpoint
*/
public static interface Handler {
public enum SocketState {
- OPEN, CLOSED, LONG
+ OPEN, CLOSED, LONG, ASYNC_END
}
}
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import org.apache.juli.logging.Log;
private Acceptor acceptors[] = null;
+ protected ConcurrentLinkedQueue<SocketWrapper<Long>> waitingRequests =
+ new ConcurrentLinkedQueue<SocketWrapper<Long>>();
+
// ------------------------------------------------------------- Properties
*/
protected boolean deferAccept = true;
public void setDeferAccept(boolean deferAccept) { this.deferAccept = deferAccept; }
+ @Override
public boolean getDeferAccept() { return deferAccept; }
*/
protected boolean useSendfile = Library.APR_HAS_SENDFILE;
public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
+ @Override
public boolean getUseSendfile() { return useSendfile; }
acceptors[i].start();
}
+ // Start async timeout thread
+ Thread timeoutThread = new Thread(new AsyncTimeout(),
+ getName() + "-AsyncTimeout");
+ timeoutThread.setPriority(threadPriority);
+ timeoutThread.setDaemon(true);
+ timeoutThread.start();
}
}
/**
* Stop the endpoint. This will cause all processing threads to stop.
*/
+ @Override
public void stop() {
if (!paused) {
pause();
try {
// During shutdown, executor may be null - avoid NPE
if (running) {
- getExecutor().execute(new SocketWithOptionsProcessor(socket));
+ SocketWrapper<Long> wrapper =
+ new SocketWrapper<Long>(Long.valueOf(socket));
+ getExecutor().execute(new SocketWithOptionsProcessor(wrapper));
}
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
*/
protected boolean processSocket(long socket) {
try {
- getExecutor().execute(new SocketProcessor(socket));
+ SocketWrapper<Long> wrapper =
+ new SocketWrapper<Long>(Long.valueOf(socket));
+ getExecutor().execute(new SocketProcessor(wrapper, null));
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
return false;
try {
if (status == SocketStatus.OPEN || status == SocketStatus.STOP ||
status == SocketStatus.TIMEOUT) {
+ SocketWrapper<Long> wrapper =
+ new SocketWrapper<Long>(Long.valueOf(socket));
SocketEventProcessor proc =
- new SocketEventProcessor(socket, status);
+ new SocketEventProcessor(wrapper, status);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
if (IS_SECURITY_ENABLED) {
return true;
}
+ public boolean processSocketAsync(SocketWrapper<Long> socket,
+ SocketStatus status) {
+ try {
+ synchronized (socket) {
+ if (waitingRequests.remove(socket)) {
+ SocketProcessor proc = new SocketProcessor(socket, status);
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try {
+ if (IS_SECURITY_ENABLED) {
+ PrivilegedAction<Void> pa = new PrivilegedSetTccl(
+ getClass().getClassLoader());
+ AccessController.doPrivileged(pa);
+ } else {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ }
+ getExecutor().execute(proc);
+ } finally {
+ if (IS_SECURITY_ENABLED) {
+ PrivilegedAction<Void> pa = new PrivilegedSetTccl(loader);
+ AccessController.doPrivileged(pa);
+ } else {
+ Thread.currentThread().setContextClassLoader(loader);
+ }
+ }
+ }
+ }
+ } catch (RejectedExecutionException x) {
+ log.warn("Socket processing request was rejected for:"+socket,x);
+ return false;
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
private void destroySocket(long socket)
{
if (running && socket != 0) {
}
- // ----------------------------------------------------- Poller Inner Class
-
+ /**
+ * Async timeout thread
+ */
+ protected class AsyncTimeout implements Runnable {
+ /**
+ * The background thread that checks async requests and fires the
+ * timeout if there has been no activity.
+ */
+ @Override
+ public void run() {
+ // Loop until we receive a shutdown command
+ while (running) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ long now = System.currentTimeMillis();
+ Iterator<SocketWrapper<Long>> sockets =
+ waitingRequests.iterator();
+ while (sockets.hasNext()) {
+ SocketWrapper<Long> socket = sockets.next();
+ long access = socket.getLastAccess();
+ if ((now-access)>socket.getTimeout()) {
+ processSocketAsync(socket,SocketStatus.TIMEOUT);
+ }
+ }
+
+ // Loop if endpoint is paused
+ while (paused && running) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ }
+ }
+ }
+
+
+ // ----------------------------------------------------- Poller Inner Class
/**
* Poller class.
*/
/**
* Destroy the poller.
*/
+ @Override
public void destroy() {
// Close all sockets in the add queue
for (int i = 0; i < addCount; i++) {
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
+ @Override
public void run() {
long maintainTime = 0;
/**
* Destroy the poller.
*/
+ @Override
public void destroy() {
// Close any socket remaining in the add queue
addCount = 0;
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
+ @Override
public void run() {
long maintainTime = 0;
* thread local fields.
*/
public interface Handler extends AbstractEndpoint.Handler {
- public SocketState process(long socket);
- public SocketState event(long socket, SocketStatus status);
- public SocketState asyncDispatch(long socket, SocketStatus status);
+ public SocketState process(SocketWrapper<Long> socket);
+ public SocketState event(SocketWrapper<Long> socket,
+ SocketStatus status);
+ public SocketState asyncDispatch(SocketWrapper<Long> socket,
+ SocketStatus status);
}
// ---------------------------------------------- SocketProcessor Inner Class
-
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool. This will also set the socket options
*/
protected class SocketWithOptionsProcessor implements Runnable {
- protected long socket = 0;
+ protected SocketWrapper<Long> socket = null;
- public SocketWithOptionsProcessor(long socket) {
+ public SocketWithOptionsProcessor(SocketWrapper<Long> socket) {
this.socket = socket;
}
+ @Override
public void run() {
- if (!deferAccept) {
- if (setSocketOptions(socket)) {
- getPoller().add(socket);
+ synchronized (socket) {
+ if (!deferAccept) {
+ if (setSocketOptions(socket.getSocket().longValue())) {
+ getPoller().add(socket.getSocket().longValue());
+ } else {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ socket = null;
+ }
} else {
- // Close socket and pool
- destroySocket(socket);
- socket = 0;
- }
- } else {
- // Process the request from this socket
- if (!setSocketOptions(socket)
- || handler.process(socket) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket);
- socket = 0;
+ // Process the request from this socket
+ if (!setSocketOptions(socket.getSocket().longValue())
+ || handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ socket = null;
+ }
}
}
*/
protected class SocketProcessor implements Runnable {
- protected long socket = 0;
- protected boolean async = false;
- protected SocketStatus status = SocketStatus.ERROR;
+ protected SocketWrapper<Long> socket = null;
+ protected SocketStatus status = null;
- public SocketProcessor(long socket) {
+ public SocketProcessor(SocketWrapper<Long> socket,
+ SocketStatus status) {
this.socket = socket;
- this.async = false;
+ this.status = status;
}
+ @Override
public void run() {
-
- // Process the request from this socket
- Handler.SocketState state = async?handler.asyncDispatch(socket, status):handler.process(socket);
- if (state == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket);
- socket = 0;
+ synchronized (socket) {
+ // Process the request from this socket
+ Handler.SocketState state = (status==null)?handler.process(socket):handler.asyncDispatch(socket, status);
+ if (state == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ socket = null;
+ } else if (state == Handler.SocketState.LONG) {
+ socket.access();
+ waitingRequests.add(socket);
+ } else if (state == Handler.SocketState.ASYNC_END) {
+ socket.access();
+ SocketProcessor proc = new SocketProcessor(socket, SocketStatus.OPEN);
+ getExecutor().execute(proc);
+ }
}
-
}
-
}
*/
protected class SocketEventProcessor implements Runnable {
- protected long socket = 0;
+ protected SocketWrapper<Long> socket = null;
protected SocketStatus status = null;
- public SocketEventProcessor(long socket, SocketStatus status) {
+ public SocketEventProcessor(SocketWrapper<Long> socket,
+ SocketStatus status) {
this.socket = socket;
this.status = status;
}
+ @Override
public void run() {
-
- // Process the request from this socket
- if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- destroySocket(socket);
- socket = 0;
+ synchronized (socket) {
+ // Process the request from this socket
+ Handler.SocketState state = handler.event(socket, status);
+ if (state == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ destroySocket(socket.getSocket().longValue());
+ socket = null;
+ }
}
-
}
-
}
private static class PrivilegedSetTccl implements PrivilegedAction<Void> {
this.cl = cl;
}
+ @Override
public Void run() {
Thread.currentThread().setContextClassLoader(cl);
return null;
}
- // --------------------------------------------------- Acceptor Inner Class
-
/**
* Async timeout thread
*/
}
}
}
+
+
+ // --------------------------------------------------- Acceptor Inner Class
/**
* Server socket acceptor thread.
*/
// Configure the socket
if (setSocketOptions(socket)) {
- // Hand this socket off to an appropriate processor
- if (!processSocket(socket)) {
- // Close socket right away
- try {
- socket.close();
- } catch (IOException e) {
- // Ignore
- }
+ // Hand this socket off to an appropriate processor
+ if (!processSocket(socket)) {
+ // Close socket right away
+ try {
+ socket.close();
+ } catch (IOException e) {
+ // Ignore
}
+ }
} else {
// Close socket right away
try {
@Override
public void run() {
boolean launch = false;
- try {
-
- if (!socket.processing.compareAndSet(false, true)) {
- log.error("Unable to process socket. Invalid state.");
- return;
- }
-
- SocketState state = SocketState.OPEN;
-
+ synchronized (socket) {
try {
- // SSL handshake
- serverSocketFactory.handshake(socket.getSocket());
- } catch (Throwable t) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.err.handshake"), t);
- }
- // Tell to close the socket
- state = SocketState.CLOSED;
- }
+ SocketState state = SocketState.OPEN;
- if ( (state != SocketState.CLOSED) ) {
- state = (status==null)?handler.process(socket):handler.process(socket,status);
- }
- if (state == SocketState.CLOSED) {
- // Close socket
- if (log.isTraceEnabled()) {
- log.trace("Closing socket:"+socket);
- }
try {
- socket.getSocket().close();
- } catch (IOException e) {
- // Ignore
+ // SSL handshake
+ serverSocketFactory.handshake(socket.getSocket());
+ } catch (Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("endpoint.err.handshake"), t);
+ }
+ // Tell to close the socket
+ state = SocketState.CLOSED;
+ }
+
+ if ( (state != SocketState.CLOSED) ) {
+ state = (status==null)?handler.process(socket):handler.process(socket,status);
+ }
+ if (state == SocketState.CLOSED) {
+ // Close socket
+ if (log.isTraceEnabled()) {
+ log.trace("Closing socket:"+socket);
+ }
+ try {
+ socket.getSocket().close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ } else if (state == SocketState.ASYNC_END ||
+ state == SocketState.OPEN){
+ socket.setKeptAlive(true);
+ socket.access();
+ launch = true;
+ } else if (state == SocketState.LONG) {
+ socket.access();
+ waitingRequests.add(socket);
+ }
+ } finally {
+ if (launch) {
+ try {
+ getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
+ } catch (NullPointerException npe) {
+ if (running) {
+ log.error(sm.getString("endpoint.launch.fail"),
+ npe);
+ }
+ }
}
- } else if (state == SocketState.OPEN){
- socket.setKeptAlive(true);
- socket.access();
- //keepalive connection
- //TODO - servlet3 check async status, we may just be in a hold pattern
- launch = true;
- } else if (state == SocketState.LONG) {
- socket.access();
- waitingRequests.add(socket);
}
- } finally {
- socket.processing.set(false);
- if (launch) getExecutor().execute(new SocketProcessor(socket));
- socket = null;
}
+ socket = null;
// Finish up this request
-
}
}
*/
protected boolean setSocketOptions(Socket socket) {
serverSocketFactory.initSocket(socket);
-
+
try {
// 1: Set socket options: timeout, linger, etc
socketProperties.setProperties(socket);
return true;
}
-
+
/**
* Process a new connection from a new client. Wraps the socket so
* keep-alive and other attributes can be tracked and then passes the socket
}
}
}
- }catch (SocketTimeoutException sx) {
+ } catch (SocketTimeoutException sx) {
//normal condition
- }catch ( IOException x ) {
- if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
+ } catch (IOException x) {
+ if (running) {
+ log.error(sm.getString("endpoint.accept.fail"), x);
+ }
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
this.socket = channel;
this.poller = poller;
lastAccess = System.currentTimeMillis();
- currentAccess = false;
comet = false;
timeout = soTimeout;
error = false;
@Override
public void run() {
- SelectionKey key = null;
- try {
- key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- int handshake = -1;
-
+ boolean launch = false;
+ synchronized (socket) {
+ SelectionKey key = null;
try {
- if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
- }catch ( IOException x ) {
- handshake = -1;
- if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- SocketState state = SocketState.OPEN;
- // Process the request from this socket
- state = (status==null)?handler.process(socket):handler.event(socket,status);
-
- if (state == SocketState.CLOSED) {
- // Close socket and pool
+ key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ int handshake = -1;
+
+ try {
+ if (key!=null) handshake = socket.handshake(key.isReadable(), key.isWritable());
+ }catch ( IOException x ) {
+ handshake = -1;
+ if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
+ }
+ if ( handshake == 0 ) {
+ SocketState state = SocketState.OPEN;
+ // Process the request from this socket
+ state = (status==null)?handler.process(socket):handler.event(socket,status);
+
+ if (state == SocketState.CLOSED) {
+ // Close socket and pool
+ try {
+ KeyAttachment ka = null;
+ if (key!=null) {
+ ka = (KeyAttachment) key.attachment();
+ if (ka!=null) ka.setComet(false);
+ socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+ }
+ if (socket!=null) nioChannels.offer(socket);
+ socket = null;
+ if ( ka!=null ) keyCache.offer(ka);
+ ka = null;
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ } else if (state == SocketState.ASYNC_END) {
+ launch = true;
+ }
+ } else if (handshake == -1 ) {
+ KeyAttachment ka = null;
+ if (key!=null) {
+ ka = (KeyAttachment) key.attachment();
+ socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
+ }
+ if (socket!=null) nioChannels.offer(socket);
+ socket = null;
+ if ( ka!=null ) keyCache.offer(ka);
+ ka = null;
+ } else {
+ final SelectionKey fk = key;
+ final int intops = handshake;
+ final KeyAttachment ka = (KeyAttachment)fk.attachment();
+ ka.getPoller().add(socket,intops);
+ }
+ }catch(CancelledKeyException cx) {
+ socket.getPoller().cancelledKey(key,null,false);
+ } catch (OutOfMemoryError oom) {
+ try {
+ oomParachuteData = null;
+ socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
+ releaseCaches();
+ log.error("", oom);
+ }catch ( Throwable oomt ) {
+ try {
+ System.err.println(oomParachuteMsg);
+ oomt.printStackTrace();
+ }catch (Throwable letsHopeWeDontGetHere){}
+ }
+ }catch ( Throwable t ) {
+ log.error("",t);
+ socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
+ } finally {
+ if (launch) {
try {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- if (ka!=null) ka.setComet(false);
- socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
+ getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
+ } catch (NullPointerException npe) {
+ if (running) {
+ log.error(sm.getString("endpoint.launch.fail"),
+ npe);
}
- if (socket!=null) nioChannels.offer(socket);
- socket = null;
- if ( ka!=null ) keyCache.offer(ka);
- ka = null;
- }catch ( Exception x ) {
- log.error("",x);
}
- }
- } else if (handshake == -1 ) {
- KeyAttachment ka = null;
- if (key!=null) {
- ka = (KeyAttachment) key.attachment();
- socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
}
- if (socket!=null) nioChannels.offer(socket);
socket = null;
- if ( ka!=null ) keyCache.offer(ka);
- ka = null;
- } else {
- final SelectionKey fk = key;
- final int intops = handshake;
- final KeyAttachment ka = (KeyAttachment)fk.attachment();
- ka.getPoller().add(socket,intops);
+ status = null;
+ //return to cache
+ processorCache.offer(this);
}
- }catch(CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key,null,false);
- } catch (OutOfMemoryError oom) {
- try {
- oomParachuteData = null;
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
- releaseCaches();
- log.error("", oom);
- }catch ( Throwable oomt ) {
- try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){}
- }
- }catch ( Throwable t ) {
- log.error("",t);
- socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
- } finally {
- socket = null;
- status = null;
- //return to cache
- processorCache.offer(this);
}
}
-
}
// ----------------------------------------------- SendfileData Inner Class
*/
package org.apache.tomcat.util.net;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
public class SocketWrapper<E> {
protected volatile E socket;
protected volatile long lastAccess = -1;
- protected volatile boolean currentAccess = false;
protected long timeout = -1;
protected boolean error = false;
protected long lastRegistered = 0;
protected volatile int keepAliveLeft = 100;
protected boolean async = false;
protected boolean keptAlive = false;
- public AtomicBoolean processing = new AtomicBoolean(false);
public SocketWrapper(E socket) {
this.socket = socket;
public long getLastAccess() { return lastAccess; }
public void access() { access(System.currentTimeMillis()); }
public void access(long access) { lastAccess = access; }
- public boolean getCurrentAccess() { return currentAccess; }
- public void setCurrentAccess(boolean access) { currentAccess = access; }
public void setTimeout(long timeout) {this.timeout = timeout;}
public long getTimeout() {return this.timeout;}
public boolean getError() { return error; }
endpoint.init.listen=Socket listen failed: [{0}] {1}
endpoint.init.notavail=APR not available
endpoint.accept.fail=Socket accept failed
+endpoint.launch.fail=Failed to launch new runnable
endpoint.poll.limitedpollsize=Failed to create poller with specified size of {0}
endpoint.poll.initfail=Poller creation failed
endpoint.poll.fail=Critical poller failure (restarting poller): [{0}] {1}
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
+import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
result = new StringBuilder();
result.append('1');
result.append(req.isAsyncStarted());
- req.startAsync();
+ req.startAsync().setTimeout(10000);
result.append('2');
result.append(req.isAsyncStarted());
throws ServletException, IOException {
AsyncContext actxt = req.startAsync();
+ actxt.setTimeout(3000);
resp.setContentType("text/plain");
resp.getWriter().print("OK");
actxt.complete();
}
}
- public void testTimeout() throws Exception {
+ public void testTimeoutListenerComplete() throws Exception {
+ doTestTimeout(true);
+ }
+
+ public void testTimeoutListenerNoComplete() throws Exception {
+ doTestTimeout(false);
+ }
+
+ private void doTestTimeout(boolean completeOnTimeout) throws Exception {
// Setup Tomcat instance
Tomcat tomcat = getTomcatInstance();
Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
- TimeoutServlet timeout = new TimeoutServlet();
+ TimeoutServlet timeout = new TimeoutServlet(completeOnTimeout);
Wrapper wrapper = Tomcat.addServlet(ctx, "time", timeout);
wrapper.setAsyncSupported(true);
tomcat.start();
ByteChunk res = getUrl("http://localhost:" + getPort() + "/async");
- assertEquals("OK", res.toString());
+ StringBuilder expected = new StringBuilder();
+ expected.append("TimeoutServletGet-onTimeout-");
+ if (!completeOnTimeout) {
+ expected.append("onError-");
+ }
+ expected.append("onComplete-");
+ assertEquals(expected.toString(), res.toString());
}
private static class TimeoutServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
+ private boolean completeOnTimeout;
+
+ public TimeoutServlet(boolean completeOnTimeout) {
+ this.completeOnTimeout = completeOnTimeout;
+ }
+
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (req.isAsyncSupported()) {
- resp.getWriter().print("OK");
+ resp.getWriter().print("TimeoutServletGet-");
final AsyncContext ac = req.startAsync();
- ac.setTimeout(2000);
+ ac.setTimeout(3000);
- ac.addListener(new TimeoutListener());
+ ac.addListener(new TrackingListener(false, completeOnTimeout));
} else
resp.getWriter().print("FAIL: Async unsupported");
}
}
- private static class TimeoutListener implements AsyncListener {
+ public void testDispatchSingle() throws Exception {
+ doTestDispatch(1, false);
+ }
+
+ public void testDispatchDouble() throws Exception {
+ doTestDispatch(2, false);
+ }
+
+ public void testDispatchMultiple() throws Exception {
+ doTestDispatch(5, false);
+ }
+
+ public void testDispatchWithThreadSingle() throws Exception {
+ doTestDispatch(1, true);
+ }
+
+ public void testDispatchWithThreadDouble() throws Exception {
+ doTestDispatch(2, true);
+ }
+
+ public void testDispatchWithThreadMultiple() throws Exception {
+ doTestDispatch(5, true);
+ }
+
+ private void doTestDispatch(int iter, boolean useThread) throws Exception {
+ // Setup Tomcat instance
+ Tomcat tomcat = getTomcatInstance();
+
+ // Must have a real docBase - just use temp
+ File docBase = new File(System.getProperty("java.io.tmpdir"));
+
+ Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+ DispatchingServlet dispatch = new DispatchingServlet(false, false);
+ Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch);
+ wrapper.setAsyncSupported(true);
+ ctx.addServletMapping("/stage1", "dispatch");
+
+ NonAsyncServlet nonasync = new NonAsyncServlet();
+ Wrapper wrapper2 = Tomcat.addServlet(ctx, "nonasync", nonasync);
+ wrapper2.setAsyncSupported(true);
+ ctx.addServletMapping("/stage2", "nonasync");
+
+ tomcat.start();
+
+ StringBuilder url = new StringBuilder(48);
+ url.append("http://localhost:");
+ url.append(getPort());
+ url.append("/stage1?iter=");
+ url.append(iter);
+ if (useThread) {
+ url.append("&useThread=y");
+ }
+ ByteChunk res = getUrl(url.toString());
+
+ StringBuilder expected = new StringBuilder();
+ int loop = iter;
+ while (loop > 0) {
+ expected.append("DispatchingServletGet-");
+ loop--;
+ }
+ expected.append("NonAsyncServletGet-");
+ assertEquals(expected.toString(), res.toString());
+ }
+
+ private static class DispatchingServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+ private static final String ITER_PARAM = "iter";
+ private boolean addTrackingListener = false;
+ private boolean completeOnError = false;
+
+ public DispatchingServlet(boolean addTrackingListener,
+ boolean completeOnError) {
+ this.addTrackingListener = addTrackingListener;
+ this.completeOnError = completeOnError;
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ resp.getWriter().write("DispatchingServletGet-");
+ resp.flushBuffer();
+ final int iter = Integer.parseInt(req.getParameter(ITER_PARAM)) - 1;
+ final AsyncContext ctxt = req.startAsync();
+ if (addTrackingListener) {
+ TrackingListener listener =
+ new TrackingListener(completeOnError, true);
+ ctxt.addListener(listener);
+ }
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ if (iter > 0) {
+ ctxt.dispatch("/stage1?" + ITER_PARAM + "=" + iter);
+ } else {
+ ctxt.dispatch("/stage2");
+ }
+ }
+ };
+ if ("y".equals(req.getParameter("useThread"))) {
+ new Thread(run).start();
+ } else {
+ run.run();
+ }
+ }
+ }
+
+ private static class NonAsyncServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ resp.getWriter().write("NonAsyncServletGet-");
+ resp.flushBuffer();
+ }
+ }
+
+ public void testListeners() throws Exception {
+ // Setup Tomcat instance
+ Tomcat tomcat = getTomcatInstance();
+
+ // Must have a real docBase - just use temp
+ File docBase = new File(System.getProperty("java.io.tmpdir"));
+
+ Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+ TrackingServlet tracking = new TrackingServlet();
+ Wrapper wrapper = Tomcat.addServlet(ctx, "tracking", tracking);
+ wrapper.setAsyncSupported(true);
+ ctx.addServletMapping("/stage1", "tracking");
+
+ TimeoutServlet timeout = new TimeoutServlet(true);
+ Wrapper wrapper2 = Tomcat.addServlet(ctx, "timeout", timeout);
+ wrapper2.setAsyncSupported(true);
+ ctx.addServletMapping("/stage2", "timeout");
+
+ tomcat.start();
+
+ StringBuilder url = new StringBuilder(48);
+ url.append("http://localhost:");
+ url.append(getPort());
+ url.append("/stage1");
+
+ ByteChunk res = getUrl(url.toString());
+
+ assertEquals(
+ "DispatchingServletGet-DispatchingServletGet-onStartAsync-" +
+ "TimeoutServletGet-onStartAsync-onTimeout-onComplete-",
+ res.toString());
+ }
+
+ private static class TrackingServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+
+ private static volatile boolean first = true;
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ resp.getWriter().write("DispatchingServletGet-");
+ resp.flushBuffer();
+
+ final boolean first = TrackingServlet.first;
+ TrackingServlet.first = false;
+
+ final AsyncContext ctxt = req.startAsync();
+ TrackingListener listener = new TrackingListener(false, true);
+ ctxt.addListener(listener);
+ ctxt.setTimeout(3000);
+
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ if (first) {
+ ctxt.dispatch("/stage1");
+ } else {
+ ctxt.dispatch("/stage2");
+ }
+ }
+ };
+ if ("y".equals(req.getParameter("useThread"))) {
+ new Thread(run).start();
+ } else {
+ run.run();
+ }
+ }
+ }
+
+ private static class TrackingListener implements AsyncListener {
+
+ private boolean completeOnError;
+ private boolean completeOnTimeout;
+
+ public TrackingListener(boolean completeOnError,
+ boolean completeOnTimeout) {
+ this.completeOnError = completeOnError;
+ this.completeOnTimeout = completeOnTimeout;
+ }
@Override
public void onComplete(AsyncEvent event) throws IOException {
- // NO-OP
+ ServletResponse resp = event.getAsyncContext().getResponse();
+ resp.getWriter().write("onComplete-");
+ resp.flushBuffer();
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
- event.getAsyncContext().complete();
+ ServletResponse resp = event.getAsyncContext().getResponse();
+ resp.getWriter().write("onTimeout-");
+ resp.flushBuffer();
+ if (completeOnTimeout){
+ event.getAsyncContext().complete();
+ }
}
@Override
public void onError(AsyncEvent event) throws IOException {
- // NOOP
+ ServletResponse resp = event.getAsyncContext().getResponse();
+ resp.getWriter().write("onError-");
+ resp.flushBuffer();
+ if (completeOnError) {
+ event.getAsyncContext().complete();
+ }
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
- // NOOP
+ ServletResponse resp = event.getAsyncContext().getResponse();
+ resp.getWriter().write("onStartAsync-");
+ resp.flushBuffer();
+ }
+ }
+
+ public void testDispatchErrorSingle() throws Exception {
+ doTestDispatchError(1, false, false);
+ }
+
+ public void testDispatchErrorDouble() throws Exception {
+ doTestDispatchError(2, false, false);
+ }
+
+ public void testDispatchErrorMultiple() throws Exception {
+ doTestDispatchError(5, false, false);
+ }
+
+ public void testDispatchErrorWithThreadSingle() throws Exception {
+ doTestDispatchError(1, true, false);
+ }
+
+ public void testDispatchErrorWithThreadDouble() throws Exception {
+ doTestDispatchError(2, true, false);
+ }
+
+ public void testDispatchErrorWithThreadMultiple() throws Exception {
+ doTestDispatchError(5, true, false);
+ }
+
+ public void testDispatchErrorSingleThenComplete() throws Exception {
+ doTestDispatchError(1, false, true);
+ }
+
+ public void testDispatchErrorDoubleThenComplete() throws Exception {
+ doTestDispatchError(2, false, true);
+ }
+
+ public void testDispatchErrorMultipleThenComplete() throws Exception {
+ doTestDispatchError(5, false, true);
+ }
+
+ public void testDispatchErrorWithThreadSingleThenComplete()
+ throws Exception {
+ doTestDispatchError(1, true, true);
+ }
+
+ public void testDispatchErrorWithThreadDoubleThenComplete()
+ throws Exception {
+ doTestDispatchError(2, true, true);
+ }
+
+ public void testDispatchErrorWithThreadMultipleThenComplete()
+ throws Exception {
+ doTestDispatchError(5, true, true);
+ }
+
+ private void doTestDispatchError(int iter, boolean useThread,
+ boolean completeOnError)
+ throws Exception {
+ // Setup Tomcat instance
+ Tomcat tomcat = getTomcatInstance();
+
+ // Must have a real docBase - just use temp
+ File docBase = new File(System.getProperty("java.io.tmpdir"));
+
+ Context ctx = tomcat.addContext("", docBase.getAbsolutePath());
+
+ DispatchingServlet dispatch =
+ new DispatchingServlet(true, completeOnError);
+ Wrapper wrapper = Tomcat.addServlet(ctx, "dispatch", dispatch);
+ wrapper.setAsyncSupported(true);
+ ctx.addServletMapping("/stage1", "dispatch");
+
+ ErrorServlet error = new ErrorServlet();
+ Tomcat.addServlet(ctx, "error", error);
+ ctx.addServletMapping("/stage2", "error");
+
+ tomcat.start();
+
+ StringBuilder url = new StringBuilder(48);
+ url.append("http://localhost:");
+ url.append(getPort());
+ url.append("/stage1?iter=");
+ url.append(iter);
+ if (useThread) {
+ url.append("&useThread=y");
}
+ ByteChunk res = getUrl(url.toString());
+ StringBuilder expected = new StringBuilder();
+ int loop = iter;
+ while (loop > 0) {
+ expected.append("DispatchingServletGet-");
+ if (loop != iter) {
+ expected.append("onStartAsync-");
+ }
+ loop--;
+ }
+ expected.append("ErrorServletGet-onError-onComplete-");
+ assertEquals(expected.toString(), res.toString());
+ }
+
+ private static class ErrorServlet extends HttpServlet {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ resp.getWriter().write("ErrorServletGet-");
+ resp.flushBuffer();
+ throw new ServletException("Opps.");
+ }
}
}
<code>UnsupportedOperationException</code>. (markt)
</fix>
<fix>
+ <bug>49884</bug>: Fix occassional NullPointerException on async
+ complete(). This resulted in a major refactoring of the async
+ implementation to address a number of threading issues. (markt)
+ </fix>
+ <fix>
Update the version numbers in ServerInfo defaults to Tomcat 7.0.x.
(markt)
</fix>