*/
public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ if ( key == null ) throw new IOException("Key no longer registered");
+ KeyAttachment att = (KeyAttachment) key.attachment();
+ int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
continue; //we successfully wrote, try again without a selector
}
}
- if ( key == null ) throw new IOException("Key no longer registered");
- KeyAttachment att = (KeyAttachment) key.attachment();
try {
if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
//only register for write if a write has not yet been issued
- if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,SelectionKey.OP_WRITE);
+ if ( (att.interestOps() & SelectionKey.OP_WRITE) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_WRITE);
att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
cancelKey(socket, key);
}
}
+ socket.getPoller().add(socket,prevOps);
return written;
}
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
public static int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
- final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ if ( key == null ) throw new IOException("Key no longer registered");
+ KeyAttachment att = (KeyAttachment) key.attachment();
+ int prevOps = att.interestOps() | (att.getCometOps()&NioEndpoint.OP_CALLBACK);
int read = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
if (cnt > 0)
break;
}
- KeyAttachment att = (KeyAttachment) key.attachment();
try {
if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
- if ( att.interestOps() == 0) socket.getPoller().add(socket,SelectionKey.OP_READ);
+ if ( (att.interestOps() & SelectionKey.OP_READ) == 0) socket.getPoller().add(socket,prevOps|SelectionKey.OP_READ);
att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
cancelKey(socket,key);
}
}
+ socket.getPoller().add(socket,prevOps);
return read;
}