import java.net.InetAddress;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.catalina.core.AsyncContextImpl;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.coyote.Adapter;
+import org.apache.coyote.AsyncStateMachine;
import org.apache.coyote.InputBuffer;
+import org.apache.coyote.Processor;
import org.apache.coyote.Request;
import org.apache.coyote.Response;
import org.apache.juli.logging.Log;
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
/**
* Base class for AJP Processor implementations.
*/
-public abstract class AbstractAjpProcessor implements ActionHook {
+public abstract class AbstractAjpProcessor implements ActionHook, Processor {
protected abstract Log getLog();
/**
- * Async used
- */
- protected boolean async = false;
-
-
- /**
* Associated adapter.
*/
protected Adapter adapter = null;
protected boolean finished = false;
+ /**
+ * Track changes in state for async requests.
+ */
+ protected AsyncStateMachine asyncStateMachine = new AsyncStateMachine(this);
+
+
// ------------------------------------------------------------- Properties
} else if (actionCode == ActionCode.CLOSE) {
// Close
- async = false;
// End the processing of the current request, and stop any further
// transactions with the client
empty = false;
replay = true;
+ } else if (actionCode == ActionCode.ASYNC_START) {
+ asyncStateMachine.asyncStart((AsyncContextImpl) param);
+ } else if (actionCode == ActionCode.ASYNC_DISPATCHED) {
+ asyncStateMachine.asyncDispatched();
+ } else if (actionCode == ActionCode.ASYNC_TIMEOUT) {
+ AtomicBoolean result = (AtomicBoolean) param;
+ result.set(asyncStateMachine.asyncTimeout());
+ } else if (actionCode == ActionCode.ASYNC_RUN) {
+ asyncStateMachine.asyncRun((Runnable) param);
+ } else if (actionCode == ActionCode.ASYNC_ERROR) {
+ asyncStateMachine.asyncError();
+ } else if (actionCode == ActionCode.ASYNC_IS_STARTED) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncStarted());
+ } else if (actionCode == ActionCode.ASYNC_IS_DISPATCHING) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncDispatching());
+ } else if (actionCode == ActionCode.ASYNC_IS_ASYNC) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsync());
+ } else if (actionCode == ActionCode.ASYNC_IS_TIMINGOUT) {
+ ((AtomicBoolean) param).set(asyncStateMachine.isAsyncTimingOut());
} else {
actionInternal(actionCode, param);
}
protected abstract void finish() throws IOException;
+ @Override
+ public abstract Executor getExecutor();
+
+
+ public void recycle() {
+ asyncStateMachine.recycle();
+
+ // Recycle Request object
+ first = true;
+ endOfStream = false;
+ empty = true;
+ replay = false;
+ finished = false;
+ request.recycle();
+ response.recycle();
+ certificates.recycle();
+ }
+
// ------------------------------------------------------ Connector Methods
throws IOException;
+ protected boolean isAsync() {
+ return asyncStateMachine.isAsync();
+ }
+
+ protected SocketState asyncPostProcess() {
+ return asyncStateMachine.asyncPostProcess();
+ }
+
// ------------------------------------- InputStreamInputBuffer Inner Class
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executor;
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
*
* @throws IOException error during an I/O operation
*/
- public boolean process(SocketWrapper<Long> socket)
+ public SocketState process(SocketWrapper<Long> socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Error flag
error = false;
- async = false;
boolean openSocket = true;
boolean keptAlive = false;
}
}
- if (async && !error) {
+ if (isAsync() && !error) {
break;
}
// Add the socket to the poller
if (!error && !endpoint.isPaused()) {
- ((AprEndpoint)endpoint).getPoller().add(socketRef);
+ if (!isAsync()) {
+ ((AprEndpoint)endpoint).getPoller().add(socketRef);
+ }
} else {
openSocket = false;
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (!async || error || endpoint.isPaused())
- recycle();
-
- return openSocket;
-
+
+ if (error || endpoint.isPaused()) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (isAsync()) {
+ return SocketState.LONG;
+ } else {
+ recycle();
+ return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
+ }
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (async) {
+ if (error) {
+ response.setStatus(500);
+ }
+ if (isAsync()) {
if (error) {
- response.setStatus(500);
request.updateCounters();
recycle();
return SocketState.CLOSED;
return SocketState.LONG;
}
} else {
- if (error) {
- response.setStatus(500);
- }
request.updateCounters();
recycle();
- return SocketState.CLOSED;
+ if (error) {
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.OPEN;
+ }
}
}
+
+ @Override
+ public Executor getExecutor() {
+ return endpoint.getExecutor();
+ }
+
+
// ----------------------------------------------------- ActionHook Methods
@Override
protected void actionInternal(ActionCode actionCode, Object param) {
- long socketRef = socket.getSocket().longValue();
-
- if (actionCode == ActionCode.ASYNC_START) {
- async = true;
- } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
- AtomicBoolean dispatch = (AtomicBoolean)param;
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
- dispatch.set(true);
- ((AprEndpoint)endpoint).getHandler().asyncDispatch(this.socket, SocketStatus.STOP);
- } else {
- dispatch.set(false);
- }
+ if (actionCode == ActionCode.ASYNC_COMPLETE) {
+ if (asyncStateMachine.asyncComplete()) {
+ ((AprEndpoint)endpoint).processSocketAsync(this.socket,
+ SocketStatus.OPEN);
+ }
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
if (param==null) return;
- if (socketRef==0) return;
long timeout = ((Long)param).longValue();
- Socket.timeoutSet(socketRef, timeout * 1000);
+ socket.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
- RequestInfo rp = request.getRequestProcessor();
- AtomicBoolean dispatch = (AtomicBoolean)param;
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
- ((AprEndpoint)endpoint).getPoller().add(socketRef);
- dispatch.set(true);
- } else {
- dispatch.set(true);
+ if (asyncStateMachine.asyncDispatch()) {
+ ((AprEndpoint)endpoint).processSocketAsync(this.socket,
+ SocketStatus.OPEN);
}
}
/**
* Recycle the processor.
*/
+ @Override
public void recycle() {
-
- // Recycle Request object
- first = true;
- endOfStream = false;
- empty = true;
- replay = false;
- finished = false;
- request.recycle();
- response.recycle();
- certificates.recycle();
+ super.recycle();
inputBuffer.clear();
inputBuffer.limit(0);
/**
* Write chunk.
*/
+ @Override
public int doWrite(ByteChunk chunk, Response res)
throws IOException {
/**
* Pass config info
*/
+ @Override
public void setAttribute(String name, Object value) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ajpprotocol.setattribute", name, value));
attributes.put(name, value);
}
+ @Override
public Object getAttribute(String key) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ajpprotocol.getattribute", key));
}
+ @Override
public Iterator<String> getAttributeNames() {
return attributes.keySet().iterator();
}
/**
* The adapter, used to call the connector
*/
+ @Override
public void setAdapter(Adapter adapter) {
this.adapter = adapter;
}
+ @Override
public Adapter getAdapter() {
return adapter;
}
/** Start the protocol
*/
+ @Override
public void init() throws Exception {
endpoint.setName(getName());
endpoint.setHandler(cHandler);
}
+ @Override
public void start() throws Exception {
if (this.domain != null ) {
try {
log.info(sm.getString("ajpprotocol.start", getName()));
}
+ @Override
public void pause() throws Exception {
try {
endpoint.pause();
log.info(sm.getString("ajpprotocol.pause", getName()));
}
+ @Override
public void resume() throws Exception {
try {
endpoint.resume();
log.info(sm.getString("ajpprotocol.resume", getName()));
}
+ @Override
public void stop() throws Exception {
try {
endpoint.stop();
log.info(sm.getString("ajpprotocol.stop", getName()));
}
+ @Override
public void destroy() throws Exception {
if (log.isInfoEnabled())
log.info(sm.getString("ajpprotocol.destroy", getName()));
public int getProcessorCache() { return this.processorCache; }
public void setProcessorCache(int processorCache) { this.processorCache = processorCache; }
+ @Override
public Executor getExecutor() { return endpoint.getExecutor(); }
public void setExecutor(Executor executor) { endpoint.setExecutor(executor); }
public void setKeepAliveTimeout(int timeout) { endpoint.setKeepAliveTimeout(timeout); }
public boolean getUseSendfile() { return endpoint.getUseSendfile(); }
- public void setUseSendfile(boolean useSendfile) { /* No sendfile for AJP */ }
+ public void setUseSendfile(@SuppressWarnings("unused") boolean useSendfile) {
+ /* No sendfile for AJP */
+ }
public int getPollTime() { return endpoint.getPollTime(); }
public void setPollTime(int pollTime) { endpoint.setPollTime(pollTime); }
protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpAprProcessor>() {
+ private static final long serialVersionUID = 1L;
protected AtomicInteger size = new AtomicInteger(0);
@Override
public boolean offer(AjpAprProcessor processor) {
}
// FIXME: Support for this could be added in AJP as well
+ @Override
public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
return SocketState.CLOSED;
}
+ @Override
public SocketState process(SocketWrapper<Long> socket) {
AjpAprProcessor processor = recycledProcessors.poll();
try {
-
if (processor == null) {
processor = createProcessor();
}
- if (processor.process(socket)) {
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Check if the post processing is going to change the state
+ state = processor.asyncPostProcess();
+ }
+ if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+ // Need to make socket available for next processing cycle
+ // but no need for the poller
connections.put(socket, processor);
- return SocketState.OPEN;
} else {
- // recycledProcessors.offer(processor);
- return SocketState.CLOSED;
+ if (state == SocketState.OPEN) {
+ connections.put(socket, processor);
+ }
+ recycledProcessors.offer(processor);
}
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
- AjpAprProtocol.log.debug
- (sm.getString
- ("ajpprotocol.proto.socketexception.debug"), e);
+ log.debug(sm.getString(
+ "ajpprotocol.proto.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
- AjpAprProtocol.log.debug
- (sm.getString
- ("ajpprotocol.proto.ioexception.debug"), e);
+ log.debug(sm.getString(
+ "ajpprotocol.proto.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
- AjpAprProtocol.log.error
- (sm.getString("ajpprotocol.proto.error"), e);
- } finally {
- recycledProcessors.offer(processor);
+ log.error(sm.getString("ajpprotocol.proto.error"), e);
}
+ recycledProcessors.offer(processor);
return SocketState.CLOSED;
}
// FIXME: Support for this could be added in AJP as well
+ @Override
public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
AjpAprProcessor result = connections.get(socket);
AjpAprProtocol.log.error
(sm.getString("ajpprotocol.proto.error"), e);
} finally {
- if (state != SocketState.LONG) {
+ if (state == SocketState.LONG && result.isAsync()) {
+ state = result.asyncPostProcess();
+ }
+ if (state != SocketState.LONG && state != SocketState.ASYNC_END) {
connections.remove(socket);
recycledProcessors.offer(result);
if (state == SocketState.OPEN) {
return domain;
}
+ @Override
public ObjectName preRegister(MBeanServer server,
ObjectName name) throws Exception {
oname=name;
return name;
}
+ @Override
public void postRegister(Boolean registrationDone) {
+ // NOOP
}
+ @Override
public void preDeregister() throws Exception {
+ // NOOP
}
+ @Override
public void postDeregister() {
+ // NOOP
}
-
-
}
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executor;
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
}
}
- if (async && !error) {
+ if (isAsync() && !error) {
break;
}
recycle();
}
- if (async && !error && !endpoint.isPaused()) {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (isAsync() && !error && !endpoint.isPaused()) {
return SocketState.LONG;
} else {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
recycle();
input = null;
output = null;
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (async) {
+ if (isAsync()) {
if (error) {
response.setStatus(500);
request.updateCounters();
}
+
+ @Override
+ public Executor getExecutor() {
+ return endpoint.getExecutor();
+ }
+
+
// ----------------------------------------------------- ActionHook Methods
@Override
protected void actionInternal(ActionCode actionCode, Object param) {
- if (actionCode == ActionCode.ASYNC_START) {
- //TODO SERVLET3 - async
- async = true;
- } else if (actionCode == ActionCode.ASYNC_COMPLETE) {
- //TODO SERVLET3 - async
- AtomicBoolean dispatch = (AtomicBoolean)param;
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
- dispatch.set(true);
- ((JIoEndpoint)endpoint).processSocketAsync(this.socket, SocketStatus.OPEN);
- } else {
- dispatch.set(false);
+ if (actionCode == ActionCode.ASYNC_COMPLETE) {
+ if (asyncStateMachine.asyncComplete()) {
+ ((JIoEndpoint)endpoint).processSocketAsync(this.socket,
+ SocketStatus.OPEN);
}
} else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
- //TODO SERVLET3 - async
- if (param==null) return;
+ if (param == null) return;
long timeout = ((Long)param).longValue();
- //if we are not piggy backing on a worker thread, set the timeout
+ // if we are not piggy backing on a worker thread, set the timeout
socket.setTimeout(timeout);
} else if (actionCode == ActionCode.ASYNC_DISPATCH) {
- RequestInfo rp = request.getRequestProcessor();
- AtomicBoolean dispatch = (AtomicBoolean)param;
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
- ((JIoEndpoint)endpoint).processSocketAsync(this.socket, SocketStatus.OPEN);
- dispatch.set(true);
- } else {
- dispatch.set(true);
+ if (asyncStateMachine.asyncDispatch()) {
+ ((JIoEndpoint)endpoint).processSocketAsync(this.socket,
+ SocketStatus.OPEN);
}
}
-
-
}
/**
- * Recycle the processor.
- */
- public void recycle() {
-
- // Recycle Request object
- first = true;
- endOfStream = false;
- empty = true;
- replay = false;
- finished = false;
- request.recycle();
- response.recycle();
- certificates.recycle();
- async = false;
-
- }
-
-
- /**
* Callback to write data from the buffer.
*/
@Override
/**
* Write chunk.
*/
+ @Override
public int doWrite(ByteChunk chunk, Response res)
throws IOException {
/**
* Pass config info
*/
+ @Override
public void setAttribute(String name, Object value) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ajpprotocol.setattribute", name, value));
attributes.put(name, value);
}
+ @Override
public Object getAttribute(String key) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ajpprotocol.getattribute", key));
}
+ @Override
public Iterator<String> getAttributeNames() {
return attributes.keySet().iterator();
}
/**
* The adapter, used to call the connector
*/
+ @Override
public void setAdapter(Adapter adapter) {
this.adapter = adapter;
}
+ @Override
public Adapter getAdapter() {
return adapter;
}
/** Start the protocol
*/
+ @Override
public void init() throws Exception {
endpoint.setName(getName());
endpoint.setHandler(cHandler);
}
+ @Override
public void start() throws Exception {
if (this.domain != null ) {
try {
log.info(sm.getString("ajpprotocol.start", getName()));
}
+ @Override
public void pause() throws Exception {
try {
endpoint.pause();
log.info(sm.getString("ajpprotocol.pause", getName()));
}
+ @Override
public void resume() throws Exception {
try {
endpoint.resume();
log.info(sm.getString("ajpprotocol.resume", getName()));
}
+ @Override
public void stop() throws Exception {
try {
endpoint.stop();
log.info(sm.getString("ajpprotocol.stop", getName()));
}
+ @Override
public void destroy() throws Exception {
if (log.isInfoEnabled())
log.info(sm.getString("ajpprotocol.destroy", getName()));
public int getProcessorCache() { return this.processorCache; }
public void setProcessorCache(int processorCache) { this.processorCache = processorCache; }
+ @Override
public Executor getExecutor() { return endpoint.getExecutor(); }
public void setExecutor(Executor executor) { endpoint.setExecutor(executor); }
protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpProcessor>() {
+ private static final long serialVersionUID = 1L;
protected AtomicInteger size = new AtomicInteger(0);
@Override
public boolean offer(AjpProcessor processor) {
this.proto = proto;
}
+ @Override
public SocketState process(SocketWrapper<Socket> socket) {
return process(socket,SocketStatus.OPEN);
}
+ @Override
public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) {
AjpProcessor processor = connections.remove(socket);
try {
if (state == SocketState.LONG) {
connections.put(socket, processor);
socket.setAsync(true);
+ // longPoll may change socket state (e.g. to trigger a
+ // complete or dispatch)
+ return processor.asyncPostProcess();
} else {
connections.remove(socket);
socket.setAsync(false);
+ recycledProcessors.offer(processor);
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
- AjpProtocol.log.debug
- (sm.getString
- ("ajpprotocol.proto.socketexception.debug"), e);
+ log.debug(sm.getString(
+ "ajpprotocol.proto.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
- AjpProtocol.log.debug
- (sm.getString
- ("ajpprotocol.proto.ioexception.debug"), e);
+ log.debug(sm.getString(
+ "ajpprotocol.proto.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
- AjpProtocol.log.error
- (sm.getString("ajpprotocol.proto.error"), e);
- } finally {
- recycledProcessors.offer(processor);
+ log.error(sm.getString("ajpprotocol.proto.error"), e);
}
+ recycledProcessors.offer(processor);
return SocketState.CLOSED;
}
return domain;
}
+ @Override
public ObjectName preRegister(MBeanServer server,
ObjectName name) throws Exception {
oname=name;
return name;
}
+ @Override
public void postRegister(Boolean registrationDone) {
+ // NOOP
}
+ @Override
public void preDeregister() throws Exception {
+ // NOOP
}
+ @Override
public void postDeregister() {
+ // NOOP
}
-
-
}