* Process given socket for an event.
*/
protected boolean processSocket(NioChannel socket, SocketStatus status) {
+ return processSocket(socket,status,true);
+ }
+
+ protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
if (executor == null) {
getWorkerThread().assign(socket, status);
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
- executor.execute(sc);
+ if ( dispatch ) executor.execute(sc);
+ else sc.run();
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
public int getKeepAliveCount() { return keepAliveCount; }
protected AtomicLong wakeupCounter = new AtomicLong(0l);
+
+ protected CountDownLatch stopLatch = new CountDownLatch(1);
close = true;
events.clear();
selector.wakeup();
+ try { stopLatch.await(); } catch (InterruptedException ignore ) {}
}
public void addEvent(Runnable event) {
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
-
public void cancelledKey(SelectionKey key, SocketStatus status) {
+ cancelledKey(key, status, true);
+ }
+ public void cancelledKey(SelectionKey key, SocketStatus status, boolean dispatch) {
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if (ka != null && ka.getComet()) {
//the comet event takes care of clean up
- processSocket(ka.getChannel(), status);
+ processSocket(ka.getChannel(), status, dispatch);
}else {
if (key.isValid()) key.cancel();
if (key.channel().isOpen()) key.channel().close();
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
- while (paused) {
+ while (paused && (!close) ) {
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
// Ignore
}
hasEvents = (hasEvents | events());
// Time to terminate?
- if (close) return;
-
+ if (close) {
+ timeout(0, false);
+ stopLatch.countDown();
+ return;
+ }
int keyCount = 0;
try {
- keyCount = selector.select(selectorTimeout);
- wakeupCounter.set(0);
- if ( close ) { selector.close(); return; }
+ if ( !close ) {
+ keyCount = selector.select(selectorTimeout);
+ wakeupCounter.set(0);
+ }
+ if (close) {
+ timeout(0, false);
+ stopLatch.countDown();
+ selector.close();
+ return;
+ }
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( wakeupCounter == null || selector == null ) throw x;
synchronized (this) {
this.notifyAll();
}
+ stopLatch.countDown();
}
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
boolean result = true;
try {
- if ( sk.isValid() && attachment != null ) {
+ if ( close ) {
+ cancelledKey(sk, SocketStatus.STOP, false);
+ } else if ( sk.isValid() && attachment != null ) {
attachment.access();//make sure we don't time out valid sockets
sk.attach(attachment);//cant remember why this is here
NioChannel channel = attachment.getChannel();
long now = System.currentTimeMillis();
//don't process timeouts too frequently, but if the selector simply timed out
//then we can check timeouts to avoid gaps
- if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return;
+ if ( (now < nextExpiration) && (keyCount>0 || hasEvents) && (!close) ) return;
nextExpiration = now + (long)socketProperties.getSoTimeout();
//timeout
Set<SelectionKey> keys = selector.keys();
long delta = now - ka.getLastAccess();
long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout());
boolean isTimedout = delta > timeout;
- if (isTimedout) {
+ if ( close ) {
+ key.interestOps(0);
+ ka.interestOps(0); //avoid duplicate stop calls
+ processKey(key,ka);
+ } else if (isTimedout) {
key.interestOps(0);
ka.interestOps(0); //avoid duplicate timeout calls
cancelledKey(key, SocketStatus.TIMEOUT);