From: fhanik Date: Mon, 22 Mar 2010 17:44:24 +0000 (+0000) Subject: More async stuff, implement timeout handling X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=8b1af84d69b538959c77e9c3c0a7991cab0c2144;p=tomcat7.0 More async stuff, implement timeout handling git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@926219 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/ajp/AjpProtocol.java b/java/org/apache/coyote/ajp/AjpProtocol.java index b1a74e43c..a01cadb51 100644 --- a/java/org/apache/coyote/ajp/AjpProtocol.java +++ b/java/org/apache/coyote/ajp/AjpProtocol.java @@ -40,6 +40,7 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.JIoEndpoint; +import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.JIoEndpoint.Handler; @@ -362,6 +363,10 @@ public class AjpProtocol public AjpConnectionHandler(AjpProtocol proto) { this.proto = proto; } + + public SocketState process(SocketWrapper socket, SocketStatus status) { + throw new UnsupportedOperationException(); + } public SocketState process(SocketWrapper socket) { AjpProcessor processor = recycledProcessors.poll(); diff --git a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java index 6fa73f31c..a9a306842 100644 --- a/java/org/apache/coyote/http11/AbstractHttp11Protocol.java +++ b/java/org/apache/coyote/http11/AbstractHttp11Protocol.java @@ -28,6 +28,7 @@ import javax.management.ObjectName; import org.apache.coyote.Adapter; import org.apache.coyote.ProtocolHandler; +import org.apache.juli.logging.Log; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.SSLImplementation; @@ -39,7 +40,7 @@ public abstract class AbstractHttp11Protocol implements ProtocolHandler, MBeanRe */ protected static final StringManager sm = StringManager.getManager(Constants.Package); - private static final org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(AbstractHttp11Protocol.class); + protected abstract Log getLog(); protected ObjectName tpOname = null; protected ObjectName rgOname = null; @@ -63,8 +64,8 @@ public abstract class AbstractHttp11Protocol implements ProtocolHandler, MBeanRe * Pass config info */ public void setAttribute(String name, Object value) { - if (log.isTraceEnabled()) { - log.trace(sm.getString("http11protocol.setattribute", name, value)); + if (getLog().isTraceEnabled()) { + getLog().trace(sm.getString("http11protocol.setattribute", name, value)); } attributes.put(name, value); } @@ -119,27 +120,27 @@ public abstract class AbstractHttp11Protocol implements ProtocolHandler, MBeanRe try { endpoint.pause(); } catch (Exception ex) { - log.error(sm.getString("http11protocol.endpoint.pauseerror"), ex); + getLog().error(sm.getString("http11protocol.endpoint.pauseerror"), ex); throw ex; } - if(log.isInfoEnabled()) - log.info(sm.getString("http11protocol.pause", getName())); + if(getLog().isInfoEnabled()) + getLog().info(sm.getString("http11protocol.pause", getName())); } public void resume() throws Exception { try { endpoint.resume(); } catch (Exception ex) { - log.error(sm.getString("http11protocol.endpoint.resumeerror"), ex); + getLog().error(sm.getString("http11protocol.endpoint.resumeerror"), ex); throw ex; } - if(log.isInfoEnabled()) - log.info(sm.getString("http11protocol.resume", getName())); + if(getLog().isInfoEnabled()) + getLog().info(sm.getString("http11protocol.resume", getName())); } public void destroy() throws Exception { - if(log.isInfoEnabled()) - log.info(sm.getString("http11protocol.stop", getName())); + if(getLog().isInfoEnabled()) + getLog().info(sm.getString("http11protocol.stop", getName())); endpoint.destroy(); if( tpOname!=null ) Registry.getRegistry(null, null).unregisterComponent(tpOname); diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index 557842f29..c6374cf6a 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -28,6 +28,8 @@ import javax.management.ObjectName; import org.apache.coyote.ActionCode; import org.apache.coyote.RequestGroupInfo; import org.apache.coyote.RequestInfo; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioEndpoint; @@ -48,6 +50,10 @@ import org.apache.tomcat.util.net.jsse.JSSEImplementation; */ public class Http11NioProtocol extends AbstractHttp11Protocol { + private static final Log log = LogFactory.getLog(Http11NioProtocol.class); + + protected Log getLog() { return log; } + public Http11NioProtocol() { endpoint=new NioEndpoint(); cHandler = new Http11ConnectionHandler( this ); @@ -467,7 +473,5 @@ public class Http11NioProtocol extends AbstractHttp11Protocol { - private static final org.apache.juli.logging.Log log - = org.apache.juli.logging.LogFactory.getLog(Http11NioProtocol.class); } diff --git a/java/org/apache/coyote/http11/Http11Processor.java b/java/org/apache/coyote/http11/Http11Processor.java index 306ff0635..af76297d4 100644 --- a/java/org/apache/coyote/http11/Http11Processor.java +++ b/java/org/apache/coyote/http11/Http11Processor.java @@ -628,7 +628,7 @@ public class Http11Processor extends AbstractHttp11Processor implements ActionHo if (param==null) return; long timeout = ((Long)param).longValue(); //if we are not piggy backing on a worker thread, set the timeout - socket.setAsyncTimeout(timeout); + socket.setTimeout(timeout); } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) { RequestInfo rp = request.getRequestProcessor(); AtomicBoolean dispatch = (AtomicBoolean)param; diff --git a/java/org/apache/coyote/http11/Http11Protocol.java b/java/org/apache/coyote/http11/Http11Protocol.java index b4208518e..a82ccc2a2 100644 --- a/java/org/apache/coyote/http11/Http11Protocol.java +++ b/java/org/apache/coyote/http11/Http11Protocol.java @@ -29,6 +29,7 @@ import javax.management.ObjectName; import org.apache.coyote.ActionCode; import org.apache.coyote.RequestGroupInfo; import org.apache.coyote.RequestInfo; +import org.apache.juli.logging.Log; import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.JIoEndpoint; import org.apache.tomcat.util.net.NioChannel; @@ -53,6 +54,8 @@ public class Http11Protocol extends AbstractHttp11Protocol { private static final org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(Http11Protocol.class); + + public Log getLog() { return log; } // ------------------------------------------------------------ Constructor @@ -233,8 +236,11 @@ public class Http11Protocol extends AbstractHttp11Protocol { this.proto = proto; } - public SocketState process(SocketWrapper socket) { + return process(socket,SocketStatus.OPEN); + } + + public SocketState process(SocketWrapper socket, SocketStatus status) { Http11Processor processor = connections.remove(socket); try { if (processor == null) { @@ -252,7 +258,7 @@ public class Http11Protocol extends AbstractHttp11Protocol { processor.setSSLSupport(null); } - SocketState state = socket.isAsync()?processor.asyncDispatch(SocketStatus.OPEN):processor.process(socket); + SocketState state = socket.isAsync()?processor.asyncDispatch(status):processor.process(socket); if (state == SocketState.LONG) { connections.put(socket, processor); } else { diff --git a/java/org/apache/tomcat/util/net/JIoEndpoint.java b/java/org/apache/tomcat/util/net/JIoEndpoint.java index 363d6b5e9..ff7e86404 100644 --- a/java/org/apache/tomcat/util/net/JIoEndpoint.java +++ b/java/org/apache/tomcat/util/net/JIoEndpoint.java @@ -22,6 +22,7 @@ import java.net.BindException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; @@ -117,12 +118,51 @@ public class JIoEndpoint extends AbstractEndpoint { */ public interface Handler { public SocketState process(SocketWrapper socket); + public SocketState process(SocketWrapper socket, SocketStatus status); } // --------------------------------------------------- Acceptor Inner Class + /** + * Async timeout thread + */ + protected class AsyncTimeout implements Runnable { + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + // Loop until we receive a shutdown command + while (running) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + long now = System.currentTimeMillis(); + Iterator sockets = waitingRequests.iterator(); + while (sockets.hasNext()) { + SocketWrapper socket = sockets.next(); + long access = socket.getLastAccess(); + if ((now-access)>socket.getTimeout()) { + processSocket(socket,SocketStatus.TIMEOUT); + } + } + + // Loop if endpoint is paused + while (paused) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + } + } + } /** * Server socket acceptor thread. */ @@ -185,11 +225,17 @@ public class JIoEndpoint extends AbstractEndpoint { protected class SocketProcessor implements Runnable { protected SocketWrapper socket = null; + protected SocketStatus status = null; public SocketProcessor(SocketWrapper socket) { this.socket = socket; } + public SocketProcessor(SocketWrapper socket, SocketStatus status) { + this.socket = socket; + this.status = status; + } + public void run() { SocketState state = SocketState.OPEN; // Process the request from this socket @@ -199,7 +245,7 @@ public class JIoEndpoint extends AbstractEndpoint { socket.setInitialized(true); if ( (state != SocketState.CLOSED) ) { - state = handler.process(socket); + state = (status==null)?handler.process(socket):handler.process(socket,status); } if (state == SocketState.CLOSED) { // Close socket @@ -442,10 +488,17 @@ public class JIoEndpoint extends AbstractEndpoint { public boolean processSocket(SocketWrapper socket, SocketStatus status) { try { - if (status == SocketStatus.OPEN || status == SocketStatus.STOP) { + if (status == SocketStatus.OPEN || status == SocketStatus.STOP || status == SocketStatus.TIMEOUT) { if (waitingRequests.remove(socket)) { - SocketProcessor proc = new SocketProcessor(socket); - getExecutor().execute(proc); + SocketProcessor proc = new SocketProcessor(socket,status); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + try { + //threads should not be created by the webapp classloader + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + getExecutor().execute(proc); + }finally { + Thread.currentThread().setContextClassLoader(loader); + } } } } catch (Throwable t) { diff --git a/java/org/apache/tomcat/util/net/SocketWrapper.java b/java/org/apache/tomcat/util/net/SocketWrapper.java index 07c7d4963..de8fe2dc7 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapper.java +++ b/java/org/apache/tomcat/util/net/SocketWrapper.java @@ -30,7 +30,6 @@ public class SocketWrapper { protected boolean async = false; protected boolean keptAlive = false; protected boolean initialized = false; - protected long asyncTimeout = 0; public SocketWrapper(E socket) { reset(socket); @@ -62,7 +61,5 @@ public class SocketWrapper { public void setKeptAlive(boolean keptAlive) {this.keptAlive = keptAlive;} public boolean isInitialized() {return initialized;} public void setInitialized(boolean initialized) {this.initialized = initialized;} - public long getAsyncTimeout() {return asyncTimeout;} - public void setAsyncTimeout(long asyncTimeout) {this.asyncTimeout = asyncTimeout;} }