} else if (nRead == -1) {
//return false;
throw new IOException("end of stream reached.");
- }
- timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>this.readTimeout);
- if ( !timedOut && nRead == 0 )
- try {
- final SelectionKey key = socket.getIOChannel().keyFor(poller.getSelector());
- final KeyAttachment att = (KeyAttachment)key.attachment();
- //to do, add in a check, we might have just timed out on the wait,
- //so there is no need to register us again.
- boolean addToQueue = false;
- try { addToQueue = ((key.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ignore ){}
- if ( addToQueue ) {
- addToReadQueue(key, att);
- }//end if
- synchronized (att.getMutex()) {
- if ( att.getWakeUp() ) att.getMutex().wait(25);
- }
- }catch ( Exception x ) {}
+ } else {
+ timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
+ if ( !timedOut && nRead == 0 ) {
+ try {
+ final SelectionKey key = socket.getIOChannel().keyFor(poller.getSelector());
+ final KeyAttachment att = (KeyAttachment)key.attachment();
+ //to do, add in a check, we might have just timed out on the wait,
+ //so there is no need to register us again.
+ boolean addToQueue = false;
+ try { addToQueue = ((key.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
+ if ( addToQueue ) {
+ synchronized (att.getMutex()) {
+ addToReadQueue(key, att);
+ att.getMutex().wait(readTimeout);
+ }
+ }//end if
+ }catch ( Exception x ) {}
+ }
+ }
}while ( nRead == 0 && (!timedOut) );
//else throw new IOException(sm.getString("iib.failedread"));
//return false; //timeout
}
private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
- int limit = bytebuffer.position();
+ //int limit = bytebuffer.position();
if ( flip ) bytebuffer.flip();
while ( bytebuffer.hasRemaining() ) {
int written = socket.write(bytebuffer);
}
+ //make sure we are flushed
+ do {
+ if (socket.flush()) break;
+ }while ( true );
+
socket.getBufHandler().getWriteBuffer().clear();
this.total = 0;
}
this.bufHandler = bufHandler;
}
+ /**
+ * returns true if the network buffer has
+ * been flushed out and is empty
+ * @return boolean
+ */
+ public boolean flush() throws IOException {
+ return true; //no network buffer in the regular channel
+ }
+
/**
* Closes this channel.
}
};
- synchronized (events) {
- events.add(r);
- }
- selector.wakeup();
+ addEvent(r);
}
public void cancelledKey(SelectionKey key) {
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
- key.cancel();
+ if ( key.isValid() ) key.cancel();
if (ka != null && ka.getComet()) processSocket( ka.getChannel(), true);
- key.channel().close();
- } catch (IOException e) {
- if ( log.isDebugEnabled() ) log.debug("",e);
+ if ( key.channel().isOpen() ) key.channel().close();
+ key.attach(null);
+ } catch (Throwable e) {
+ if ( log.isDebugEnabled() ) log.error("",e);
// Ignore
}
}
} else {
boolean close = (!processSocket(channel));
if ( close ) {
- channel.getIOChannel().socket().close();
channel.close();
+ channel.getIOChannel().socket().close();
}
}
}
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
- if (attachment!=null && attachment.getComet()) processSocket( attachment.getChannel(), true);
- try {
- sk.channel().close();
- }catch ( Exception ignore){}
+ cancelledKey(sk);
} catch (Throwable t) {
log.error("",t);
}
}catch ( IOException x ) {
handshake = -1;
log.error("Error during SSL handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
}
if ( handshake == 0 ) {
// Process the request from this socket
}
}
} else if (handshake == -1 ) {
- key.cancel();
+ if ( key.isValid() ) key.cancel();
try {socket.close(true);}catch (IOException ignore){}
} else {
final SelectionKey fk = key;
//===========================================================================================
// NIO SSL METHODS
//===========================================================================================
+ /**
+ * returns true if the network buffer has
+ * been flushed out and is empty
+ * @return boolean
+ */
+ public boolean flush() throws IOException {
+ return flush(netOutBuffer);
+ }
/**
* Flushes the buffer to the network