import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
import org.apache.tomcat.util.MutableInteger;
+import java.nio.channels.Selector;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.Iterator;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.CancelledKeyException;
+import java.util.concurrent.CountDownLatch;
public class NioBlockingSelector {
+
+ protected static Log log = LogFactory.getLog(NioBlockingSelector.class);
+
+ private static int threadCounter = 0;
+
+ protected Selector sharedSelector;
+
+ protected BlockPoller poller;
public NioBlockingSelector() {
+
+ }
+
+ public void open(Selector selector) {
+ sharedSelector = selector;
+ poller = new BlockPoller();
+ poller.selector = sharedSelector;
+ poller.setDaemon(true);
+ poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter));
+ poller.start();
+ }
+
+ public void close() {
+ if (poller!=null) {
+ poller.disable();
+ poller.interrupt();
+ }
}
/**
* @throws SocketTimeoutException if the write times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
- public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
+ public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
KeyAttachment att = (KeyAttachment) key.attachment();
- int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
}
try {
if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
- //only register for write if a write has not yet been issued
- if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_WRITE);
+ poller.add(att,SelectionKey.OP_WRITE);
att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
if (timedout)
throw new SocketTimeoutException();
} finally {
+ poller.remove(att,SelectionKey.OP_WRITE);
if (timedout && key != null) {
cancelKey(socket, key);
}
}
- socket.getPoller().add(socket,prevOps);
return written;
}
* @throws SocketTimeoutException if the read times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
- public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
+ public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
KeyAttachment att = (KeyAttachment) key.attachment();
- int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
int read = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
}
try {
if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
- if ( (att.interestOps() & SelectionKey.OP_READ) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_READ);
+ poller.add(att,SelectionKey.OP_READ);
att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
if (timedout)
throw new SocketTimeoutException();
} finally {
+ poller.remove(att,SelectionKey.OP_READ);
if (timedout && key != null) {
cancelKey(socket,key);
}
}
- socket.getPoller().add(socket,prevOps);
return read;
}
}
});
}
+
+ protected class BlockPoller extends Thread {
+ protected boolean run = true;
+ protected Selector selector = null;
+ protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+ public void disable() { run = false; selector.wakeup();}
+
+ public void add(final KeyAttachment key, final int ops) {
+ Runnable r = new Runnable() {
+ public void run() {
+ SocketChannel ch = key.getChannel().getIOChannel();
+ SelectionKey sk = ch.keyFor(selector);
+ try {
+ if (sk == null) {
+ sk = ch.register(selector, ops, key);
+ } else {
+ sk.interestOps(sk.interestOps() | ops);
+ }
+ }catch (ClosedChannelException cx) {
+ if (sk!=null) sk.cancel();
+ }
+ }
+ };
+ events.offer(r);
+ }
+
+ public void remove(final KeyAttachment key, final int ops) {
+ Runnable r = new Runnable() {
+ public void run() {
+ SocketChannel ch = key.getChannel().getIOChannel();
+ SelectionKey sk = ch.keyFor(selector);
+ try {
+ if (sk == null) {
+ if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+ if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+ } else {
+ sk.interestOps(sk.interestOps() & (~ops));
+ if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch());
+ if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+ if (sk.interestOps()==0) {
+ sk.cancel();
+ sk.attach(null);
+ }
+ }
+ }catch (CancelledKeyException cx) {
+ if (sk!=null) {
+ sk.cancel();
+ sk.attach(null);
+ }
+ }
+ }
+ };
+ events.offer(r);
+ selector.wakeup();
+ }
+
+
+ public boolean events() {
+ boolean result = false;
+ Runnable r = null;
+ result = (events.size() > 0);
+ while ( (r = (Runnable)events.poll()) != null ) {
+ r.run();
+ result = true;
+ }
+ return result;
+ }
+
+ public void run() {
+ while (run) {
+ try {
+ events();
+ int keyCount = 0;
+ try {
+ keyCount = selector.select(1000);
+ if (!run) break;
+ }catch ( NullPointerException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ if (selector==null) throw x;
+ continue;
+ } catch ( CancelledKeyException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ continue;
+ } catch (Throwable x) {
+ log.error("",x);
+ continue;
+ }
+
+ Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
+
+ // Walk through the collection of ready keys and dispatch
+ // any active event.
+ while (run && iterator != null && iterator.hasNext()) {
+ SelectionKey sk = (SelectionKey) iterator.next();
+ KeyAttachment attachment = (KeyAttachment)sk.attachment();
+ try {
+ attachment.access();
+ iterator.remove(); ;
+ sk.interestOps(sk.interestOps() & (~sk.readyOps()));
+ if ( sk.isReadable() ) {
+ countDown(attachment.getReadLatch());
+ }
+ if (sk.isWritable()) {
+ countDown(attachment.getWriteLatch());
+ }
+ }catch (CancelledKeyException ckx) {
+ if (sk!=null) sk.cancel();
+ countDown(attachment.getReadLatch());
+ countDown(attachment.getWriteLatch());
+ }
+ }//while
+ }catch ( Throwable t ) {
+ log.error("",t);
+ }
+ }
+ events.clear();
+ try {
+ selector.selectNow();//cancel all remaining keys
+ }catch( Exception ignore ) {
+ if (log.isDebugEnabled())log.debug("",ignore);
+ }
+ }
+
+ public void countDown(CountDownLatch latch) {
+ if ( latch == null ) return;
+ latch.countDown();
+ }
+
+
+
+ }
}
\ No newline at end of file
*/
public class NioSelectorPool {
+
+ public NioSelectorPool() {
+ }
+
protected static int threadCount = 0;
protected static Log log = LogFactory.getLog(NioSelectorPool.class);
protected final static boolean SHARED =
Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
+
+ protected NioBlockingSelector blockingSelector;
+
protected Selector SHARED_SELECTOR;
protected int maxSelectors = 200;
while ( (s = selectors.poll()) != null ) s.close();
spare.set(0);
active.set(0);
+ if (blockingSelector!=null) {
+ blockingSelector.close();
+ }
if ( SHARED && getSharedSelector()!=null ) {
getSharedSelector().close();
SHARED_SELECTOR = null;
public void open() throws IOException {
enabled = true;
getSharedSelector();
+ if (SHARED) {
+ blockingSelector = new NioBlockingSelector();
+ blockingSelector.open(getSharedSelector());
+ }
+
}
/**
buf = socket.getBufHandler().getWriteBuffer();
}
if ( SHARED && block ) {
- return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
+ return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
}
SelectionKey key = null;
int written = 0;
*/
public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
if ( SHARED && block ) {
- return NioBlockingSelector.read(buf,socket,readTimeout);
+ return blockingSelector.read(buf,socket,readTimeout);
}
SelectionKey key = null;
int read = 0;