//readTimeout = -1;
}
inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout);
- inputBuffer.setPoller(endpoint.getPoller());
request.setInputBuffer(inputBuffer);
response = new Response();
if (request.getAttribute("org.apache.tomcat.comet") == null) {
comet = false;
}
- SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
if ( attach!=null ) {
return SocketState.CLOSED;
} else if (!comet) {
recycle();
- endpoint.getPoller().add(socket);
+ socket.getPoller().add(socket);
return SocketState.OPEN;
} else {
- endpoint.getCometPoller().add(socket);
+ socket.getPoller().add(socket);
return SocketState.LONG;
}
}
this.socket = socket;
inputBuffer.setSocket(socket);
outputBuffer.setSocket(socket);
- outputBuffer.setSelector(endpoint.getPoller().getSelector());
// Error flag
error = false;
// and the method should return true
openSocket = true;
// Add the socket to the poller
- endpoint.getPoller().add(socket);
+ socket.getPoller().add(socket);
break;
}
request.setStartTime(System.currentTimeMillis());
if (request.getAttribute("org.apache.tomcat.comet") != null) {
comet = true;
}
- SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
if (attach != null) {
comet = false;
cometClose = true;
- SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
if ( attach!=null && attach.getComet()) {
import org.apache.tomcat.util.res.StringManager;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* NIO tailored thread pool, providing the following services:
*/
protected Poller[] pollers = null;
protected int pollerRoundRobin = 0;
- public Poller getPoller() {
+ public Poller getPoller0() {
pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
Poller poller = pollers[pollerRoundRobin];
return poller;
/**
* The socket poller used for Comet support.
*/
- public Poller getCometPoller() {
- Poller poller = getPoller();
+ public Poller getCometPoller0() {
+ Poller poller = getPoller0();
return poller;
}
/**
* Dummy maxSpareThreads property.
*/
- public int getMaxSpareThreads() { return 0; }
+ public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
/**
* Dummy minSpareThreads property.
*/
- public int getMinSpareThreads() { return 0; }
+ public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
// -------------------- SSL related properties --------------------
protected String keystoreFile = System.getProperty("user.home")+"/.keystore";
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
- if (pollerThreadCount != 1) {
- // limit to one poller, no need for others
+ if (pollerThreadCount <= 0) {
+ //minimum one poller thread
pollerThreadCount = 1;
}
if (!running) {
running = true;
paused = false;
-
+
+
// Create worker collection
if (executor == null) {
workers = new WorkerStack(maxThreads);
+ //executor = new ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
// Start acceptor threads
}
// Start poller threads
+ log.info("Creating poller threads:"+pollerThreadCount);
pollers = new Poller[pollerThreadCount];
for (int i = 0; i < pollerThreadCount; i++) {
pollers[i] = new Poller();
channel = new NioChannel(socket,bufhandler);
}
- getPoller().register(channel);
+
+ getPoller0().register(channel);
} catch (Throwable t) {
if (log.isDebugEnabled()) {
while (workerThread == null) {
try {
synchronized (workers) {
- workers.wait();
+ workerThread = createWorkerThread();
+ if ( workerThread == null ) workers.wait();
}
} catch (InterruptedException e) {
// Ignore
}
- workerThread = createWorkerThread();
+ if ( workerThread == null ) workerThread = createWorkerThread();
}
return workerThread;
}
public void register(final NioChannel socket)
{
+ socket.setPoller(this);
+ final KeyAttachment ka = new KeyAttachment(this);
+ ka.setChannel(socket);
Runnable r = new Runnable() {
public void run() {
try {
- KeyAttachment ka = new KeyAttachment();
- ka.setChannel(socket);
+
socket.getIOChannel().register(selector, SelectionKey.OP_READ, ka);
} catch (Exception x) {
log.error("", x);
try {
wakeupCounter.set(0);
keyCount = selector.select(selectorTimeout);
+ } catch ( NullPointerException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ if ( wakeupCounter == null || selector == null ) throw x;
+ continue;
+ } catch ( CancelledKeyException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ if ( wakeupCounter == null || selector == null ) throw x;
+ continue;
} catch (Throwable x) {
log.error("",x);
continue;
iterator.remove();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
try {
- if ( sk.isValid() ) {
- if(attachment == null) attachment = new KeyAttachment();
+ if ( sk.isValid() && attachment != null ) {
attachment.access();
sk.attach(attachment);
- int readyOps = sk.readyOps();
sk.interestOps(0);
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
}
public static class KeyAttachment {
-
+
+ public KeyAttachment(Poller poller) {
+ this.poller = poller;
+ }
+ public Poller getPoller() { return poller;}
+ public void setPoller(Poller poller){this.poller = poller;}
public long getLastAccess() { return lastAccess; }
public void access() { access(System.currentTimeMillis()); }
public void access(long access) { lastAccess = access; }
public void setError(boolean error) { this.error = error; }
public NioChannel getChannel() { return channel;}
public void setChannel(NioChannel channel) { this.channel = channel;}
+ protected Poller poller = null;
protected int interestOps = 0;
public int interestOps() { return interestOps;}
public int interestOps(int ops) { this.interestOps = ops; return ops; }
NioChannel socket = await();
if (socket == null)
continue;
- SelectionKey key = socket.getIOChannel().keyFor(getPoller().getSelector());
+ SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
int handshake = -1;
try {
handshake = socket.handshake(key.isReadable(), key.isWritable());
}
};
- getPoller().addEvent(r);
+ ka.getPoller().addEvent(r);
}
//dereference socket to let GC do its job
socket = null;