From 9ea4545770c9d61b066bfa0619931570bd2053c2 Mon Sep 17 00:00:00 2001 From: fhanik Date: Fri, 13 Apr 2007 02:28:01 +0000 Subject: [PATCH] Fix the handling of the Http11NioProcessors when the thread pool can be shrinking and growing. So we are not associating the processor with a thread local, instead going directly to a pool of them git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@528323 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/coyote/RequestInfo.java | 11 +++ .../apache/coyote/http11/Http11NioProcessor.java | 15 ++- .../apache/coyote/http11/Http11NioProtocol.java | 102 +++++++++++++++++---- java/org/apache/tomcat/util/net/NioEndpoint.java | 75 +++++++-------- webapps/docs/config/http.xml | 6 ++ 5 files changed, 144 insertions(+), 65 deletions(-) diff --git a/java/org/apache/coyote/RequestInfo.java b/java/org/apache/coyote/RequestInfo.java index 25956d9a4..6a84d772e 100644 --- a/java/org/apache/coyote/RequestInfo.java +++ b/java/org/apache/coyote/RequestInfo.java @@ -17,6 +17,8 @@ package org.apache.coyote; +import javax.management.ObjectName; + /** * Structure holding the Request and Response objects. It also holds statistical @@ -63,6 +65,7 @@ public class RequestInfo { Response res; int stage = Constants.STAGE_NEW; String workerThreadName; + ObjectName rpName; // -------------------- Information about the current request ----------- // This is usefull for long-running requests only @@ -217,7 +220,15 @@ public class RequestInfo { return workerThreadName; } + public ObjectName getRpName() { + return rpName; + } + public void setWorkerThreadName(String workerThreadName) { this.workerThreadName = workerThreadName; } + + public void setRpName(ObjectName rpName) { + this.rpName = rpName; + } } diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index 0fcdeb90a..8f091ece0 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -753,16 +753,14 @@ public class Http11NioProcessor implements ActionHook { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); error = !adapter.event(request, response, status); - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - if ( key != null ) { - NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); - if ( attach!=null ) { + if ( !error ) { + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if (attach != null) { attach.setComet(comet); - Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout"); - if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue()); + Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout"); + if (comettimeout != null) attach.setTimeout(comettimeout.longValue()); } } - } catch (InterruptedIOException e) { error = true; } catch (Throwable t) { @@ -1072,7 +1070,8 @@ public class Http11NioProcessor implements ActionHook { //if this is a comet connection //then execute the connection closure at the next selector loop request.getAttributes().remove("org.apache.tomcat.comet.timeout"); - attach.setError(true); + //attach.setTimeout(5000); //force a cleanup in 5 seconds + //attach.setError(true); //this has caused concurrency errors } } diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index ca4089c4d..d313adf5c 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -22,7 +22,9 @@ import java.net.URLEncoder; import java.util.Hashtable; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanRegistration; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -212,6 +214,7 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration private int timeout = 300000; // 5 minutes as in Apache HTTPD server private int maxSavePostSize = 4 * 1024; private int maxHttpHeaderSize = 8 * 1024; + protected int processorCache = 200; //max number of Http11NioProcessor objects cached private int socketCloseDelay=-1; private boolean disableUploadTimeout = true; private int socketBuffer = 9000; @@ -534,6 +537,10 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration setAttribute("timeout", "" + timeouts); } + public void setProcessorCache(int processorCache) { + this.processorCache = processorCache; + } + public void setOomParachute(int oomParachute) { ep.setOomParachute(oomParachute); setAttribute("oomParachute",oomParachute); @@ -580,12 +587,42 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration protected static int count = 0; protected RequestGroupInfo global = new RequestGroupInfo(); - protected ThreadLocal localProcessor = - new ThreadLocal(); protected ConcurrentHashMap connections = new ConcurrentHashMap(); - protected java.util.Stack recycledProcessors = - new java.util.Stack(); + protected ConcurrentLinkedQueue recycledProcessors = new ConcurrentLinkedQueue() { + protected AtomicInteger size = new AtomicInteger(0); + public boolean offer(Http11NioProcessor processor) { + boolean offer = proto.processorCache==-1?true:size.get() < proto.processorCache; + //avoid over growing our cache or add after we have stopped + boolean result = false; + if ( offer ) { + result = super.offer(processor); + if ( result ) { + size.incrementAndGet(); + } + } + if (!result) deregister(processor); + return result; + } + + public Http11NioProcessor poll() { + Http11NioProcessor result = super.poll(); + if ( result != null ) { + size.decrementAndGet(); + } + return result; + } + + public void clear() { + Http11NioProcessor next = poll(); + while ( next != null ) { + deregister(next); + next = poll(); + } + super.clear(); + size.set(0); + } + }; Http11ConnectionHandler(Http11NioProtocol proto) { this.proto = proto; @@ -600,6 +637,7 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration SocketState state = SocketState.CLOSED; if (result != null) { + if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error); // Call the appropriate event try { state = result.event(status); @@ -626,7 +664,9 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration } finally { if (state != SocketState.LONG) { connections.remove(socket); - recycledProcessors.push(result); + recycledProcessors.offer(result); + } else { + if (log.isDebugEnabled()) log.debug("Keeping processor["+result); } } } @@ -636,14 +676,8 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration public SocketState process(NioChannel socket) { Http11NioProcessor processor = null; try { - processor = (Http11NioProcessor) localProcessor.get(); if (processor == null) { - synchronized (recycledProcessors) { - if (!recycledProcessors.isEmpty()) { - processor = recycledProcessors.pop(); - localProcessor.set(processor); - } - } + processor = recycledProcessors.poll(); } if (processor == null) { processor = createProcessor(); @@ -669,9 +703,11 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration // Associate the connection with the processor. The next request // processed by this thread will use either a new or a recycled // processor. + if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet()); connections.put(socket, processor); - localProcessor.set(null); socket.getPoller().add(socket); + } else { + recycledProcessors.offer(processor); } return state; @@ -696,6 +732,7 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration Http11NioProtocol.log.error (sm.getString("http11protocol.proto.error"), e); } + recycledProcessors.offer(processor); return SocketState.CLOSED; } @@ -717,24 +754,51 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration processor.setSocketBuffer(proto.socketBuffer); processor.setMaxSavePostSize(proto.maxSavePostSize); processor.setServer(proto.server); - localProcessor.set(processor); + register(processor); + return processor; + } + AtomicInteger registerCount = new AtomicInteger(0); + public void register(Http11NioProcessor processor) { if (proto.getDomain() != null) { synchronized (this) { try { + registerCount.addAndGet(1); + if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get()); RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); ObjectName rpName = new ObjectName - (proto.getDomain() + ":type=RequestProcessor,worker=" - + proto.getName() + ",name=HttpRequest" + count++); + (proto.getDomain() + ":type=RequestProcessor,worker=" + + proto.getName() + ",name=HttpRequest" + count++); Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + rp.setRpName(rpName); } catch (Exception e) { log.warn("Error registering request"); } } } - return processor; } + + public void deregister(Http11NioProcessor processor) { + if (proto.getDomain() != null) { + synchronized (this) { + try { + registerCount.addAndGet(-1); + if (log.isDebugEnabled()) log.debug("Deregister ["+processor+"] count="+registerCount.get()); + RequestInfo rp = processor.getRequest().getRequestProcessor(); + rp.setGlobalProcessor(null); + ObjectName rpName = rp.getRpName(); + Registry.getRegistry(null, null).unregisterComponent(rpName); + rp.setRpName(null); + } catch (Exception e) { + log.warn("Error unregistering request", e); + } + } + } + } + } + + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(Http11NioProtocol.class); @@ -753,6 +817,10 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration return domain; } + public int getProcessorCache() { + return processorCache; + } + public int getOomParachute() { return ep.getOomParachute(); } diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index e053a0be5..a8664e63a 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -341,7 +341,7 @@ public class NioEndpoint { protected boolean useExecutor = true; public void setUseExecutor(boolean useexec) { useExecutor = useexec;} - public boolean getUseExecutor() { return useExecutor;} + public boolean getUseExecutor() { return useExecutor || (executor!=null);} /** * Maximum amount of worker threads. @@ -1250,28 +1250,25 @@ public class NioEndpoint { } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { + boolean cancel = false; if (key != null) { final KeyAttachment att = (KeyAttachment) key.attachment(); - //we are registering the key to start with, reset the fairness counter. - att.setFairness(0); - key.interestOps(interestOps); - att.interestOps(interestOps); - } - } - catch (CancelledKeyException ckx) { - try { - if (key != null && key.attachment() != null) { - KeyAttachment ka = (KeyAttachment) key.attachment(); - ka.setError(true); //set to collect this socket immediately - } - try { - socket.close(); + if ( att!=null ) { + //we are registering the key to start with, reset the fairness counter. + att.setFairness(0); + att.interestOps(interestOps); + key.interestOps(interestOps); + } else { + cancel = true; } - catch (Exception ignore) {} - if (socket.isOpen()) - socket.close(true); + } else { + cancel = true; } - catch (Exception ignore) {} + if ( cancel ) getPoller0().cancelledKey(key,SocketStatus.ERROR,false); + }catch (CancelledKeyException ckx) { + try { + getPoller0().cancelledKey(key,SocketStatus.DISCONNECT,true); + }catch (Exception ignore) {} } }//end if }//run @@ -1391,12 +1388,15 @@ public class NioEndpoint { try { if ( key == null ) return;//nothing to do KeyAttachment ka = (KeyAttachment) key.attachment(); - if (ka != null && ka.getComet()) { + if (ka != null && ka.getComet() && status != null) { //the comet event takes care of clean up - processSocket(ka.getChannel(), status, dispatch); + //processSocket(ka.getChannel(), status, dispatch); + ka.setComet(false);//to avoid a loop + processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key } if (key.isValid()) key.cancel(); if (key.channel().isOpen()) key.channel().close(); + try {ka.channel.close(true);}catch (Exception ignore){} key.attach(null); } catch (Throwable e) { if ( log.isDebugEnabled() ) log.error("",e); @@ -1521,8 +1521,7 @@ public class NioEndpoint { unreg(sk, attachment); boolean close = (!processSocket(channel)); if (close) { - channel.close(); - channel.getIOChannel().socket().close(); + cancelledKey(sk,SocketStatus.DISCONNECT,false); } attachment.setFairness(0); } else { @@ -1601,14 +1600,16 @@ public class NioEndpoint { nextExpiration = now + (long)socketProperties.getSoTimeout(); //timeout Set keys = selector.keys(); + int keycount = 0; for (Iterator iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); + keycount++; try { KeyAttachment ka = (KeyAttachment) key.attachment(); if ( ka == null ) { cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments } else if ( ka.getError() ) { - cancelledKey(key, SocketStatus.DISCONNECT); + cancelledKey(key, SocketStatus.ERROR); }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); @@ -1631,6 +1632,7 @@ public class NioEndpoint { cancelledKey(key, SocketStatus.ERROR); } }//for + if ( log.isDebugEnabled() ) log.debug("Poller processed "+keycount+" keys through timeout"); } } @@ -1873,10 +1875,7 @@ public class NioEndpoint { // Close socket and pool try { KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + getPoller0().cancelledKey(key,SocketStatus.ERROR,false); nioChannels.offer(socket); if ( att!=null ) keyCache.offer(att); }catch ( Exception x ) { @@ -1886,10 +1885,7 @@ public class NioEndpoint { // Close socket and pool try { KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + getPoller0().cancelledKey(key,SocketStatus.ERROR,false); nioChannels.offer(socket); if ( att!=null ) keyCache.offer(att); }catch ( Exception x ) { @@ -1898,7 +1894,6 @@ public class NioEndpoint { } } else if (handshake == -1 ) { socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT); - try {socket.close(true);}catch (IOException ignore){} nioChannels.offer(socket); } else { final SelectionKey fk = key; @@ -2080,13 +2075,14 @@ public class NioEndpoint { if (closed) { // Close socket and pool try { - KeyAttachment att = (KeyAttachment)socket.getAttachment(true); - try {socket.close();}catch (Exception ignore){} - if ( socket.isOpen() ) socket.close(true); - key.cancel(); - key.attach(null); + KeyAttachment ka = null; + if (key!=null) { + ka = (KeyAttachment) key.attachment(); + ka.setComet(false); + socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); + } nioChannels.offer(socket); - if ( att!=null ) keyCache.offer(att); + if ( ka!=null ) keyCache.offer(ka); }catch ( Exception x ) { log.error("",x); } @@ -2097,7 +2093,6 @@ public class NioEndpoint { ka = (KeyAttachment) key.attachment(); socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false); } - try {socket.close(true);}catch (IOException ignore){} nioChannels.offer(socket); if ( ka!=null ) keyCache.offer(ka); } else { diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml index f6e4b486f..6e2a715bc 100644 --- a/webapps/docs/config/http.xml +++ b/webapps/docs/config/http.xml @@ -450,6 +450,12 @@

(bool)Whether to allow comet servlets or not, Default value is true.

+ +

(int)The protocol handler caches Http11NioProcessor objects to speed up performance. + This setting dictates how many of these objects get cached. + -1 means unlimited, default is 200. Set this value somewhere close to your maxThreads value. +

+

(bool)Boolean value, whether to use direct ByteBuffers or java mapped ByteBuffers. Default is false
When you are using direct buffers, make sure you allocate the appropriate amount of memory for the -- 2.11.0