From db94fac5a42e7802b3c130ef1e3daccb758f7a40 Mon Sep 17 00:00:00 2001 From: fhanik Date: Thu, 25 Mar 2010 16:41:05 +0000 Subject: [PATCH] Enable async behavior for the AJP connector git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@927490 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/coyote/ajp/AjpProcessor.java | 116 ++++++++++++++++++++++++--- java/org/apache/coyote/ajp/AjpProtocol.java | 26 +++--- 2 files changed, 121 insertions(+), 21 deletions(-) diff --git a/java/org/apache/coyote/ajp/AjpProcessor.java b/java/org/apache/coyote/ajp/AjpProcessor.java index 48fb25c7f..fbe570253 100644 --- a/java/org/apache/coyote/ajp/AjpProcessor.java +++ b/java/org/apache/coyote/ajp/AjpProcessor.java @@ -26,6 +26,7 @@ import java.net.InetAddress; import java.net.Socket; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.coyote.ActionCode; import org.apache.coyote.ActionHook; @@ -44,6 +45,9 @@ import org.apache.tomcat.util.http.HttpMessages; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.JIoEndpoint; +import org.apache.tomcat.util.net.SocketStatus; +import org.apache.tomcat.util.net.SocketWrapper; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; @@ -181,7 +185,7 @@ public class AjpProcessor implements ActionHook { /** * Socket associated with the current connection. */ - protected Socket socket; + protected SocketWrapper socket; /** @@ -271,6 +275,11 @@ public class AjpProcessor implements ActionHook { * Flush message array. */ protected static final byte[] flushMessageArray; + + /** + * Async used + */ + protected boolean async = false; // ----------------------------------------------------- Static Initializer @@ -357,18 +366,18 @@ public class AjpProcessor implements ActionHook { * * @throws IOException error during an I/O operation */ - public void process(Socket socket) + public SocketState process(SocketWrapper socket) throws IOException { RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the socket this.socket = socket; - input = socket.getInputStream(); - output = socket.getOutputStream(); + input = socket.getSocket().getInputStream(); + output = socket.getSocket().getOutputStream(); int soTimeout = -1; if (keepAliveTimeout > 0) { - soTimeout = socket.getSoTimeout(); + soTimeout = socket.getSocket().getSoTimeout(); } // Error flag @@ -380,7 +389,7 @@ public class AjpProcessor implements ActionHook { try { // Set keep alive timeout if enabled if (keepAliveTimeout > 0) { - socket.setSoTimeout(keepAliveTimeout); + socket.getSocket().setSoTimeout(keepAliveTimeout); } // Get first message of the request if (!readMessage(requestHeaderMessage)) { @@ -390,7 +399,7 @@ public class AjpProcessor implements ActionHook { } // Set back timeout if keep alive timeout is enabled if (keepAliveTimeout > 0) { - socket.setSoTimeout(soTimeout); + socket.getSocket().setSoTimeout(soTimeout); } // Check message type, process right away and break if // not regular request processing @@ -446,6 +455,10 @@ public class AjpProcessor implements ActionHook { error = true; } } + + if (async && !error) { + break; + } // Finish the response if not done yet if (!finished) { @@ -467,15 +480,66 @@ public class AjpProcessor implements ActionHook { recycle(); } + if (async && !error) { + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + socket.setAsync(true); + return SocketState.LONG; + } else { + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } + + } + + public SocketState asyncDispatch(SocketStatus status) throws IOException { + + RequestInfo rp = request.getRequestProcessor(); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.asyncDispatch(request, response, status); + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); - recycle(); - input = null; - output = null; + + if (async) { + if (error) { + socket.setAsync(false); + response.setStatus(500); + request.updateCounters(); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } else { + socket.setAsync(false); + if (error) { + response.setStatus(500); + } + request.updateCounters(); + recycle(); + input = null; + output = null; + return SocketState.CLOSED; + } + + + } - // ----------------------------------------------------- ActionHook Methods @@ -522,7 +586,7 @@ public class AjpProcessor implements ActionHook { } else if (actionCode == ActionCode.ACTION_CLOSE) { // Close - + async = false; // End the processing of the current request, and stop any further // transactions with the client @@ -602,6 +666,34 @@ public class AjpProcessor implements ActionHook { empty = false; replay = true; + } else if (actionCode == ActionCode.ACTION_ASYNC_START) { + //TODO SERVLET3 - async + async = true; + } else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) { + //TODO SERVLET3 - async + AtomicBoolean dispatch = (AtomicBoolean)param; + RequestInfo rp = request.getRequestProcessor(); + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling + dispatch.set(true); + endpoint.processSocket(this.socket, SocketStatus.STOP); + } else { + dispatch.set(true); + } + } else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) { + //TODO SERVLET3 - async + if (param==null) return; + long timeout = ((Long)param).longValue(); + //if we are not piggy backing on a worker thread, set the timeout + socket.setTimeout(timeout); + } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) { + RequestInfo rp = request.getRequestProcessor(); + AtomicBoolean dispatch = (AtomicBoolean)param; + if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling + endpoint.processSocket(this.socket, SocketStatus.OPEN); + dispatch.set(true); + } else { + dispatch.set(true); + } } diff --git a/java/org/apache/coyote/ajp/AjpProtocol.java b/java/org/apache/coyote/ajp/AjpProtocol.java index a01cadb51..ff6492957 100644 --- a/java/org/apache/coyote/ajp/AjpProtocol.java +++ b/java/org/apache/coyote/ajp/AjpProtocol.java @@ -22,6 +22,7 @@ import java.net.Socket; import java.net.URLEncoder; import java.util.Hashtable; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -320,6 +321,8 @@ public class AjpProtocol protected AjpProtocol proto; protected AtomicLong registerCount = new AtomicLong(0); protected RequestGroupInfo global = new RequestGroupInfo(); + protected ConcurrentHashMap connections = + new ConcurrentHashMap(); protected ConcurrentLinkedQueue recycledProcessors = new ConcurrentLinkedQueue() { @@ -364,23 +367,28 @@ public class AjpProtocol this.proto = proto; } - public SocketState process(SocketWrapper socket, SocketStatus status) { - throw new UnsupportedOperationException(); + public SocketState process(SocketWrapper socket) { + return process(socket,SocketStatus.OPEN); } - public SocketState process(SocketWrapper socket) { - AjpProcessor processor = recycledProcessors.poll(); + public SocketState process(SocketWrapper socket, SocketStatus status) { + AjpProcessor processor = connections.remove(socket); try { - + if (processor == null) { + processor = recycledProcessors.poll(); + } if (processor == null) { processor = createProcessor(); } - processor.action(ActionCode.ACTION_START, null); - processor.process(socket.getSocket()); - return SocketState.CLOSED; - + SocketState state = socket.isAsync()?processor.asyncDispatch(status):processor.process(socket); + if (state == SocketState.LONG) { + connections.put(socket, processor); + } else { + connections.remove(socket); + } + return state; } catch(java.net.SocketException e) { // SocketExceptions are normal AjpProtocol.log.debug -- 2.11.0