Thread safe handling of dealing with async writes and non blocking writes, needed...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 1 Jun 2007 17:20:06 +0000 (17:20 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 1 Jun 2007 17:20:06 +0000 (17:20 +0000)
Not thread safe for multiple async servlet threads writing at the same time, up to the comet developer to set it straight

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@543538 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/tomcat/util/net/NioBlockingSelector.java
java/org/apache/tomcat/util/net/NioEndpoint.java
java/org/apache/tomcat/util/net/NioSelectorPool.java

index 84dc2f9..71b8ac5 100644 (file)
@@ -25,9 +25,43 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.MutableInteger;
+import java.nio.channels.Selector;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.Iterator;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CancelledKeyException;
+import java.util.concurrent.CountDownLatch;
 
 public class NioBlockingSelector {
+    
+    protected static Log log = LogFactory.getLog(NioBlockingSelector.class);
+    
+    private static int threadCounter = 0;
+    
+    protected Selector sharedSelector;
+    
+    protected BlockPoller poller;
     public NioBlockingSelector() {
+        
+    }
+    
+    public void open(Selector selector) {
+        sharedSelector = selector;
+        poller = new BlockPoller();
+        poller.selector = sharedSelector;
+        poller.setDaemon(true);
+        poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter));
+        poller.start();
+    }
+    
+    public void close() {
+        if (poller!=null) {
+            poller.disable();
+            poller.interrupt();
+        }
     }
 
     /**
@@ -42,11 +76,10 @@ public class NioBlockingSelector {
      * @throws SocketTimeoutException if the write times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
+    public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
         KeyAttachment att = (KeyAttachment) key.attachment();
-        int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
         int written = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -66,8 +99,7 @@ public class NioBlockingSelector {
                 }
                 try {
                     if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
-                    //only register for write if a write has not yet been issued
-                    if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_WRITE);
+                    poller.add(att,SelectionKey.OP_WRITE);
                     att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -87,11 +119,11 @@ public class NioBlockingSelector {
             if (timedout) 
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_WRITE);
             if (timedout && key != null) {
                 cancelKey(socket, key);
             }
         }
-        socket.getPoller().add(socket,prevOps);
         return written;
     }
 
@@ -108,11 +140,10 @@ public class NioBlockingSelector {
      * @throws SocketTimeoutException if the read times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
+    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         if ( key == null ) throw new IOException("Key no longer registered");
         KeyAttachment att = (KeyAttachment) key.attachment();
-        int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
         int read = 0;
         boolean timedout = false;
         int keycount = 1; //assume we can write
@@ -129,7 +160,7 @@ public class NioBlockingSelector {
                 }
                 try {
                     if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
-                    if ( (att.interestOps() & SelectionKey.OP_READ) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_READ);
+                    poller.add(att,SelectionKey.OP_READ);
                     att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
                 }catch (InterruptedException ignore) {
                     Thread.interrupted();
@@ -148,11 +179,11 @@ public class NioBlockingSelector {
             if (timedout)
                 throw new SocketTimeoutException();
         } finally {
+            poller.remove(att,SelectionKey.OP_READ);
             if (timedout && key != null) {
                 cancelKey(socket,key);
             }
         }
-        socket.getPoller().add(socket,prevOps);
         return read;
     }
 
@@ -164,5 +195,136 @@ public class NioBlockingSelector {
             }
         });
     }
+    
+    protected class BlockPoller extends Thread {
+        protected boolean run = true;
+        protected Selector selector = null;
+        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+        public void disable() { run = false; selector.wakeup();}
+        
+        public void add(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    SocketChannel ch = key.getChannel().getIOChannel();
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            sk = ch.register(selector, ops, key);
+                        } else {
+                            sk.interestOps(sk.interestOps() | ops);
+                        }
+                    }catch (ClosedChannelException cx) {
+                        if (sk!=null) sk.cancel();
+                    }
+                }
+            };
+            events.offer(r);
+        }
+        
+        public void remove(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    SocketChannel ch = key.getChannel().getIOChannel();
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                        } else {
+                            sk.interestOps(sk.interestOps() & (~ops));
+                            if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+                            if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                            if (sk.interestOps()==0) {
+                                sk.cancel();
+                                sk.attach(null);
+                            }
+                        }
+                    }catch (CancelledKeyException cx) {
+                        if (sk!=null) {
+                            sk.cancel();
+                            sk.attach(null);
+                        }
+                    }
+                }
+            };
+            events.offer(r);
+            selector.wakeup();
+        }
+
+
+        public boolean events() {
+            boolean result = false;
+            Runnable r = null;
+            result = (events.size() > 0);
+            while ( (r = (Runnable)events.poll()) != null ) {
+                r.run();
+                result = true;
+            }
+            return result;
+        }
+
+        public void run() {
+            while (run) {
+                try {
+                    events();
+                    int keyCount = 0;
+                    try {
+                        keyCount = selector.select(1000);
+                        if (!run) break;
+                    }catch ( NullPointerException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if (selector==null) throw x;
+                        continue;
+                    } catch ( CancelledKeyException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        continue;
+                    } catch (Throwable x) {
+                        log.error("",x);
+                        continue;
+                    }
+
+                    Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
+
+                    // Walk through the collection of ready keys and dispatch
+                    // any active event.
+                    while (run && iterator != null && iterator.hasNext()) {
+                        SelectionKey sk = (SelectionKey) iterator.next();
+                        KeyAttachment attachment = (KeyAttachment)sk.attachment();
+                        try {
+                            attachment.access();
+                            iterator.remove(); ;
+                            sk.interestOps(sk.interestOps() & (~sk.readyOps()));
+                            if ( sk.isReadable() ) {
+                                countDown(attachment.getReadLatch());
+                            }
+                            if (sk.isWritable()) {
+                                countDown(attachment.getWriteLatch());
+                            }
+                        }catch (CancelledKeyException ckx) {
+                            if (sk!=null) sk.cancel();
+                            countDown(attachment.getReadLatch());
+                            countDown(attachment.getWriteLatch());
+                        }
+                    }//while
+                }catch ( Throwable t ) {
+                    log.error("",t);
+                }
+            }
+            events.clear();
+            try {
+                selector.selectNow();//cancel all remaining keys
+            }catch( Exception ignore ) {
+                if (log.isDebugEnabled())log.debug("",ignore);
+            }
+        }
+        
+        public void countDown(CountDownLatch latch) {
+            if ( latch == null ) return;
+            latch.countDown();
+        }
+        
+        
+        
+    }
 
 }
\ No newline at end of file
index 230c265..8c1ce1e 100644 (file)
@@ -737,7 +737,7 @@ public class NioEndpoint {
         }
         
         if (oomParachute>0) reclaimParachute(true);
-
+        selectorPool.open();
         initialized = true;
 
     }
@@ -832,6 +832,7 @@ public class NioEndpoint {
             }
             executor = null;
         }
+        
     }
 
 
@@ -849,6 +850,7 @@ public class NioEndpoint {
         sslContext = null;
         initialized = false;
         releaseCaches();
+        selectorPool.close();
     }
 
 
@@ -1473,13 +1475,7 @@ public class NioEndpoint {
                     sk.attach(attachment);//cant remember why this is here
                     NioChannel channel = attachment.getChannel();
                     if (sk.isReadable() || sk.isWritable() ) {
-                        if ( sk.isReadable() && attachment.getReadLatch() != null ) {
-                            unreg(sk, attachment,SelectionKey.OP_READ);
-                            attachment.getReadLatch().countDown();
-                        } else if ( sk.isWritable() && attachment.getWriteLatch() != null ) {
-                            unreg(sk, attachment,SelectionKey.OP_WRITE);
-                            attachment.getWriteLatch().countDown();
-                        } else if ( attachment.getSendfileData() != null ) {
+                        if ( attachment.getSendfileData() != null ) {
                             processSendfile(sk,attachment,true);
                         } else if ( attachment.getComet() ) {
                             //check if thread is available
@@ -1674,7 +1670,7 @@ public class NioEndpoint {
         public CountDownLatch getReadLatch() { return readLatch; }
         public CountDownLatch getWriteLatch() { return writeLatch; }
         protected CountDownLatch resetLatch(CountDownLatch latch) {
-            if ( latch.getCount() == 0 ) return null;
+            if ( latch==null || latch.getCount() == 0 ) return null;
             else throw new IllegalStateException("Latch must be at count 0");
         }
         public void resetReadLatch() { readLatch = resetLatch(readLatch); }
index 9b88631..b3b8cbb 100644 (file)
@@ -39,12 +39,19 @@ import org.apache.tomcat.util.MutableInteger;
  */
 
 public class NioSelectorPool {
+    
+    public NioSelectorPool() {
+    }
+    
     protected static int threadCount = 0;
     
     protected static Log log = LogFactory.getLog(NioSelectorPool.class);
 
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
+    
+    protected NioBlockingSelector blockingSelector;
+    
     protected Selector SHARED_SELECTOR;
     
     protected int maxSelectors = 200;
@@ -107,6 +114,9 @@ public class NioSelectorPool {
         while ( (s = selectors.poll()) != null ) s.close();
         spare.set(0);
         active.set(0);
+        if (blockingSelector!=null) {
+            blockingSelector.close();
+        }
         if ( SHARED && getSharedSelector()!=null ) {
             getSharedSelector().close();
             SHARED_SELECTOR = null;
@@ -116,6 +126,11 @@ public class NioSelectorPool {
     public void open() throws IOException {
         enabled = true;
         getSharedSelector();
+        if (SHARED) {
+            blockingSelector = new NioBlockingSelector();
+            blockingSelector.open(getSharedSelector());
+        }
+
     }
 
     /**
@@ -142,7 +157,7 @@ public class NioSelectorPool {
             buf = socket.getBufHandler().getWriteBuffer();
         }
         if ( SHARED && block ) {
-            return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
+            return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
         }
         SelectionKey key = null;
         int written = 0;
@@ -215,7 +230,7 @@ public class NioSelectorPool {
      */
     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
         if ( SHARED && block ) {
-            return NioBlockingSelector.read(buf,socket,readTimeout);
+            return blockingSelector.read(buf,socket,readTimeout);
         }
         SelectionKey key = null;
         int read = 0;