\r
KeyAttachment att = (KeyAttachment) key.attachment();\r
try {\r
- att.startLatch(1);\r
- socket.getPoller().add(socket,SelectionKey.OP_WRITE);\r
+ if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);\r
+ if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE);\r
att.getLatch().await(writeTimeout,TimeUnit.MILLISECONDS);\r
- att.resetLatch();\r
}catch (InterruptedException ignore) {\r
+ Thread.interrupted();\r
+ }\r
+ if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {\r
+ //we got interrupted, but we haven't received notification from the poller.\r
+ keycount = 0;\r
+ }else {\r
+ //latch countdown has happened\r
+ keycount = 1;\r
+ att.resetLatch();\r
}\r
- if ( att.getLatch() == null ) keycount = 1;\r
- else keycount = 0;\r
+\r
if (writeTimeout > 0 && (keycount == 0))\r
timedout = (System.currentTimeMillis() - time) >= writeTimeout;\r
} //while\r
}\r
KeyAttachment att = (KeyAttachment) key.attachment();\r
try {\r
- att.startLatch(1);\r
- socket.getPoller().add(socket,SelectionKey.OP_READ);\r
+ if ( att.getLatch()==null || att.getLatch().getCount()==0) att.startLatch(1);\r
+ if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ);\r
att.getLatch().await(readTimeout,TimeUnit.MILLISECONDS);\r
- att.resetLatch();\r
}catch (InterruptedException ignore) {\r
+ Thread.interrupted();\r
+ }\r
+ if ( att.getLatch()!=null && att.getLatch().getCount()> 0) {\r
+ //we got interrupted, but we haven't received notification from the poller.\r
+ keycount = 0;\r
+ }else {\r
+ //latch countdown has happened\r
+ keycount = 1;\r
+ att.resetLatch();\r
}\r
- if ( att.getLatch() == null ) keycount = 1;\r
- else keycount = 0;\r
if (readTimeout > 0 && (keycount == 0))\r
timedout = (System.currentTimeMillis() - time) >= readTimeout;\r
} //while\r
serverSock = ServerSocketChannel.open();
InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
- serverSock.socket().bind(addr,100); //todo, set backlog value
+ serverSock.socket().bind(addr,backlog);
serverSock.configureBlocking(true); //mimic APR behavior
// Initialize thread count defaults for acceptor, poller and sendfile
/**
+ * Returns true if a worker thread is available for processing.
+ * @return boolean
+ */
+ protected boolean isWorkerAvailable() {
+ if (workers.size() > 0) {
+ return true;
+ }
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {
+ return true;
+ } else {
+ if (maxThreads < 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+ /**
* Create (or allocate) and return an available processor for use in
* processing a specific HTTP request, if possible. If the maximum
* allowed processors have already been created and are in use, return
// Accept the next incoming connection from the server socket
SocketChannel socket = serverSock.accept();
// Hand this socket off to an appropriate processor
+ //TODO FIXME - this is currently a blocking call, meaning we will be blocking
+ //further accepts until there is a thread available.
if ( running && (!paused) && socket != null ) processSocket(socket);
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
if ( sk.isValid() && attachment != null ) {
attachment.access();
sk.attach(attachment);
+ int interestOps = sk.interestOps();
sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getComet() ) {
- if (!processSocket(channel, SocketStatus.OPEN))
- processSocket(channel, SocketStatus.DISCONNECT);
+ //check if thread is available
+ if ( isWorkerAvailable() ) {
+ if (!processSocket(channel, SocketStatus.OPEN))
+ processSocket(channel, SocketStatus.DISCONNECT);
+ } else {
+ //reregister it
+ attachment.interestOps(interestOps);
+ sk.interestOps(interestOps);
+ }
} else if ( attachment.getLatch() != null ) {
attachment.getLatch().countDown();
} else {
- //this sucker here dead locks with the count down latch
- //since this call is blocking if no threads are available.
- //TODO: FIXME BIG TIME
- boolean close = (!processSocket(channel));
- if ( close ) {
- channel.close();
- channel.getIOChannel().socket().close();
+ //later on, improve latch behavior
+ if ( isWorkerAvailable() ) {
+ boolean close = (!processSocket(channel));
+ if (close) {
+ channel.close();
+ channel.getIOChannel().socket().close();
+ }
+ } else {
+ //reregister it
+ attachment.interestOps(interestOps);
+ sk.interestOps(interestOps);
}
}
}
protected final static boolean SHARED =
Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
protected static Selector SHARED_SELECTOR;
+
+ protected int maxSelectors = 200;
+ protected int maxSpareSelectors = -1;
+ protected boolean enabled = true;
+ protected AtomicInteger active = new AtomicInteger(0);
+ protected AtomicInteger spare = new AtomicInteger(0);
+ protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
+
protected static Selector getSharedSelector() throws IOException {
if (SHARED && SHARED_SELECTOR == null) {
synchronized ( NioSelectorPool.class ) {
if ( SHARED_SELECTOR == null ) {
- SHARED_SELECTOR = Selector.open();
+ SHARED_SELECTOR = Selector.open();
log.info("Using a shared selector for servlet write/read");
- }
+ }
}
}
return SHARED_SELECTOR;
}
- protected int maxSelectors = 200;
- protected int maxSpareSelectors = -1;
- protected boolean enabled = true;
- protected AtomicInteger active = new AtomicInteger(0);
- protected AtomicInteger spare = new AtomicInteger(0);
- protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
public Selector get() throws IOException{
if ( SHARED ) {