boolean addToQueue = false;\r
try { addToQueue = ((key.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ignore ){}\r
if ( addToQueue ) {\r
- att.setWakeUp(true);\r
- poller.addEvent(\r
- new Runnable() {\r
- public void run() {\r
- try {\r
- if (key != null) key.interestOps(SelectionKey.OP_READ);\r
- } catch (CancelledKeyException ckx) {\r
- try {\r
- socket.socket().close();\r
- socket.close();\r
- att.setWakeUp(false);\r
- } catch (Exception ignore) {}\r
- }\r
- }\r
- });\r
+ addToReadQueue(key, att);\r
}//end if\r
synchronized (att.getMutex()) {\r
if ( att.getWakeUp() ) att.getMutex().wait(25);\r
throw new IOException("read timed out.");\r
}\r
\r
+ private void addToReadQueue(final SelectionKey key, final KeyAttachment att) {\r
+ att.setWakeUp(true);\r
+ poller.addEvent(\r
+ new Runnable() {\r
+ public void run() {\r
+ try {\r
+ if (key != null) key.interestOps(SelectionKey.OP_READ);\r
+ } catch (CancelledKeyException ckx) {\r
+ try {\r
+ if ( key != null && key.attachment() != null ) {\r
+ KeyAttachment ka = (KeyAttachment)key.attachment();\r
+ ka.setError(true); //set to collect this socket immediately\r
+ }\r
+ socket.socket().close();\r
+ socket.close();\r
+ att.setWakeUp(false);\r
+ } catch (Exception ignore) {}\r
+ }\r
+ }\r
+ });\r
+ }\r
+\r
\r
/**\r
* Parse the HTTP headers.\r
protected Selector selector;\r
protected LinkedList<Runnable> events = new LinkedList<Runnable>();\r
protected boolean close = false;\r
+ protected long nextExpiration = 0;//optimize expiration handling\r
\r
protected int keepAliveCount = 0;\r
public int getKeepAliveCount() { return keepAliveCount; }\r
if (key != null) key.interestOps(SelectionKey.OP_READ);\r
}catch ( CancelledKeyException ckx ) {\r
try {\r
+ if ( key != null && key.attachment() != null ) {\r
+ KeyAttachment ka = (KeyAttachment)key.attachment();\r
+ ka.setError(true); //set to collect this socket immediately\r
+ }\r
socket.socket().close();\r
socket.close();\r
} catch ( Exception ignore ) {}\r
try {\r
KeyAttachment ka = (KeyAttachment) key.attachment();\r
key.cancel();\r
- if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true);\r
+ if (ka != null && ka.getComet()) processSocket( (SocketChannel) key.channel(), true);\r
key.channel().close();\r
} catch (IOException e) {\r
if ( log.isDebugEnabled() ) log.debug("",e);\r
log.error("",t);\r
}\r
}//while\r
-\r
- //timeout\r
- Set<SelectionKey> keys = selector.keys();\r
- long now = System.currentTimeMillis();\r
- for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {\r
- SelectionKey key = iter.next();\r
- try {\r
- if (key.interestOps() == SelectionKey.OP_READ) {\r
- //only timeout sockets that we are waiting for a read from\r
- KeyAttachment ka = (KeyAttachment) key.attachment();\r
- long delta = now - ka.getLastAccess();\r
- boolean isTimedout = (ka.getTimeout()==-1)?(delta > (long) soTimeout):(delta>ka.getTimeout());\r
- if (isTimedout) {\r
- cancelledKey(key);\r
- }\r
- }\r
- }catch ( CancelledKeyException ckx ) {\r
- cancelledKey(key);\r
- }\r
- }\r
-\r
+ //process timeouts\r
+ timeout();\r
}\r
synchronized (this) {\r
this.notifyAll();\r
}\r
\r
}\r
-\r
+ protected void timeout() {\r
+ long now = System.currentTimeMillis();\r
+ if ( now < nextExpiration ) return;\r
+ nextExpiration = now + (long)soTimeout;\r
+ //timeout\r
+ Set<SelectionKey> keys = selector.keys();\r
+ for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {\r
+ SelectionKey key = iter.next();\r
+ try {\r
+ KeyAttachment ka = (KeyAttachment) key.attachment();\r
+ if ( ka == null ) {\r
+ cancelledKey(key); //we don't support any keys without attachments\r
+ } else if ( ka.getError() ) {\r
+ cancelledKey(key);\r
+ }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {\r
+ //only timeout sockets that we are waiting for a read from\r
+ long delta = now - ka.getLastAccess();\r
+ long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout());\r
+ boolean isTimedout = delta > timeout;\r
+ if (isTimedout) {\r
+ cancelledKey(key);\r
+ } else {\r
+ long nextTime = now+(timeout-delta);\r
+ nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;\r
+ }\r
+ }//end if\r
+ }catch ( CancelledKeyException ckx ) {\r
+ cancelledKey(key);\r
+ }\r
+ }//for\r
+ }\r
}\r
\r
public static class KeyAttachment {\r
public Object getMutex() {return mutex;}\r
public void setTimeout(long timeout) {this.timeout = timeout;}\r
public long getTimeout() {return this.timeout;}\r
+ public boolean getError() { return error; }\r
+ public void setError(boolean error) { this.error = error; }\r
protected Object mutex = new Object();\r
protected boolean wakeUp = false;\r
protected long lastAccess = System.currentTimeMillis();\r
protected boolean currentAccess = false;\r
protected boolean comet = false;\r
protected long timeout = -1;\r
+ protected boolean error = false;\r
\r
}\r
\r