*/
public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
+ public static final int OP_REGISTER = -1; //register interest op
// ----------------------------------------------------------------- Fields
*/
protected ServerSocketChannel serverSock = null;
+ /**
+ * Cache for key attachment objects
+ */
+ protected ConcurrentLinkedQueue<KeyAttachment> keyCache = new ConcurrentLinkedQueue<KeyAttachment>();
+
+ /**
+ * Cache for poller events
+ */
+ protected ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>();
+ /**
+ * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
+ */
protected ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() {
protected AtomicInteger size = new AtomicInteger(0);
protected AtomicInteger bytes = new AtomicInteger(0);
Selector sel = pol!=null?pol.getSelector():null;
SelectionKey key = sel!=null?socket.getIOChannel().keyFor(sel):null;
KeyAttachment att = key!=null?(KeyAttachment)key.attachment():null;
- if ( att!=null ) att.reset();
+ if ( att!=null ) { att.reset(); keyCache.offer(att); }
if ( key!=null ) key.attach(null);
boolean offer = socketProperties.getBufferPool()==-1?true:size.get()<socketProperties.getBufferPool();
offer = offer && (socketProperties.getBufferPoolSize()==-1?true:(bytes.get()+socket.getBufferSize())<socketProperties.getBufferPoolSize());
}
pollers = null;
}
+ eventCache.clear();
+ keyCache.clear();
nioChannels.clear();
}
* PollerEvent, cacheable object for poller events to avoid GC
*/
public class PollerEvent implements Runnable {
+
protected NioChannel socket;
protected int interestOps;
- public PollerEvent(NioChannel ch, int intOps) {
- reset(ch, intOps);
+ protected KeyAttachment key;
+ public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
+ reset(ch, k, intOps);
}
- public void reset(NioChannel ch, int intOps) {
+ public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
+ key = k;
}
public void reset() {
- reset(null, 0);
+ reset(null, null, 0);
}
public void run() {
- final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- final KeyAttachment att = (KeyAttachment) key.attachment();
- try {
- if (key != null) {
- key.interestOps(interestOps);
- att.interestOps(interestOps);
+ if ( interestOps == OP_REGISTER ) {
+ try {
+ socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
+ } catch (Exception x) {
+ log.error("", x);
}
- }
- catch (CancelledKeyException ckx) {
+ } else {
+ final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ final KeyAttachment att = (KeyAttachment) key.attachment();
try {
- if (key != null && key.attachment() != null) {
- KeyAttachment ka = (KeyAttachment) key.attachment();
- ka.setError(true); //set to collect this socket immediately
+ if (key != null) {
+ key.interestOps(interestOps);
+ att.interestOps(interestOps);
}
+ }
+ catch (CancelledKeyException ckx) {
try {
- socket.close();
+ if (key != null && key.attachment() != null) {
+ KeyAttachment ka = (KeyAttachment) key.attachment();
+ ka.setError(true); //set to collect this socket immediately
+ }
+ try {
+ socket.close();
+ }
+ catch (Exception ignore) {}
+ if (socket.isOpen())
+ socket.close(true);
}
catch (Exception ignore) {}
- if (socket.isOpen())
- socket.close(true);
}
- catch (Exception ignore) {}
- }
- }
+ }//end if
+ }//run
}
/**
* Poller class.
protected Selector selector;
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
- protected ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>();
protected boolean close = false;
protected long nextExpiration = 0;//optimize expiration handling
// exit, otherwise parallel descturction of sockets which are still
// in the poller can cause problems
close = true;
+ events.clear();
selector.wakeup();
}
}
public void add(final NioChannel socket, final int interestOps) {
- PollerEvent r = this.eventCache.poll();
- if ( r==null) r = new PollerEvent(socket,interestOps);
+ PollerEvent r = eventCache.poll();
+ if ( r==null) r = new PollerEvent(socket,null,interestOps);
+ else r.reset(socket,null,interestOps);
addEvent(r);
}
public void register(final NioChannel socket)
{
socket.setPoller(this);
- final KeyAttachment ka = new KeyAttachment(this);
- ka.setChannel(socket);
- Runnable r = new Runnable() {
- public void run() {
- try {
- socket.getIOChannel().register(selector, SelectionKey.OP_READ, ka);
- } catch (Exception x) {
- log.error("", x);
- }
- }
-
- };
+ KeyAttachment key = keyCache.poll();
+ final KeyAttachment ka = key!=null?key:new KeyAttachment();
+ ka.reset(this,socket);
+ PollerEvent r = eventCache.poll();
+ if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
+ else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
try {
wakeupCounter.set(0);
keyCount = selector.select(selectorTimeout);
+ if ( close ) { selector.close(); return; }
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( wakeupCounter == null || selector == null ) throw x;
log.error("",x);
continue;
}
-
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
- //if (keyCount == 0) continue;
-
Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
public static class KeyAttachment {
- public KeyAttachment(Poller poller) {
- this.poller = poller;
+ public KeyAttachment() {
+
}
- public void reset() {
- //mutex = new Object();
+ public void reset(Poller poller, NioChannel channel) {
+ this.channel = channel;
+ this.poller = poller;
lastAccess = System.currentTimeMillis();
currentAccess = false;
comet = false;
timeout = -1;
error = false;
- channel = null;
}
+
+ public void reset() {
+ reset(null,null);
+ }
+
public Poller getPoller() { return poller;}
public void setPoller(Poller poller){this.poller = poller;}
public long getLastAccess() { return lastAccess; }
public int interestOps() { return interestOps;}
public int interestOps(int ops) { this.interestOps = ops; return ops; }
protected Object mutex = new Object();
- protected long lastAccess = System.currentTimeMillis();
+ protected long lastAccess = -1;
protected boolean currentAccess = false;
protected boolean comet = false;
protected long timeout = -1;