From 8e7a409213324f9c25adbe0dd30988a74078ba5f Mon Sep 17 00:00:00 2001 From: fhanik Date: Wed, 9 Aug 2006 14:44:50 +0000 Subject: [PATCH] Fixed deadlock issue with thread pool Fixed error catches for a known JDK bug on windows #5076772 Added in the ability to have more than one poller, although performance actually gets worse Next steps: hand off setting socket options etc to the worker thread for faster acceptance of new socket git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@430064 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/coyote/http11/Http11NioProcessor.java | 14 +++-- .../apache/coyote/http11/Http11NioProtocol.java | 17 +++++- .../coyote/http11/InternalNioInputBuffer.java | 13 +---- .../coyote/http11/InternalNioOutputBuffer.java | 9 +--- java/org/apache/tomcat/util/net/NioChannel.java | 11 ++++ java/org/apache/tomcat/util/net/NioEndpoint.java | 60 +++++++++++++++------- 6 files changed, 78 insertions(+), 46 deletions(-) diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index 61482b997..96d94a226 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -95,7 +95,6 @@ public class Http11NioProcessor implements ActionHook { //readTimeout = -1; } inputBuffer = new InternalNioInputBuffer(request, headerBufferSize,readTimeout); - inputBuffer.setPoller(endpoint.getPoller()); request.setInputBuffer(inputBuffer); response = new Response(); @@ -752,7 +751,7 @@ public class Http11NioProcessor implements ActionHook { if (request.getAttribute("org.apache.tomcat.comet") == null) { comet = false; } - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if ( attach!=null ) { @@ -778,10 +777,10 @@ public class Http11NioProcessor implements ActionHook { return SocketState.CLOSED; } else if (!comet) { recycle(); - endpoint.getPoller().add(socket); + socket.getPoller().add(socket); return SocketState.OPEN; } else { - endpoint.getCometPoller().add(socket); + socket.getPoller().add(socket); return SocketState.LONG; } } @@ -809,7 +808,6 @@ public class Http11NioProcessor implements ActionHook { this.socket = socket; inputBuffer.setSocket(socket); outputBuffer.setSocket(socket); - outputBuffer.setSelector(endpoint.getPoller().getSelector()); // Error flag error = false; @@ -841,7 +839,7 @@ public class Http11NioProcessor implements ActionHook { // and the method should return true openSocket = true; // Add the socket to the poller - endpoint.getPoller().add(socket); + socket.getPoller().add(socket); break; } request.setStartTime(System.currentTimeMillis()); @@ -897,7 +895,7 @@ public class Http11NioProcessor implements ActionHook { if (request.getAttribute("org.apache.tomcat.comet") != null) { comet = true; } - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if (key != null) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if (attach != null) { @@ -1049,7 +1047,7 @@ public class Http11NioProcessor implements ActionHook { comet = false; cometClose = true; - SelectionKey key = socket.getIOChannel().keyFor(endpoint.getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key != null ) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); if ( attach!=null && attach.getComet()) { diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java index e5792b8e9..f485bbec8 100644 --- a/java/org/apache/coyote/http11/Http11NioProtocol.java +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -223,6 +223,21 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration // -------------------- Pool setup -------------------- + public void setPollerThreadCount(int count) { + ep.setPollerThreadCount(count); + } + + public int getPollerThreadCount() { + return ep.getPollerThreadCount(); + } + + public void setSelectorTimeout(long timeout) { + ep.setSelectorTimeout(timeout); + } + + public long getSelectorTimeout() { + return ep.getSelectorTimeout(); + } // * public Executor getExecutor() { return ep.getExecutor(); @@ -616,7 +631,7 @@ public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration // processor. connections.put(socket, processor); localProcessor.set(null); - proto.ep.getCometPoller().add(socket); + socket.getPoller().add(socket); } return state; diff --git a/java/org/apache/coyote/http11/InternalNioInputBuffer.java b/java/org/apache/coyote/http11/InternalNioInputBuffer.java index ada5fd11d..4698b4b3b 100644 --- a/java/org/apache/coyote/http11/InternalNioInputBuffer.java +++ b/java/org/apache/coyote/http11/InternalNioInputBuffer.java @@ -182,7 +182,6 @@ public class InternalNioInputBuffer implements InputBuffer { * header. */ protected long readTimeout; - private Poller poller; // ------------------------------------------------------------- Properties @@ -202,10 +201,6 @@ public class InternalNioInputBuffer implements InputBuffer { return socket; } - public Poller getPoller() { - return poller; - } - /** * Add an input filter to the filter library. */ @@ -274,10 +269,6 @@ public class InternalNioInputBuffer implements InputBuffer { this.swallowInput = swallowInput; } - public void setPoller(Poller poller) { - this.poller = poller; - } - // --------------------------------------------------------- Public Methods @@ -564,7 +555,7 @@ public class InternalNioInputBuffer implements InputBuffer { timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout); if ( !timedOut && nRead == 0 ) { try { - final SelectionKey key = socket.getIOChannel().keyFor(poller.getSelector()); + final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); final KeyAttachment att = (KeyAttachment)key.attachment(); //to do, add in a check, we might have just timed out on the wait, //so there is no need to register us again. @@ -587,7 +578,7 @@ public class InternalNioInputBuffer implements InputBuffer { private void addToReadQueue(final SelectionKey key, final KeyAttachment att) { att.setWakeUp(true); - poller.addEvent( + att.getPoller().addEvent( new Runnable() { public void run() { try { diff --git a/java/org/apache/coyote/http11/InternalNioOutputBuffer.java b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java index e6d50f177..cbafdc16b 100644 --- a/java/org/apache/coyote/http11/InternalNioOutputBuffer.java +++ b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java @@ -49,8 +49,7 @@ public class InternalNioOutputBuffer // ----------------------------------------------------------- Constructors int bbufLimit = 0; - Selector selector; - + /** * Default constructor. */ @@ -182,10 +181,6 @@ public class InternalNioOutputBuffer this.socket = socket; } - public void setSelector(Selector selector) { - this.selector = selector; - } - /** * Get the underlying socket input stream. */ @@ -715,7 +710,7 @@ public class InternalNioOutputBuffer throws IOException { //prevent timeout for async, - SelectionKey key = socket.getIOChannel().keyFor(selector); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if (key != null) { NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); attach.access(); diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java index 14ab5a60a..93a9e649f 100644 --- a/java/org/apache/tomcat/util/net/NioChannel.java +++ b/java/org/apache/tomcat/util/net/NioChannel.java @@ -20,7 +20,9 @@ import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; import java.nio.channels.SocketChannel; +import org.apache.tomcat.util.net.NioEndpoint.Poller; import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; + /** * * Base class for a SocketChannel wrapper used by the endpoint. @@ -37,6 +39,8 @@ public class NioChannel implements ByteChannel{ protected SocketChannel sc = null; protected ApplicationBufferHandler bufHandler; + + protected Poller poller; public NioChannel(SocketChannel channel, ApplicationBufferHandler bufHandler) throws IOException { this.sc = channel; @@ -112,6 +116,10 @@ public class NioChannel implements ByteChannel{ return bufHandler; } + public Poller getPoller() { + return poller; + } + /** * getIOChannel * @@ -146,5 +154,8 @@ public class NioChannel implements ByteChannel{ return 0; } + public void setPoller(Poller poller) { + this.poller = poller; + } } \ No newline at end of file diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 7b4bbd331..f88990f44 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -42,6 +42,9 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import org.apache.tomcat.util.res.StringManager; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * NIO tailored thread pool, providing the following services: @@ -316,7 +319,7 @@ public class NioEndpoint { */ protected Poller[] pollers = null; protected int pollerRoundRobin = 0; - public Poller getPoller() { + public Poller getPoller0() { pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; Poller poller = pollers[pollerRoundRobin]; return poller; @@ -326,8 +329,8 @@ public class NioEndpoint { /** * The socket poller used for Comet support. */ - public Poller getCometPoller() { - Poller poller = getPoller(); + public Poller getCometPoller0() { + Poller poller = getPoller0(); return poller; } @@ -335,13 +338,13 @@ public class NioEndpoint { /** * Dummy maxSpareThreads property. */ - public int getMaxSpareThreads() { return 0; } + public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); } /** * Dummy minSpareThreads property. */ - public int getMinSpareThreads() { return 0; } + public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); } // -------------------- SSL related properties -------------------- protected String keystoreFile = System.getProperty("user.home")+"/.keystore"; @@ -470,8 +473,8 @@ public class NioEndpoint { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } - if (pollerThreadCount != 1) { - // limit to one poller, no need for others + if (pollerThreadCount <= 0) { + //minimum one poller thread pollerThreadCount = 1; } @@ -513,10 +516,12 @@ public class NioEndpoint { if (!running) { running = true; paused = false; - + + // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); + //executor = new ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); } // Start acceptor threads @@ -528,6 +533,7 @@ public class NioEndpoint { } // Start poller threads + log.info("Creating poller threads:"+pollerThreadCount); pollers = new Poller[pollerThreadCount]; for (int i = 0; i < pollerThreadCount; i++) { pollers[i] = new Poller(); @@ -678,7 +684,8 @@ public class NioEndpoint { channel = new NioChannel(socket,bufhandler); } - getPoller().register(channel); + + getPoller0().register(channel); } catch (Throwable t) { if (log.isDebugEnabled()) { @@ -746,12 +753,13 @@ public class NioEndpoint { while (workerThread == null) { try { synchronized (workers) { - workers.wait(); + workerThread = createWorkerThread(); + if ( workerThread == null ) workers.wait(); } } catch (InterruptedException e) { // Ignore } - workerThread = createWorkerThread(); + if ( workerThread == null ) workerThread = createWorkerThread(); } return workerThread; } @@ -974,11 +982,13 @@ public class NioEndpoint { public void register(final NioChannel socket) { + socket.setPoller(this); + final KeyAttachment ka = new KeyAttachment(this); + ka.setChannel(socket); Runnable r = new Runnable() { public void run() { try { - KeyAttachment ka = new KeyAttachment(); - ka.setChannel(socket); + socket.getIOChannel().register(selector, SelectionKey.OP_READ, ka); } catch (Exception x) { log.error("", x); @@ -1027,6 +1037,14 @@ public class NioEndpoint { try { wakeupCounter.set(0); keyCount = selector.select(selectorTimeout); + } catch ( NullPointerException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( wakeupCounter == null || selector == null ) throw x; + continue; + } catch ( CancelledKeyException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( wakeupCounter == null || selector == null ) throw x; + continue; } catch (Throwable x) { log.error("",x); continue; @@ -1045,11 +1063,9 @@ public class NioEndpoint { iterator.remove(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { - if ( sk.isValid() ) { - if(attachment == null) attachment = new KeyAttachment(); + if ( sk.isValid() && attachment != null ) { attachment.access(); sk.attach(attachment); - int readyOps = sk.readyOps(); sk.interestOps(0); attachment.interestOps(0); NioChannel channel = attachment.getChannel(); @@ -1121,7 +1137,12 @@ public class NioEndpoint { } public static class KeyAttachment { - + + public KeyAttachment(Poller poller) { + this.poller = poller; + } + public Poller getPoller() { return poller;} + public void setPoller(Poller poller){this.poller = poller;} public long getLastAccess() { return lastAccess; } public void access() { access(System.currentTimeMillis()); } public void access(long access) { lastAccess = access; } @@ -1138,6 +1159,7 @@ public class NioEndpoint { public void setError(boolean error) { this.error = error; } public NioChannel getChannel() { return channel;} public void setChannel(NioChannel channel) { this.channel = channel;} + protected Poller poller = null; protected int interestOps = 0; public int interestOps() { return interestOps;} public int interestOps(int ops) { this.interestOps = ops; return ops; } @@ -1254,7 +1276,7 @@ public class NioEndpoint { NioChannel socket = await(); if (socket == null) continue; - SelectionKey key = socket.getIOChannel().keyFor(getPoller().getSelector()); + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int handshake = -1; try { handshake = socket.handshake(key.isReadable(), key.isWritable()); @@ -1310,7 +1332,7 @@ public class NioEndpoint { } }; - getPoller().addEvent(r); + ka.getPoller().addEvent(r); } //dereference socket to let GC do its job socket = null; -- 2.11.0