import javax.servlet.AsyncContext;
import javax.servlet.AsyncListener;
+import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.coyote.ActionCode;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
private ServletResponse servletResponse = null;
private List<AsyncListenerWrapper> listeners = new ArrayList<AsyncListenerWrapper>();
private boolean hasOriginalRequestAndResponse = true;
- private boolean completed = false;
+ private boolean completed = true;
+ private volatile Runnable dispatch = null;
private Request request;
public void complete() {
// TODO SERVLET3 - async
- for (AsyncListenerWrapper wrapper : listeners) {
- try {
- wrapper.fireOnComplete();
- }catch (IOException x) {
- //how does this propagate, or should it?
- //TODO SERVLET3 - async
- log.error("",x);
- }
- }
- this.completed = true;
+ doInternalComplete(false);
}
@Override
public void dispatch(String path) {
// TODO SERVLET3 - async
-
+ if (request.getAttribute(ASYNC_REQUEST_URI)==null) {
+ request.setAttribute(ASYNC_REQUEST_URI, request.getRequestURI());
+ request.setAttribute(ASYNC_CONTEXT_PATH, request.getContextPath());
+ request.setAttribute(ASYNC_SERVLET_PATH, request.getServletPath());
+ request.setAttribute(ASYNC_QUERY_STRING, request.getQueryString());
+ }
+ final RequestDispatcher requestDispatcher = request.getServletContext().getRequestDispatcher(path);
+ final HttpServletRequest servletRequest = (HttpServletRequest)getRequest();
+ final HttpServletResponse servletResponse = (HttpServletResponse)getResponse();
+ Runnable run = new Runnable() {
+ public void run() {
+ try {
+ requestDispatcher.include(servletRequest, servletResponse);
+ }catch (Exception x) {
+ //log.error("Async.dispatch",x);
+ throw new RuntimeException(x);
+ }
+ }
+ };
+ this.dispatch = run;
+ request.coyoteRequest.action(ActionCode.ACTION_ASYNC_DISPATCH, run );
}
@Override
servletResponse = null;
listeners.clear();
hasOriginalRequestAndResponse = true;
- completed = false;
+ completed = true;
}
public boolean isStarted() {
this.completed = completed;
}
+ public void doInternalDispatch() throws ServletException, IOException {
+ if (this.dispatch!=null) {
+ try {
+ dispatch.run();
+ } catch (RuntimeException x) {
+ doInternalComplete(true);
+ if (x.getCause() instanceof ServletException) throw (ServletException)x.getCause();
+ if (x.getCause() instanceof IOException) throw (IOException)x.getCause();
+ else throw new ServletException(x);
+ } finally {
+ dispatch = null;
+ }
+ }
+ }
+ public void doInternalComplete(boolean error) {
+ if (isCompleted()) return;
+ for (AsyncListenerWrapper wrapper : listeners) {
+ try {
+ wrapper.fireOnComplete();
+ }catch (IOException x) {
+ //how does this propagate, or should it?
+ //TODO SERVLET3 - async
+ log.error("",x);
+ }
+ }
+ try {
+ if (!error) getResponse().flushBuffer();
+
+ }catch (Exception x) {
+ log.error("",x);
+ }
+ request.coyoteRequest.action(ActionCode.ACTION_ASYNC_COMPLETE,null);
+ recycle();
+ }
}
}
}
+ public boolean asyncDispatch(org.apache.coyote.Request req,org.apache.coyote.Response res) throws Exception {
+ Request request = (Request) req.getNote(ADAPTER_NOTES);
+ Response response = (Response) res.getNote(ADAPTER_NOTES);
+
+ if (request == null) {
+ throw new IllegalStateException("Dispatch may only happen on an existing request.");
+ }
+ boolean comet = false;
+ boolean async = false;
+ boolean success = true;
+
+ try {
+ // Calling the container
+ try {
+ connector.getContainer().getPipeline().getFirst().invoke(request, response);
+ }catch (RuntimeException x) {
+ success = false;
+ }
+
+ if (request.isComet()) {
+ if (!response.isClosed() && !response.isError()) {
+ if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
+ // Invoke a read event right away if there are available bytes
+ if (event(req, res, SocketStatus.OPEN)) {
+ comet = true;
+ res.action(ActionCode.ACTION_COMET_BEGIN, null);
+ }
+ } else {
+ comet = true;
+ res.action(ActionCode.ACTION_COMET_BEGIN, null);
+ }
+ } else {
+ // Clear the filter chain, as otherwise it will not be reset elsewhere
+ // since this is a Comet request
+ request.setFilterChain(null);
+ }
+ }
+
+ if (request.isAsyncStarted()) {
+ //TODO SERVLET3 - async
+ res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext());
+ async = true;
+ } else if (!comet) {
+ response.finishResponse();
+ req.action(ActionCode.ACTION_POST_REQUEST , null);
+ }
+ } catch (IOException e) {
+ success = false;
+ // Ignore
+ } catch (Throwable t) {
+ success = false;
+ log.error(sm.getString("coyoteAdapter.service"), t);
+ } finally {
+ req.getRequestProcessor().setWorkerThreadName(null);
+ // Recycle the wrapper request and response
+ if (!comet && !async) {
+ request.recycle();
+ response.recycle();
+ } else {
+ // Clear converters so that the minimum amount of memory
+ // is used by this processor
+ request.clearEncoders();
+ response.clearEncoders();
+ }
+ }
+ return success;
+ }
+
/**
* Service method.
*/
}
boolean comet = false;
+ boolean async = request.isAsyncStarted();
try {
if (request.isAsyncStarted()) {
//TODO SERVLET3 - async
res.action(ActionCode.ACTION_ASYNC_START, request.getAsyncContext());
+ async = true;
} else if (!comet) {
response.finishResponse();
req.action(ActionCode.ACTION_POST_REQUEST , null);
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
- if (!comet) {
+ if (!comet && !async) {
request.recycle();
response.recycle();
} else {
} else {
serverName = req.serverName();
}
+ if (request.isAsyncStarted()) {
+ //TODO SERVLET3 - async
+ //reset mapping data, should prolly be done elsewhere
+ request.getMappingData().recycle();
+ }
connector.getMapper().map(serverName, decodedURI,
request.getMappingData());
request.setContext((Context) request.getMappingData().context);
asyncContext.setServletRequest(getRequest());
asyncContext.setServletResponse(response.getResponse());
asyncContext.setStarted(true);
+ asyncContext.setCompleted(false);
return asyncContext;
}
startAsync();
asyncContext.setServletRequest(request);
asyncContext.setServletResponse(response);
- asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse());
+ asyncContext.setHasOriginalRequestAndResponse(request==getRequest() && response==getResponse().getResponse());
return asyncContext;
}
import org.apache.catalina.CometProcessor;
import org.apache.catalina.Context;
import org.apache.catalina.Globals;
+import org.apache.catalina.connector.AsyncContextImpl;
import org.apache.catalina.connector.ClientAbortException;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
if (context.getSwallowOutput()) {
try {
SystemLogHandler.startCapture();
- if (comet) {
+ if (request.isAsyncStarted()) {
+ ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch();
+ } else if (comet) {
filterChain.doFilterEvent(request.getEvent());
request.setComet(true);
} else {
}
}
} else {
- if (comet) {
+ if (request.isAsyncStarted()) {
+ ((AsyncContextImpl)request.getAsyncContext()).doInternalDispatch();
+ } else if (comet) {
request.setComet(true);
filterChain.doFilterEvent(request.getEvent());
} else {
*/
public static final ActionCode ACTION_ASYNC_SETTIMEOUT = new ActionCode(28);
+ /**
+ * Callback for an async call to {@link javax.servlet.AsyncContext#dispatch()}
+ */
+ public static final ActionCode ACTION_ASYNC_DISPATCH = new ActionCode(29);
+
// ----------------------------------------------------------- Constructors
int code;
public boolean event(Request req, Response res, SocketStatus status)
throws Exception;
+
+ public boolean asyncDispatch(Request req,Response res) throws Exception;
}
* Closed by HttpServletResponse.getWriter().close()
*/
protected boolean cometClose = false;
+
+ /**
+ * Async used
+ */
+ protected boolean async = false;
+ /**
+ * Closed flag, a Comet async thread can
+ * signal for this Nio processor to be closed and recycled instead
+ * of waiting for a timeout.
+ * Closed by HttpServletRequest.getAsyncContext().complete()
+ */
+ protected boolean asyncClose;
/**
* Content delimitator for the request (if false, the connection will
* Allow a customized the server header for the tin-foil hat folks.
*/
protected String server = null;
-
-
+
// ------------------------------------------------------------- Properties
return SocketState.LONG;
}
}
+
+
+ /**
+ * Process pipelined HTTP requests using the specified input and output
+ * streams.
+ *
+ * @throws IOException error during an I/O operation
+ */
+ public SocketState asyncDispatch(SocketStatus status)
+ throws IOException {
+
+ long soTimeout = endpoint.getSoTimeout();
+ int keepAliveTimeout = endpoint.getKeepAliveTimeout();
+
+ RequestInfo rp = request.getRequestProcessor();
+ final NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.asyncDispatch(request, response);
+ if ( !error ) {
+ if (attach != null) {
+ attach.setComet(comet);
+ if (comet) {
+ Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
+ if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
+ } else {
+ //reset the timeout
+ if (keepAlive && keepAliveTimeout>0) {
+ attach.setTimeout(keepAliveTimeout);
+ } else {
+ attach.setTimeout(soTimeout);
+ }
+ }
+
+ }
+ }
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (!comet) {
+ recycle();
+ return (keepAlive)?SocketState.OPEN:SocketState.CLOSED;
+ } else {
+ return SocketState.LONG;
+ }
+ }
/**
* Process pipelined HTTP requests using the specified input and output
}
// Finish the handling of the request
- if (!comet) {
+ if (!comet && !async) {
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
}
request.updateCounters();
- if (!comet) {
+ if (!comet && !async) {
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}//while
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (comet) {
+ if (comet || async) {
if (error) {
recycle();
return SocketState.CLOSED;
comet = false;
cometClose = true;
+ async = false;
+ asyncClose = false;
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
attach.setTimeout(timeout);
} else if (actionCode == ActionCode.ACTION_ASYNC_START) {
//TODO SERVLET3 - async
+ async = true;
} else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) {
//TODO SERVLET3 - async
+ asyncClose = true;
+ RequestInfo rp = request.getRequestProcessor();
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
+ socket.getPoller().cometInterest(socket);
} else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) {
//TODO SERVLET3 - async
+ if (param==null) return;
+ if (socket==null || socket.getAttachment(false)==null) return;
+ NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ long timeout = ((Long)param).longValue();
+ //if we are not piggy backing on a worker thread, set the timeout
+ RequestInfo rp = request.getRequestProcessor();
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) //async handling
+ attach.setTimeout(timeout);
+ } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) {
+ RequestInfo rp = request.getRequestProcessor();
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+ endpoint.processSocket(this.socket, SocketStatus.OPEN, true);
+ } else {
+ throw new UnsupportedOperationException("Can't call dispatch on the worker thread.");
+ }
}
}
public SocketState event(NioChannel socket, SocketStatus status) {
Http11NioProcessor result = connections.get(socket);
-
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ att.setAsync(false); //no longer check for timeout
SocketState state = SocketState.CLOSED;
if (result != null) {
if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error);
// Call the appropriate event
try {
- state = result.event(status);
+ if (result.async) {
+ state = result.asyncDispatch(status);
+ } else {
+ state = result.event(status);
+ }
} catch (java.net.SocketException e) {
// SocketExceptions are normal
Http11NioProtocol.log.debug
} else {
if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
//add correct poller events here based on Comet stuff
- NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
socket.getPoller().add(socket,att.getCometOps());
}
}
// processor.
//if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
connections.put(socket, processor);
+
if (processor.comet) {
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
socket.getPoller().add(socket,att.getCometOps());
+ } else if (processor.async) {
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ att.setAsync(true);
} else {
//we should not hold on to the processor objects
release(socket);
return false;
}
- protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
+ public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
attachment.setCometNotify(false); //will get reset upon next reg
cometOps = SelectionKey.OP_READ;
sendfileData = null;
keepAliveLeft = 100;
+ async = false;
}
public void reset() {
reset(null,null,-1);
}
+ public boolean isAsync() { return async; }
+ public void setAsync(boolean async) { this.async = async; }
public Poller getPoller() { return poller;}
public void setPoller(Poller poller){this.poller = poller;}
public long getLastAccess() { return lastAccess; }
protected long lastRegistered = 0;
protected SendfileData sendfileData = null;
protected int keepAliveLeft = 100;
+ protected boolean async = false;
}
// ------------------------------------------------ Application Buffer Handler
}
- // ---------------------------------------------- SocketProcessor Inner Class
-
+ // ---------------------------------------------- SocketProcessor Inner Class
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.