package org.apache.coyote;
+import javax.management.ObjectName;
+
/**
* Structure holding the Request and Response objects. It also holds statistical
Response res;
int stage = Constants.STAGE_NEW;
String workerThreadName;
+ ObjectName rpName;
// -------------------- Information about the current request -----------
// This is usefull for long-running requests only
return workerThreadName;
}
+ public ObjectName getRpName() {
+ return rpName;
+ }
+
public void setWorkerThreadName(String workerThreadName) {
this.workerThreadName = workerThreadName;
}
+
+ public void setRpName(ObjectName rpName) {
+ this.rpName = rpName;
+ }
}
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
error = !adapter.event(request, response, status);
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- if ( key != null ) {
- NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
- if ( attach!=null ) {
+ if ( !error ) {
+ NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ if (attach != null) {
attach.setComet(comet);
- Integer comettimeout = (Integer)request.getAttribute("org.apache.tomcat.comet.timeout");
- if ( comettimeout != null ) attach.setTimeout(comettimeout.longValue());
+ Integer comettimeout = (Integer) request.getAttribute("org.apache.tomcat.comet.timeout");
+ if (comettimeout != null) attach.setTimeout(comettimeout.longValue());
}
}
-
} catch (InterruptedIOException e) {
error = true;
} catch (Throwable t) {
//if this is a comet connection
//then execute the connection closure at the next selector loop
request.getAttributes().remove("org.apache.tomcat.comet.timeout");
- attach.setError(true);
+ //attach.setTimeout(5000); //force a cleanup in 5 seconds
+ //attach.setError(true); //this has caused concurrency errors
}
}
import java.util.Hashtable;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
private int timeout = 300000; // 5 minutes as in Apache HTTPD server
private int maxSavePostSize = 4 * 1024;
private int maxHttpHeaderSize = 8 * 1024;
+ protected int processorCache = 200; //max number of Http11NioProcessor objects cached
private int socketCloseDelay=-1;
private boolean disableUploadTimeout = true;
private int socketBuffer = 9000;
setAttribute("timeout", "" + timeouts);
}
+ public void setProcessorCache(int processorCache) {
+ this.processorCache = processorCache;
+ }
+
public void setOomParachute(int oomParachute) {
ep.setOomParachute(oomParachute);
setAttribute("oomParachute",oomParachute);
protected static int count = 0;
protected RequestGroupInfo global = new RequestGroupInfo();
- protected ThreadLocal<Http11NioProcessor> localProcessor =
- new ThreadLocal<Http11NioProcessor>();
protected ConcurrentHashMap<NioChannel, Http11NioProcessor> connections =
new ConcurrentHashMap<NioChannel, Http11NioProcessor>();
- protected java.util.Stack<Http11NioProcessor> recycledProcessors =
- new java.util.Stack<Http11NioProcessor>();
+ protected ConcurrentLinkedQueue<Http11NioProcessor> recycledProcessors = new ConcurrentLinkedQueue<Http11NioProcessor>() {
+ protected AtomicInteger size = new AtomicInteger(0);
+ public boolean offer(Http11NioProcessor processor) {
+ boolean offer = proto.processorCache==-1?true:size.get() < proto.processorCache;
+ //avoid overgrowing the cache, and don't accept offers after we have stopped
+ boolean result = false;
+ if ( offer ) {
+ result = super.offer(processor);
+ if ( result ) {
+ size.incrementAndGet();
+ }
+ }
+ if (!result) deregister(processor);
+ return result;
+ }
+
+ public Http11NioProcessor poll() {
+ Http11NioProcessor result = super.poll();
+ if ( result != null ) {
+ size.decrementAndGet();
+ }
+ return result;
+ }
+
+ public void clear() {
+ Http11NioProcessor next = poll();
+ while ( next != null ) {
+ deregister(next);
+ next = poll();
+ }
+ super.clear();
+ size.set(0);
+ }
+ };
Http11ConnectionHandler(Http11NioProtocol proto) {
this.proto = proto;
SocketState state = SocketState.CLOSED;
if (result != null) {
+ if (log.isDebugEnabled()) log.debug("Http11NioProcessor.error="+result.error);
// Call the appropriate event
try {
state = result.event(status);
} finally {
if (state != SocketState.LONG) {
connections.remove(socket);
- recycledProcessors.push(result);
+ recycledProcessors.offer(result);
+ } else {
+ if (log.isDebugEnabled()) log.debug("Keeping processor["+result);
}
}
}
public SocketState process(NioChannel socket) {
Http11NioProcessor processor = null;
try {
- processor = (Http11NioProcessor) localProcessor.get();
if (processor == null) {
- synchronized (recycledProcessors) {
- if (!recycledProcessors.isEmpty()) {
- processor = recycledProcessors.pop();
- localProcessor.set(processor);
- }
- }
+ processor = recycledProcessors.poll();
}
if (processor == null) {
processor = createProcessor();
// Associate the connection with the processor. The next request
// processed by this thread will use either a new or a recycled
// processor.
+ if (log.isDebugEnabled()) log.debug("Not recycling ["+processor+"] Comet="+((NioEndpoint.KeyAttachment)socket.getAttachment(false)).getComet());
connections.put(socket, processor);
- localProcessor.set(null);
socket.getPoller().add(socket);
+ } else {
+ recycledProcessors.offer(processor);
}
return state;
Http11NioProtocol.log.error
(sm.getString("http11protocol.proto.error"), e);
}
+ recycledProcessors.offer(processor);
return SocketState.CLOSED;
}
processor.setSocketBuffer(proto.socketBuffer);
processor.setMaxSavePostSize(proto.maxSavePostSize);
processor.setServer(proto.server);
- localProcessor.set(processor);
+ register(processor);
+ return processor;
+ }
+ AtomicInteger registerCount = new AtomicInteger(0);
+ public void register(Http11NioProcessor processor) {
if (proto.getDomain() != null) {
synchronized (this) {
try {
+ registerCount.addAndGet(1);
+ if (log.isDebugEnabled()) log.debug("Register ["+processor+"] count="+registerCount.get());
RequestInfo rp = processor.getRequest().getRequestProcessor();
rp.setGlobalProcessor(global);
ObjectName rpName = new ObjectName
- (proto.getDomain() + ":type=RequestProcessor,worker="
- + proto.getName() + ",name=HttpRequest" + count++);
+ (proto.getDomain() + ":type=RequestProcessor,worker="
+ + proto.getName() + ",name=HttpRequest" + count++);
Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ rp.setRpName(rpName);
} catch (Exception e) {
log.warn("Error registering request");
}
}
}
- return processor;
}
+
+ public void deregister(Http11NioProcessor processor) {
+ if (proto.getDomain() != null) {
+ synchronized (this) {
+ try {
+ registerCount.addAndGet(-1);
+ if (log.isDebugEnabled()) log.debug("Deregister ["+processor+"] count="+registerCount.get());
+ RequestInfo rp = processor.getRequest().getRequestProcessor();
+ rp.setGlobalProcessor(null);
+ ObjectName rpName = rp.getRpName();
+ Registry.getRegistry(null, null).unregisterComponent(rpName);
+ rp.setRpName(null);
+ } catch (Exception e) {
+ log.warn("Error unregistering request", e);
+ }
+ }
+ }
+ }
+
}
+
+
protected static org.apache.juli.logging.Log log
= org.apache.juli.logging.LogFactory.getLog(Http11NioProtocol.class);
return domain;
}
+ public int getProcessorCache() {
+ return processorCache;
+ }
+
public int getOomParachute() {
return ep.getOomParachute();
}
protected boolean useExecutor = true;
public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
- public boolean getUseExecutor() { return useExecutor;}
+ public boolean getUseExecutor() { return useExecutor || (executor!=null);}
/**
* Maximum amount of worker threads.
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
+ boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
- //we are registering the key to start with, reset the fairness counter.
- att.setFairness(0);
- key.interestOps(interestOps);
- att.interestOps(interestOps);
- }
- }
- catch (CancelledKeyException ckx) {
- try {
- if (key != null && key.attachment() != null) {
- KeyAttachment ka = (KeyAttachment) key.attachment();
- ka.setError(true); //set to collect this socket immediately
- }
- try {
- socket.close();
+ if ( att!=null ) {
+ //we are registering the key to start with, reset the fairness counter.
+ att.setFairness(0);
+ att.interestOps(interestOps);
+ key.interestOps(interestOps);
+ } else {
+ cancel = true;
}
- catch (Exception ignore) {}
- if (socket.isOpen())
- socket.close(true);
+ } else {
+ cancel = true;
}
- catch (Exception ignore) {}
+ if ( cancel ) getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
+ }catch (CancelledKeyException ckx) {
+ try {
+ getPoller0().cancelledKey(key,SocketStatus.DISCONNECT,true);
+ }catch (Exception ignore) {}
}
}//end if
}//run
try {
if ( key == null ) return;//nothing to do
KeyAttachment ka = (KeyAttachment) key.attachment();
- if (ka != null && ka.getComet()) {
+ if (ka != null && ka.getComet() && status != null) {
//the comet event takes care of clean up
- processSocket(ka.getChannel(), status, dispatch);
+ //processSocket(ka.getChannel(), status, dispatch);
+ ka.setComet(false);//to avoid a loop
+ processSocket(ka.getChannel(), status, false);//don't dispatch if the lines below are cancelling the key
}
if (key.isValid()) key.cancel();
if (key.channel().isOpen()) key.channel().close();
+ try {ka.channel.close(true);}catch (Exception ignore){}
key.attach(null);
} catch (Throwable e) {
if ( log.isDebugEnabled() ) log.error("",e);
unreg(sk, attachment);
boolean close = (!processSocket(channel));
if (close) {
- channel.close();
- channel.getIOChannel().socket().close();
+ cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
attachment.setFairness(0);
} else {
nextExpiration = now + (long)socketProperties.getSoTimeout();
//timeout
Set<SelectionKey> keys = selector.keys();
+ int keycount = 0;
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = iter.next();
+ keycount++;
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if ( ka == null ) {
cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
} else if ( ka.getError() ) {
- cancelledKey(key, SocketStatus.DISCONNECT);
+ cancelledKey(key, SocketStatus.ERROR);
}else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//only timeout sockets that we are waiting for a read from
long delta = now - ka.getLastAccess();
cancelledKey(key, SocketStatus.ERROR);
}
}//for
+ if ( log.isDebugEnabled() ) log.debug("Poller processed "+keycount+" keys through timeout");
}
}
// Close socket and pool
try {
KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- key.cancel();
- key.attach(null);
+ getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
nioChannels.offer(socket);
if ( att!=null ) keyCache.offer(att);
}catch ( Exception x ) {
// Close socket and pool
try {
KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- key.cancel();
- key.attach(null);
+ getPoller0().cancelledKey(key,SocketStatus.ERROR,false);
nioChannels.offer(socket);
if ( att!=null ) keyCache.offer(att);
}catch ( Exception x ) {
}
} else if (handshake == -1 ) {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
- try {socket.close(true);}catch (IOException ignore){}
nioChannels.offer(socket);
} else {
final SelectionKey fk = key;
if (closed) {
// Close socket and pool
try {
- KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- key.cancel();
- key.attach(null);
+ KeyAttachment ka = null;
+ if (key!=null) {
+ ka = (KeyAttachment) key.attachment();
+ ka.setComet(false);
+ socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
+ }
nioChannels.offer(socket);
- if ( att!=null ) keyCache.offer(att);
+ if ( ka!=null ) keyCache.offer(ka);
}catch ( Exception x ) {
log.error("",x);
}
ka = (KeyAttachment) key.attachment();
socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, false);
}
- try {socket.close(true);}catch (IOException ignore){}
nioChannels.offer(socket);
if ( ka!=null ) keyCache.offer(ka);
} else {
<attribute name="useComet" required="false">
<p>(bool)Whether to allow comet servlets or not, Default value is true.</p>
</attribute>
+ <attribute name="processorCache" required="false">
+ <p>(int)The protocol handler caches Http11NioProcessor objects to speed up performance.
+ This setting dictates how many of these objects get cached.
+ -1 means unlimited, default is 200. Set this value somewhere close to your maxThreads value.
+ </p>
+ </attribute>
<attribute name="socket.directBuffer" required="false">
<p>(bool)Boolean value, whether to use direct ByteBuffers or java mapped ByteBuffers. Default is <code>false</code>
<br/>When you are using direct buffers, make sure you allocate the appropriate amount of memory for the