import java.net.Socket;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.SocketWrapper;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
/**
* Socket associated with the current connection.
*/
- protected Socket socket;
+ protected SocketWrapper<Socket> socket;
/**
* Flush message array.
*/
protected static final byte[] flushMessageArray;
+
+ /**
+ * Async used
+ */
+ protected boolean async = false;
// ----------------------------------------------------- Static Initializer
*
* @throws IOException error during an I/O operation
*/
- public void process(Socket socket)
+ public SocketState process(SocketWrapper<Socket> socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the socket
this.socket = socket;
- input = socket.getInputStream();
- output = socket.getOutputStream();
+ input = socket.getSocket().getInputStream();
+ output = socket.getSocket().getOutputStream();
int soTimeout = -1;
if (keepAliveTimeout > 0) {
- soTimeout = socket.getSoTimeout();
+ soTimeout = socket.getSocket().getSoTimeout();
}
// Error flag
try {
// Set keep alive timeout if enabled
if (keepAliveTimeout > 0) {
- socket.setSoTimeout(keepAliveTimeout);
+ socket.getSocket().setSoTimeout(keepAliveTimeout);
}
// Get first message of the request
if (!readMessage(requestHeaderMessage)) {
}
// Set back timeout if keep alive timeout is enabled
if (keepAliveTimeout > 0) {
- socket.setSoTimeout(soTimeout);
+ socket.getSocket().setSoTimeout(soTimeout);
}
// Check message type, process right away and break if
// not regular request processing
error = true;
}
}
+
+ if (async && !error) {
+ break;
+ }
// Finish the response if not done yet
if (!finished) {
recycle();
}
+ if (async && !error) {
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ socket.setAsync(true);
+ return SocketState.LONG;
+ } else {
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ recycle();
+ input = null;
+ output = null;
+ return SocketState.CLOSED;
+ }
+
+ }
+
+ public SocketState asyncDispatch(SocketStatus status) throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.asyncDispatch(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- recycle();
- input = null;
- output = null;
+
+ if (async) {
+ if (error) {
+ socket.setAsync(false);
+ response.setStatus(500);
+ request.updateCounters();
+ recycle();
+ input = null;
+ output = null;
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.LONG;
+ }
+ } else {
+ socket.setAsync(false);
+ if (error) {
+ response.setStatus(500);
+ }
+ request.updateCounters();
+ recycle();
+ input = null;
+ output = null;
+ return SocketState.CLOSED;
+ }
+
+
+
}
-
// ----------------------------------------------------- ActionHook Methods
} else if (actionCode == ActionCode.ACTION_CLOSE) {
// Close
-
+ async = false;
// End the processing of the current request, and stop any further
// transactions with the client
empty = false;
replay = true;
+ } else if (actionCode == ActionCode.ACTION_ASYNC_START) {
+ //TODO SERVLET3 - async
+ async = true;
+ } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) {
+ //TODO SERVLET3 - async
+ AtomicBoolean dispatch = (AtomicBoolean)param;
+ RequestInfo rp = request.getRequestProcessor();
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
+ dispatch.set(true);
+ endpoint.processSocket(this.socket, SocketStatus.STOP);
+ } else {
+ dispatch.set(true);
+ }
+ } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) {
+ //TODO SERVLET3 - async
+ if (param==null) return;
+ long timeout = ((Long)param).longValue();
+ //if we are not piggy backing on a worker thread, set the timeout
+ socket.setTimeout(timeout);
+ } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) {
+ RequestInfo rp = request.getRequestProcessor();
+ AtomicBoolean dispatch = (AtomicBoolean)param;
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+ endpoint.processSocket(this.socket, SocketStatus.OPEN);
+ dispatch.set(true);
+ } else {
+ dispatch.set(true);
+ }
}
import java.net.URLEncoder;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
protected AjpProtocol proto;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
+ protected ConcurrentHashMap<SocketWrapper, AjpProcessor> connections =
+ new ConcurrentHashMap<SocketWrapper, AjpProcessor>();
protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpProcessor>() {
this.proto = proto;
}
- public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) {
- throw new UnsupportedOperationException();
+ public SocketState process(SocketWrapper<Socket> socket) {
+ return process(socket,SocketStatus.OPEN);
}
- public SocketState process(SocketWrapper<Socket> socket) {
- AjpProcessor processor = recycledProcessors.poll();
+ public SocketState process(SocketWrapper<Socket> socket, SocketStatus status) {
+ AjpProcessor processor = connections.remove(socket);
try {
-
+ if (processor == null) {
+ processor = recycledProcessors.poll();
+ }
if (processor == null) {
processor = createProcessor();
}
-
processor.action(ActionCode.ACTION_START, null);
- processor.process(socket.getSocket());
- return SocketState.CLOSED;
-
+ SocketState state = socket.isAsync()?processor.asyncDispatch(status):processor.process(socket);
+ if (state == SocketState.LONG) {
+ connections.put(socket, processor);
+ } else {
+ connections.remove(socket);
+ }
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
AjpProtocol.log.debug