import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.res.StringManager;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
/**
* NIO tailored thread pool, providing the following services:
public class Poller implements Runnable {
protected Selector selector;
- protected LinkedList<Runnable> events = new LinkedList<Runnable>();
+ protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+
protected boolean close = false;
protected long nextExpiration = 0;//optimize expiration handling
protected int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
+
+ protected AtomicLong wakeupCounter = new AtomicLong(0l);
}
public void addEvent(Runnable event) {
- synchronized (events) {
- events.add(event);
- }
- selector.wakeup();
+ //synchronized (events) {
+ // events.add(event);
+ //}
+ events.offer(event);
+ if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
}
/**
*/
public void add(final NioChannel socket) {
final SelectionKey key = socket.getIOChannel().keyFor(selector);
- KeyAttachment att = (KeyAttachment)key.attachment();
+ final KeyAttachment att = (KeyAttachment)key.attachment();
if ( att != null ) att.setWakeUp(false);
Runnable r = new Runnable() {
public void run() {
try {
- if (key != null) key.interestOps(SelectionKey.OP_READ);
+ if (key != null) {
+ key.interestOps(SelectionKey.OP_READ);
+ att.interestOps(SelectionKey.OP_READ);
+ }
}catch ( CancelledKeyException ckx ) {
try {
if ( key != null && key.attachment() != null ) {
public boolean events() {
boolean result = false;
- synchronized (events) {
+ //synchronized (events) {
Runnable r = null;
result = (events.size() > 0);
- while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {
+ while ( (r = (Runnable)events.poll()) != null ) {
try {
r.run();
} catch ( Exception x ) {
log.error("",x);
}
}
- events.clear();
- }
+ //events.clear();
+ //}
return result;
}
public void register(final NioChannel socket)
{
- SelectionKey key = socket.getIOChannel().keyFor(selector);
Runnable r = new Runnable() {
public void run() {
try {
int keyCount = 0;
try {
+ wakeupCounter.set(0);
keyCount = selector.select(selectorTimeout);
} catch (Throwable x) {
log.error("",x);
attachment.access();
sk.attach(attachment);
int readyOps = sk.readyOps();
- sk.interestOps(sk.interestOps() & ~readyOps);
+ sk.interestOps(0);
+ attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getWakeUp() ) {
cancelledKey(key); //we don't support any keys without attachments
} else if ( ka.getError() ) {
cancelledKey(key);
- }else if ((key.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+ }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//only timeout sockets that we are waiting for a read from
long delta = now - ka.getLastAccess();
long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout());
public void setError(boolean error) { this.error = error; }
public NioChannel getChannel() { return channel;}
public void setChannel(NioChannel channel) { this.channel = channel;}
+ protected int interestOps = 0;
+ public int interestOps() { return interestOps;}
+ public int interestOps(int ops) { this.interestOps = ops; return ops; }
protected Object mutex = new Object();
protected boolean wakeUp = false;
protected long lastAccess = System.currentTimeMillis();
} else {
final SelectionKey fk = key;
final int intops = handshake;
+ final KeyAttachment ka = (KeyAttachment)fk.attachment();
//register for handshake ops
Runnable r = new Runnable() {
public void run() {
try {
fk.interestOps(intops);
+ ka.interestOps(intops);
} catch (CancelledKeyException ckx) {
try {
if ( fk != null && fk.attachment() != null ) {
- KeyAttachment ka = (KeyAttachment)fk.attachment();
+
ka.setError(true); //set to collect this socket immediately
try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
try {ka.getChannel().close();}catch(Exception ignore){}
public NioBufferHandler(int readsize, int writesize) {
readbuf = ByteBuffer.allocateDirect(readsize);
writebuf = ByteBuffer.allocateDirect(writesize);
+// readbuf = ByteBuffer.allocate(readsize);
+// writebuf = ByteBuffer.allocate(writesize);
}
public ByteBuffer expand(ByteBuffer buffer, int remaining) {return buffer;}