workers.notify();
}
}
+ /**
+ * Process given socket.
+ */
+ protected boolean processSocket(NioChannel socket) {
+ return processSocket(socket,null);
+ }
/**
* Process given socket for an event.
*/
+ protected boolean processSocket(NioChannel socket, SocketStatus status) {
+ return processSocket(socket,status,true);
+ }
+
protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
if (executor == null) {
public void addEvent(Runnable event) {
events.offer(event);
- if ( wakeupCounter.incrementAndGet() == 1 || wakeupCounter.get() > 5 ) selector.wakeup();
+ if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
}
/**
int keyCount = 0;
try {
if ( !close ) {
- if ( wakeupCounter.get() > 0 )
- keyCount = selector.selectNow(); //we have events that need to be processed
- else
- keyCount = selector.select(selectorTimeout);
+ keyCount = selector.select(selectorTimeout);
wakeupCounter.set(0);
}
if (close) {
//check if thread is available
if ( isWorkerAvailable() ) {
unreg(sk, attachment);
- if (!processSocket(channel, SocketStatus.OPEN,true))
- processSocket(channel, SocketStatus.DISCONNECT,true);
+ if (!processSocket(channel, SocketStatus.OPEN))
+ processSocket(channel, SocketStatus.DISCONNECT);
attachment.setFairness(0);
} else {
//increase the fairness counter
//later on, improve latch behavior
if ( isWorkerAvailable() ) {
unreg(sk, attachment);
- boolean close = (!processSocket(channel,null,true));
+ boolean close = (!processSocket(channel));
if (close) {
cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
cancelledKey(key, SocketStatus.ERROR,false);
}
}//for
+ if ( log.isDebugEnabled() ) log.debug("Poller processed "+keycount+" keys through timeout");
}
}
} finally {
//dereference socket to let GC do its job
socket = null;
- this.socket = null;
- key = null;
// Finish up this request
recycleWorkerThread(this);
}