From: fhanik Date: Wed, 9 Aug 2006 12:26:11 +0000 (+0000) Subject: Tune the connector, next step is to add the ability to have more than one poller... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=2d8999085f2d63851e7f2fe6e7a995a18696ce5e;p=tomcat7.0 Tune the connector, next step is to add the ability to have more than one poller, this is achievable by simply having each poller have its own selector, exactly like it is done today git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@430043 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/http11/InternalNioInputBuffer.java b/java/org/apache/coyote/http11/InternalNioInputBuffer.java index 88163818f..ada5fd11d 100644 --- a/java/org/apache/coyote/http11/InternalNioInputBuffer.java +++ b/java/org/apache/coyote/http11/InternalNioInputBuffer.java @@ -569,7 +569,7 @@ public class InternalNioInputBuffer implements InputBuffer { //to do, add in a check, we might have just timed out on the wait, //so there is no need to register us again. boolean addToQueue = false; - try { addToQueue = ((key.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");} + try { addToQueue = ((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");} if ( addToQueue ) { synchronized (att.getMutex()) { addToReadQueue(key, att); @@ -591,7 +591,10 @@ public class InternalNioInputBuffer implements InputBuffer { new Runnable() { public void run() { try { - if (key != null) key.interestOps(SelectionKey.OP_READ); + if (key != null) { + key.interestOps(SelectionKey.OP_READ); + att.interestOps(SelectionKey.OP_READ); + } } catch (CancelledKeyException ckx) { try { if ( att != null ) { diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 404ca3e12..7b4bbd331 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -40,6 +40,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import org.apache.tomcat.util.res.StringManager; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; /** * NIO tailored thread pool, providing the following services: @@ -868,12 +870,15 @@ public class NioEndpoint { public class Poller implements Runnable { protected Selector selector; - protected LinkedList events = new LinkedList(); + protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); + protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling protected int keepAliveCount = 0; public int getKeepAliveCount() { return keepAliveCount; } + + protected AtomicLong wakeupCounter = new AtomicLong(0l); @@ -909,10 +914,11 @@ public class NioEndpoint { } public void addEvent(Runnable event) { - synchronized (events) { - events.add(event); - } - selector.wakeup(); + //synchronized (events) { + // events.add(event); + //} + events.offer(event); + if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup(); } /** @@ -925,12 +931,15 @@ public class NioEndpoint { */ public void add(final NioChannel socket) { final SelectionKey key = socket.getIOChannel().keyFor(selector); - KeyAttachment att = (KeyAttachment)key.attachment(); + final KeyAttachment att = (KeyAttachment)key.attachment(); if ( att != null ) att.setWakeUp(false); Runnable r = new Runnable() { public void run() { try { - if (key != null) key.interestOps(SelectionKey.OP_READ); + if (key != null) { + key.interestOps(SelectionKey.OP_READ); + att.interestOps(SelectionKey.OP_READ); + } }catch ( CancelledKeyException ckx ) { try { if ( key != null && key.attachment() != null ) { @@ -948,24 +957,23 @@ public class NioEndpoint { public boolean events() { boolean result = false; - synchronized (events) { + //synchronized (events) { Runnable r = null; result = (events.size() > 0); - while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { + while ( (r = (Runnable)events.poll()) != null ) { try { r.run(); } catch ( Exception x ) { log.error("",x); } } - events.clear(); - } + //events.clear(); + //} return result; } public void register(final NioChannel socket) { - SelectionKey key = socket.getIOChannel().keyFor(selector); Runnable r = new Runnable() { public void run() { try { @@ -1017,6 +1025,7 @@ public class NioEndpoint { int keyCount = 0; try { + wakeupCounter.set(0); keyCount = selector.select(selectorTimeout); } catch (Throwable x) { log.error("",x); @@ -1041,7 +1050,8 @@ public class NioEndpoint { attachment.access(); sk.attach(attachment); int readyOps = sk.readyOps(); - sk.interestOps(sk.interestOps() & ~readyOps); + sk.interestOps(0); + attachment.interestOps(0); NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getWakeUp() ) { @@ -1091,7 +1101,7 @@ public class NioEndpoint { cancelledKey(key); //we don't support any keys without attachments } else if ( ka.getError() ) { cancelledKey(key); - }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { + }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { //only timeout sockets that we are waiting for a read from long delta = now - ka.getLastAccess(); long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout()); @@ -1128,6 +1138,9 @@ public class NioEndpoint { public void setError(boolean error) { this.error = error; } public NioChannel getChannel() { return channel;} public void setChannel(NioChannel channel) { this.channel = channel;} + protected int interestOps = 0; + public int interestOps() { return interestOps;} + public int interestOps(int ops) { this.interestOps = ops; return ops; } protected Object mutex = new Object(); protected boolean wakeUp = false; protected long lastAccess = System.currentTimeMillis(); @@ -1276,15 +1289,17 @@ public class NioEndpoint { } else { final SelectionKey fk = key; final int intops = handshake; + final KeyAttachment ka = (KeyAttachment)fk.attachment(); //register for handshake ops Runnable r = new Runnable() { public void run() { try { fk.interestOps(intops); + ka.interestOps(intops); } catch (CancelledKeyException ckx) { try { if ( fk != null && fk.attachment() != null ) { - KeyAttachment ka = (KeyAttachment)fk.attachment(); + ka.setError(true); //set to collect this socket immediately try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){} try {ka.getChannel().close();}catch(Exception ignore){} @@ -1326,6 +1341,8 @@ public class NioEndpoint { public NioBufferHandler(int readsize, int writesize) { readbuf = ByteBuffer.allocateDirect(readsize); writebuf = ByteBuffer.allocateDirect(writesize); +// readbuf = ByteBuffer.allocate(readsize); +// writebuf = ByteBuffer.allocate(writesize); } public ByteBuffer expand(ByteBuffer buffer, int remaining) {return buffer;}