Tune the connector, next step is to add the ability to have more than one poller...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 9 Aug 2006 12:26:11 +0000 (12:26 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 9 Aug 2006 12:26:11 +0000 (12:26 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@430043 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/coyote/http11/InternalNioInputBuffer.java
java/org/apache/tomcat/util/net/NioEndpoint.java

index 8816381..ada5fd1 100644 (file)
@@ -569,7 +569,7 @@ public class InternalNioInputBuffer implements InputBuffer {
                         //to do, add in a check, we might have just timed out on the wait,
                         //so there is no need to register us again.
                         boolean addToQueue = false;
-                        try { addToQueue = ((key.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
+                        try { addToQueue = ((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
                         if ( addToQueue ) {
                             synchronized (att.getMutex()) {
                                 addToReadQueue(key, att);
@@ -591,7 +591,10 @@ public class InternalNioInputBuffer implements InputBuffer {
             new Runnable() {
             public void run() {
                 try {
-                    if (key != null) key.interestOps(SelectionKey.OP_READ);
+                    if (key != null) {
+                        key.interestOps(SelectionKey.OP_READ);
+                        att.interestOps(SelectionKey.OP_READ);
+                    }
                 } catch (CancelledKeyException ckx) {
                     try {
                         if ( att != null ) {
index 404ca3e..7b4bbd3 100644 (file)
@@ -40,6 +40,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import org.apache.tomcat.util.res.StringManager;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * NIO tailored thread pool, providing the following services:
@@ -868,12 +870,15 @@ public class NioEndpoint {
     public class Poller implements Runnable {
 
         protected Selector selector;
-        protected LinkedList<Runnable> events = new LinkedList<Runnable>();
+        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+        
         protected boolean close = false;
         protected long nextExpiration = 0;//optimize expiration handling
 
         protected int keepAliveCount = 0;
         public int getKeepAliveCount() { return keepAliveCount; }
+        
+        protected AtomicLong wakeupCounter = new AtomicLong(0l);
 
 
 
@@ -909,10 +914,11 @@ public class NioEndpoint {
         }
         
         public void addEvent(Runnable event) {
-            synchronized (events) {
-                events.add(event);
-            }
-            selector.wakeup();
+            //synchronized (events) {
+            //    events.add(event);
+            //}
+            events.offer(event);
+            if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
         }
 
         /**
@@ -925,12 +931,15 @@ public class NioEndpoint {
          */
         public void add(final NioChannel socket) {
             final SelectionKey key = socket.getIOChannel().keyFor(selector);
-            KeyAttachment att = (KeyAttachment)key.attachment();
+            final KeyAttachment att = (KeyAttachment)key.attachment();
             if ( att != null ) att.setWakeUp(false);
             Runnable r = new Runnable() {
                 public void run() {
                     try {
-                        if (key != null) key.interestOps(SelectionKey.OP_READ);
+                        if (key != null) {
+                            key.interestOps(SelectionKey.OP_READ);
+                            att.interestOps(SelectionKey.OP_READ);
+                        }
                     }catch ( CancelledKeyException ckx ) {
                         try {
                             if ( key != null && key.attachment() != null ) {
@@ -948,24 +957,23 @@ public class NioEndpoint {
 
         public boolean events() {
             boolean result = false;
-            synchronized (events) {
+            //synchronized (events) {
                 Runnable r = null;
                 result = (events.size() > 0);
-                while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {
+                while ( (r = (Runnable)events.poll()) != null ) {
                     try {
                         r.run();
                     } catch ( Exception x ) {
                         log.error("",x);
                     }
                 }
-                events.clear();
-            }
+                //events.clear();
+            //}
             return result;
         }
         
         public void register(final NioChannel socket)
         {
-            SelectionKey key = socket.getIOChannel().keyFor(selector);
             Runnable r = new Runnable() {
                 public void run() {
                     try {
@@ -1017,6 +1025,7 @@ public class NioEndpoint {
 
                 int keyCount = 0;
                 try {
+                    wakeupCounter.set(0);
                     keyCount = selector.select(selectorTimeout);
                 } catch (Throwable x) {
                     log.error("",x);
@@ -1041,7 +1050,8 @@ public class NioEndpoint {
                             attachment.access();
                             sk.attach(attachment);
                             int readyOps = sk.readyOps();
-                            sk.interestOps(sk.interestOps() & ~readyOps);
+                            sk.interestOps(0);
+                            attachment.interestOps(0);
                             NioChannel channel = attachment.getChannel();
                             if (sk.isReadable() || sk.isWritable() ) {
                                 if ( attachment.getWakeUp() ) {
@@ -1091,7 +1101,7 @@ public class NioEndpoint {
                         cancelledKey(key); //we don't support any keys without attachments
                     } else if ( ka.getError() ) {
                         cancelledKey(key);
-                    }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+                    }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                         //only timeout sockets that we are waiting for a read from
                         long delta = now - ka.getLastAccess();
                         long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout());
@@ -1128,6 +1138,9 @@ public class NioEndpoint {
         public void setError(boolean error) { this.error = error; }
         public NioChannel getChannel() { return channel;}
         public void setChannel(NioChannel channel) { this.channel = channel;}
+        protected int interestOps = 0;
+        public int interestOps() { return interestOps;}
+        public int interestOps(int ops) { this.interestOps  = ops; return ops; }
         protected Object mutex = new Object();
         protected boolean wakeUp = false;
         protected long lastAccess = System.currentTimeMillis();
@@ -1276,15 +1289,17 @@ public class NioEndpoint {
                 } else {
                     final SelectionKey fk = key;
                     final int intops = handshake;
+                    final KeyAttachment ka = (KeyAttachment)fk.attachment();
                     //register for handshake ops
                     Runnable r = new Runnable() {
                         public void run() {
                             try {
                                 fk.interestOps(intops);
+                                ka.interestOps(intops);
                             } catch (CancelledKeyException ckx) {
                                 try {
                                     if ( fk != null && fk.attachment() != null ) {
-                                        KeyAttachment ka = (KeyAttachment)fk.attachment();
+                                        
                                         ka.setError(true); //set to collect this socket immediately
                                         try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
                                         try {ka.getChannel().close();}catch(Exception ignore){}
@@ -1326,6 +1341,8 @@ public class NioEndpoint {
         public NioBufferHandler(int readsize, int writesize) {
             readbuf = ByteBuffer.allocateDirect(readsize);
             writebuf = ByteBuffer.allocateDirect(writesize);
+//            readbuf = ByteBuffer.allocate(readsize);
+//            writebuf = ByteBuffer.allocate(writesize);
         }
         
         public ByteBuffer expand(ByteBuffer buffer, int remaining) {return buffer;}