// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey) iterator.next();
- iterator.remove();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
- try {
- if ( sk.isValid() && attachment != null ) {
- attachment.access();//make sure we don't time out valid sockets
- sk.attach(attachment);//cant remember why this is here
- int interestOps = sk.interestOps();//get the interestops, in case we need to reset them
- sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
- attachment.interestOps(0);//fast access interestp ops
- NioChannel channel = attachment.getChannel();
- if (sk.isReadable() || sk.isWritable() ) {
- if ( attachment.getComet() ) {
- //check if thread is available
- if ( isWorkerAvailable() ) {
- if (!processSocket(channel, SocketStatus.OPEN))
- processSocket(channel, SocketStatus.DISCONNECT);
- } else {
- //increase the fairness counter
- attachment.incFairness();
- //reregister it
- attachment.interestOps(interestOps);
- sk.interestOps(interestOps);
- }
- } else if ( attachment.getLatch() != null ) {
- attachment.getLatch().countDown();
- } else {
- //later on, improve latch behavior
- if ( isWorkerAvailable() ) {
- boolean close = (!processSocket(channel));
- if (close) {
- channel.close();
- channel.getIOChannel().socket().close();
- }
- } else {
- //increase the fairness counter
- attachment.incFairness();
- //reregister it
- attachment.interestOps(interestOps);
- sk.interestOps(interestOps);
- }
- }
- }
- } else {
- //invalid key
- cancelledKey(sk, SocketStatus.ERROR);
- }
- } catch ( CancelledKeyException ckx ) {
- cancelledKey(sk, SocketStatus.ERROR);
- } catch (Throwable t) {
- log.error("",t);
- }
+ if ( processKey(sk, attachment) ) {
+ iterator.remove(); //only remove it if the key was processed.
+ }
}//while
+
//process timeouts
timeout(keyCount,hasEvents);
}//while
}
}
+
+ protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
+ boolean result = true;
+ try {
+ if ( sk.isValid() && attachment != null ) {
+ attachment.access();//make sure we don't time out valid sockets
+ sk.attach(attachment);//cant remember why this is here
+ NioChannel channel = attachment.getChannel();
+ if (sk.isReadable() || sk.isWritable() ) {
+ if ( attachment.getComet() ) {
+ //check if thread is available
+ if ( isWorkerAvailable() ) {
+ unreg(sk, attachment);
+ if (!processSocket(channel, SocketStatus.OPEN))
+ processSocket(channel, SocketStatus.DISCONNECT);
+ attachment.setFairness(0);
+ } else {
+ //increase the fairness counter
+ attachment.incFairness();
+ result = false;
+ }
+ } else if ( attachment.getLatch() != null ) {
+ unreg(sk, attachment);
+ attachment.getLatch().countDown();
+ } else {
+ //later on, improve latch behavior
+ if ( isWorkerAvailable() ) {
+ unreg(sk, attachment);
+ boolean close = (!processSocket(channel));
+ if (close) {
+ channel.close();
+ channel.getIOChannel().socket().close();
+ }
+ attachment.setFairness(0);
+ } else {
+ //increase the fairness counter
+ attachment.incFairness();
+ result = false;
+ }
+ }
+ }
+ } else {
+ //invalid key
+ cancelledKey(sk, SocketStatus.ERROR);
+ }
+ } catch ( CancelledKeyException ckx ) {
+ cancelledKey(sk, SocketStatus.ERROR);
+ } catch (Throwable t) {
+ log.error("",t);
+ }
+ return result;
+ }
+
+ protected void unreg(SelectionKey sk, KeyAttachment attachment) {
+ sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
+ attachment.interestOps(0);//fast access interestp ops
+ }
+
protected void timeout(int keyCount, boolean hasEvents) {
long now = System.currentTimeMillis();
//don't process timeouts too frequently, but if the selector simply timed out
protected long lastRegistered = 0;
}
// ----------------------------------------------------- Key Fairness Comparator
- public static class KeyFairnessComparator implements Comparator<KeyAttachment> {
- public int compare(KeyAttachment ka1, KeyAttachment ka2) {
+ public static class KeyFairnessComparator implements Comparator<SelectionKey> {
+ public int compare(SelectionKey ska1, SelectionKey ska2) {
+ KeyAttachment ka1 = (KeyAttachment)ska1.attachment();
+ KeyAttachment ka2 = (KeyAttachment)ska2.attachment();
+ if ( ka1 == null && ka2 == null ) return 0;
+ if ( ka1 == null ) return 1; //invalid keys go last
+ if ( ka2 == null ) return -1; //invalid keys go last
long lr1 = ka1.getLastRegistered();
long lr2 = ka2.getLastRegistered();
int f1 = ka1.getFairness();
int f2 = ka2.getFairness();
- if ( f1 == f2 ) {
+ CountDownLatch lat1 = ka1.getLatch();
+ CountDownLatch lat2 = ka2.getLatch();
+ if ( lat1 != null && lat2 != null ) {
+ return 0;
+ } else if ( lat1 != null && lat2 == null ) {
+ //latches have highest priority
+ return -1;
+ } else if ( lat1 == null && lat2 != null ) {
+ return 1;
+ } else if ( f1 == f2 ) {
if ( lr1 == lr2 ) return 0;
//earlier objects have priorioty
else return lr1<lr2?-1:1;
return ka1.getFairness()>ka2.getFairness()?-1:1;
}
}
-
}
+
// ----------------------------------------------------- Worker Inner Class