From: fhanik Date: Fri, 1 Jun 2007 17:20:06 +0000 (+0000) Subject: Thread safe handling of dealing with async writes and non blocking writes, needed... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=5baf492e1c362e75a473fc16438804aa39bda4a8;p=tomcat7.0 Thread safe handling of dealing with async writes and non blocking writes, needed to separate it into a poller for incoming events and one poller for outgoing data Not thread safe for multiple async servlet threads writing at the same time, up to the comet developer to set it straight git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@543538 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java b/java/org/apache/tomcat/util/net/NioBlockingSelector.java index 84dc2f94b..71b8ac56e 100644 --- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java +++ b/java/org/apache/tomcat/util/net/NioBlockingSelector.java @@ -25,9 +25,43 @@ import java.util.concurrent.TimeUnit; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; import org.apache.tomcat.util.MutableInteger; +import java.nio.channels.Selector; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import java.util.Iterator; +import java.nio.channels.SocketChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.CancelledKeyException; +import java.util.concurrent.CountDownLatch; public class NioBlockingSelector { + + protected static Log log = LogFactory.getLog(NioBlockingSelector.class); + + private static int threadCounter = 0; + + protected Selector sharedSelector; + + protected BlockPoller poller; public NioBlockingSelector() { + + } + + public void open(Selector selector) { + sharedSelector = selector; + poller = new BlockPoller(); + poller.selector = sharedSelector; + poller.setDaemon(true); + poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter)); + poller.start(); + } + + public void close() { + if (poller!=null) { + poller.disable(); + poller.interrupt(); + } } /** @@ -42,11 +76,10 @@ public class NioBlockingSelector { * @throws SocketTimeoutException if the write times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { + public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); KeyAttachment att = (KeyAttachment) key.attachment(); - int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK); int written = 0; boolean timedout = false; int keycount = 1; //assume we can write @@ -66,8 +99,7 @@ public class NioBlockingSelector { } try { if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); - //only register for write if a write has not yet been issued - if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_WRITE); + poller.add(att,SelectionKey.OP_WRITE); att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -87,11 +119,11 @@ public class NioBlockingSelector { if (timedout) throw new SocketTimeoutException(); } finally { + poller.remove(att,SelectionKey.OP_WRITE); if (timedout && key != null) { cancelKey(socket, key); } } - socket.getPoller().add(socket,prevOps); return written; } @@ -108,11 +140,10 @@ public class NioBlockingSelector { * @throws SocketTimeoutException if the read times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { + public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); KeyAttachment att = (KeyAttachment) key.attachment(); - int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK); int read = 0; boolean timedout = false; int keycount = 1; //assume we can write @@ -129,7 +160,7 @@ public class NioBlockingSelector { } try { if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1); - if ( (att.interestOps() & SelectionKey.OP_READ) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_READ); + poller.add(att,SelectionKey.OP_READ); att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); @@ -148,11 +179,11 @@ public class NioBlockingSelector { if (timedout) throw new SocketTimeoutException(); } finally { + poller.remove(att,SelectionKey.OP_READ); if (timedout && key != null) { cancelKey(socket,key); } } - socket.getPoller().add(socket,prevOps); return read; } @@ -164,5 +195,136 @@ public class NioBlockingSelector { } }); } + + protected class BlockPoller extends Thread { + protected boolean run = true; + protected Selector selector = null; + protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); + public void disable() { run = false; selector.wakeup();} + + public void add(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + SocketChannel ch = key.getChannel().getIOChannel(); + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + sk = ch.register(selector, ops, key); + } else { + sk.interestOps(sk.interestOps() | ops); + } + }catch (ClosedChannelException cx) { + if (sk!=null) sk.cancel(); + } + } + }; + events.offer(r); + } + + public void remove(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + SocketChannel ch = key.getChannel().getIOChannel(); + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + } else { + sk.interestOps(sk.interestOps() & (~ops)); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + if (sk.interestOps()==0) { + sk.cancel(); + sk.attach(null); + } + } + }catch (CancelledKeyException cx) { + if (sk!=null) { + sk.cancel(); + sk.attach(null); + } + } + } + }; + events.offer(r); + selector.wakeup(); + } + + + public boolean events() { + boolean result = false; + Runnable r = null; + result = (events.size() > 0); + while ( (r = (Runnable)events.poll()) != null ) { + r.run(); + result = true; + } + return result; + } + + public void run() { + while (run) { + try { + events(); + int keyCount = 0; + try { + keyCount = selector.select(1000); + if (!run) break; + }catch ( NullPointerException x ) { + //sun bug 5076772 on windows JDK 1.5 + if (selector==null) throw x; + continue; + } catch ( CancelledKeyException x ) { + //sun bug 5076772 on windows JDK 1.5 + continue; + } catch (Throwable x) { + log.error("",x); + continue; + } + + Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; + + // Walk through the collection of ready keys and dispatch + // any active event. + while (run && iterator != null && iterator.hasNext()) { + SelectionKey sk = (SelectionKey) iterator.next(); + KeyAttachment attachment = (KeyAttachment)sk.attachment(); + try { + attachment.access(); + iterator.remove(); ; + sk.interestOps(sk.interestOps() & (~sk.readyOps())); + if ( sk.isReadable() ) { + countDown(attachment.getReadLatch()); + } + if (sk.isWritable()) { + countDown(attachment.getWriteLatch()); + } + }catch (CancelledKeyException ckx) { + if (sk!=null) sk.cancel(); + countDown(attachment.getReadLatch()); + countDown(attachment.getWriteLatch()); + } + }//while + }catch ( Throwable t ) { + log.error("",t); + } + } + events.clear(); + try { + selector.selectNow();//cancel all remaining keys + }catch( Exception ignore ) { + if (log.isDebugEnabled())log.debug("",ignore); + } + } + + public void countDown(CountDownLatch latch) { + if ( latch == null ) return; + latch.countDown(); + } + + + + } } \ No newline at end of file diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 230c265aa..8c1ce1ea5 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -737,7 +737,7 @@ public class NioEndpoint { } if (oomParachute>0) reclaimParachute(true); - + selectorPool.open(); initialized = true; } @@ -832,6 +832,7 @@ public class NioEndpoint { } executor = null; } + } @@ -849,6 +850,7 @@ public class NioEndpoint { sslContext = null; initialized = false; releaseCaches(); + selectorPool.close(); } @@ -1473,13 +1475,7 @@ public class NioEndpoint { sk.attach(attachment);//cant remember why this is here NioChannel channel = attachment.getChannel(); if (sk.isReadable() || sk.isWritable() ) { - if ( sk.isReadable() && attachment.getReadLatch() != null ) { - unreg(sk, attachment,SelectionKey.OP_READ); - attachment.getReadLatch().countDown(); - } else if ( sk.isWritable() && attachment.getWriteLatch() != null ) { - unreg(sk, attachment,SelectionKey.OP_WRITE); - attachment.getWriteLatch().countDown(); - } else if ( attachment.getSendfileData() != null ) { + if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment,true); } else if ( attachment.getComet() ) { //check if thread is available @@ -1674,7 +1670,7 @@ public class NioEndpoint { public CountDownLatch getReadLatch() { return readLatch; } public CountDownLatch getWriteLatch() { return writeLatch; } protected CountDownLatch resetLatch(CountDownLatch latch) { - if ( latch.getCount() == 0 ) return null; + if ( latch==null || latch.getCount() == 0 ) return null; else throw new IllegalStateException("Latch must be at count 0"); } public void resetReadLatch() { readLatch = resetLatch(readLatch); } diff --git a/java/org/apache/tomcat/util/net/NioSelectorPool.java b/java/org/apache/tomcat/util/net/NioSelectorPool.java index 9b88631a5..b3b8cbbda 100644 --- a/java/org/apache/tomcat/util/net/NioSelectorPool.java +++ b/java/org/apache/tomcat/util/net/NioSelectorPool.java @@ -39,12 +39,19 @@ import org.apache.tomcat.util.MutableInteger; */ public class NioSelectorPool { + + public NioSelectorPool() { + } + protected static int threadCount = 0; protected static Log log = LogFactory.getLog(NioSelectorPool.class); protected final static boolean SHARED = Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue(); + + protected NioBlockingSelector blockingSelector; + protected Selector SHARED_SELECTOR; protected int maxSelectors = 200; @@ -107,6 +114,9 @@ public class NioSelectorPool { while ( (s = selectors.poll()) != null ) s.close(); spare.set(0); active.set(0); + if (blockingSelector!=null) { + blockingSelector.close(); + } if ( SHARED && getSharedSelector()!=null ) { getSharedSelector().close(); SHARED_SELECTOR = null; @@ -116,6 +126,11 @@ public class NioSelectorPool { public void open() throws IOException { enabled = true; getSharedSelector(); + if (SHARED) { + blockingSelector = new NioBlockingSelector(); + blockingSelector.open(getSharedSelector()); + } + } /** @@ -142,7 +157,7 @@ public class NioSelectorPool { buf = socket.getBufHandler().getWriteBuffer(); } if ( SHARED && block ) { - return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite); + return blockingSelector.write(buf,socket,writeTimeout,lastWrite); } SelectionKey key = null; int written = 0; @@ -215,7 +230,7 @@ public class NioSelectorPool { */ public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { if ( SHARED && block ) { - return NioBlockingSelector.read(buf,socket,readTimeout); + return blockingSelector.read(buf,socket,readTimeout); } SelectionKey key = null; int read = 0;