}
}
- private void addToReadQueue(final SelectionKey key, final KeyAttachment att) {
- att.setWakeUp(true);
- att.getPoller().addEvent(
- new Runnable() {
- public void run() {
- try {
- if (key != null) {
- key.interestOps(SelectionKey.OP_READ);
- att.interestOps(SelectionKey.OP_READ);
- }
- } catch (CancelledKeyException ckx) {
- try {
- if ( att != null ) {
- att.setError(true); //set to collect this socket immediately
- att.setWakeUp(false);
- }
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- } catch (Exception ignore) {}
- }
- }
- });
- }
-
-
/**
* Parse the HTTP headers.
*/
SelectionKey key = sel!=null?socket.getIOChannel().keyFor(sel):null;
KeyAttachment att = key!=null?(KeyAttachment)key.attachment():null;
if ( att!=null ) att.reset();
+ if ( key!=null ) key.attach(null);
//avoid over growing our cache or add after we have stopped
if ( running && (!paused) && (size() < socketProperties.getDirectBufferPool()) ) return super.offer(socket);
else return false;
}
public void addEvent(Runnable event) {
- //synchronized (events) {
- // events.add(event);
- //}
events.offer(event);
if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
}
public void add(final NioChannel socket) {
final SelectionKey key = socket.getIOChannel().keyFor(selector);
final KeyAttachment att = (KeyAttachment)key.attachment();
- if ( att != null ) att.setWakeUp(false);
Runnable r = new Runnable() {
public void run() {
try {
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
- if ( attachment.getWakeUp() ) {
- attachment.setWakeUp(false);
- synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();}
- } else if ( attachment.getComet() ) {
+ if ( attachment.getComet() ) {
if (!processSocket(channel, SocketStatus.OPEN))
processSocket(channel, SocketStatus.DISCONNECT);
} else {
}
public void reset() {
//mutex = new Object();
- wakeUp = false;
lastAccess = System.currentTimeMillis();
currentAccess = false;
comet = false;
public boolean getComet() { return comet; }
public boolean getCurrentAccess() { return currentAccess; }
public void setCurrentAccess(boolean access) { currentAccess = access; }
- public boolean getWakeUp() { return wakeUp; }
- public void setWakeUp(boolean wakeUp) { this.wakeUp = wakeUp; }
public Object getMutex() {return mutex;}
public void setTimeout(long timeout) {this.timeout = timeout;}
public long getTimeout() {return this.timeout;}
public int interestOps() { return interestOps;}
public int interestOps(int ops) { this.interestOps = ops; return ops; }
protected Object mutex = new Object();
- protected boolean wakeUp = false;
protected long lastAccess = System.currentTimeMillis();
protected boolean currentAccess = false;
protected boolean comet = false;
} catch (CancelledKeyException ckx) {
try {
if ( fk != null && fk.attachment() != null ) {
-
ka.setError(true); //set to collect this socket immediately
try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
try {ka.getChannel().close();}catch(Exception ignore){}
- ka.setWakeUp(false);
}
} catch (Exception ignore) {}
}