1. Timeouts are now per connection, not using fixed timeouts anywhere. by default...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 31 May 2007 19:32:33 +0000 (19:32 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 31 May 2007 19:32:33 +0000 (19:32 +0000)
2. Implemented all Comet operations, including the ability to have none
3. Implemented CometEvent.isReadable and isWriteable
   isAvailable - means data is available to the servlet
   isReadable - means there is data from the socket also checks the socket, by doing a read, in a non blocking fashion to verify this to be true
   isWriteable - the last write attempted on this socket was 0, hence we are probably blocking
4. simplified CometEvent.register/unregister, they are now just one call and no syncs
5. After each event, the connection is registered with the same operations it had before
6. CoyoteAdapter respects when the servlet doesn't want to be notified of the READ event, hence it doesn't invoke it automatically
7. Let me know if MutableBoolean and MutableInteger should be elsewhere(in terms of package), they are used since ActionHook doesn't have a return value and also valuable in the output buffers since SSL writing is two steps, one through the engine and the other to the socket
I'm pretty happy with how isReadable,isWriteable works, they are completly non blocking and very accurate
True non blocking in the buffers and filters seems like a major surgery, still holding off on that.
Need to fix the NioBlockingSelector as it is almost impossible to make the poller interest declaration thread safe

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@543226 13f79535-47bb-0310-9956-ffa450edef68

17 files changed:
java/org/apache/catalina/connector/CometEventImpl.java
java/org/apache/catalina/connector/CoyoteAdapter.java
java/org/apache/catalina/connector/OutputBuffer.java
java/org/apache/catalina/connector/Request.java
java/org/apache/catalina/connector/Response.java
java/org/apache/coyote/ActionCode.java
java/org/apache/coyote/Response.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/coyote/http11/InternalNioInputBuffer.java
java/org/apache/coyote/http11/InternalNioOutputBuffer.java
java/org/apache/tomcat/util/MutableBoolean.java [new file with mode: 0644]
java/org/apache/tomcat/util/MutableInteger.java [new file with mode: 0644]
java/org/apache/tomcat/util/net/NioBlockingSelector.java
java/org/apache/tomcat/util/net/NioChannel.java
java/org/apache/tomcat/util/net/NioEndpoint.java
java/org/apache/tomcat/util/net/NioSelectorPool.java
java/org/apache/tomcat/util/net/SecureNioChannel.java

index 728f054..26832dd 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.catalina.util.StringManager;
 import org.apache.coyote.ActionCode;
 import org.apache.tomcat.util.net.PollerInterest;
 import java.util.Arrays;
+import org.apache.tomcat.util.MutableBoolean;
 
 public class CometEventImpl implements CometEvent {
 
@@ -142,12 +143,16 @@ public class CometEventImpl implements CometEvent {
     }
     
     public boolean isReadable() {
-        return request.isReadable();
+        return request.isAvailable() || request.isReadable();
     }    
     public boolean isWriteable() {
         return response.isWriteable();
     }
     
+    public boolean hasOp(CometEvent.CometOperation op) {
+        return cometOperations.contains(op);
+    }
+    
     public void configure(CometEvent.CometConfiguration... options)
         throws IOException, IllegalStateException {
         checkWorkerThread();
@@ -169,7 +174,7 @@ public class CometEventImpl implements CometEvent {
         throws IOException, IllegalStateException {
         //remove from the registered set
         cometOperations.removeAll(Arrays.asList(operations));
-        request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0])));
+        request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0])));
     }
     
     public CometConfiguration[] getConfiguration() {
index 61c41b8..0a4e8a5 100644 (file)
@@ -227,7 +227,7 @@ public class CoyoteAdapter
                 }
                 if (response.isClosed() || !request.isComet()) {
                     res.action(ActionCode.ACTION_COMET_END, null);
-                } else if (!error && read && request.isReadable()) {
+                } else if (!error && read && request.isAvailable()) {
                     // If this was a read and not all bytes have been read, or if no data
                     // was read from the connector, then it is an error
                     error = true;
@@ -312,7 +312,7 @@ public class CoyoteAdapter
 
                 if (request.isComet()) {
                     if (!response.isClosed() && !response.isError()) {
-                        if (request.isReadable()) {
+                        if (request.isAvailable() && request.hasOp(CometEvent.CometOperation.OP_READ)) {
                             // Invoke a read event right away if there are available bytes
                             if (event(req, res, SocketStatus.OPEN_READ)) {
                                 comet = true;
index c7e98da..634a5b7 100644 (file)
@@ -324,13 +324,6 @@ public class OutputBuffer extends Writer
     }
 
     
-    /**
-     * Return the amount of bytes written by the lower layer.
-     */
-    protected int lastWrite() {
-        return coyoteResponse.getLastWrite();
-    }
-    
 
     // ------------------------------------------------- Bytes Handling Methods
 
index 73efb8e..9e3dac6 100644 (file)
@@ -70,6 +70,8 @@ import org.apache.catalina.util.ParameterMap;
 import org.apache.catalina.util.RequestUtil;
 import org.apache.catalina.util.StringManager;
 import org.apache.catalina.util.StringParser;
+import org.apache.tomcat.util.MutableBoolean;
+import org.apache.catalina.CometEvent;
 
 
 /**
@@ -2250,13 +2252,26 @@ public class Request
 
     
     /**
-     * Return true if bytes are available.
+     * Return true if bytes are available at the servlet layer
      */
-    public boolean isReadable() {
+    public boolean isAvailable() {
         return (inputBuffer.available() > 0);
     }
-
     
+    /**
+     * returns true if we read data from the socket
+     * @return boolean
+     */
+    public boolean isReadable() {
+        MutableBoolean bool = new MutableBoolean(false);
+        action(ActionCode.ACTION_COMET_READABLE,bool);
+        return bool.get();    
+    }
+
+    public boolean hasOp(CometEvent.CometOperation op) {
+        if ( !comet || getEvent()==null ) return false;
+        return event.hasOp(op);
+    }
     // ------------------------------------------------------ Protected Methods
 
     protected void action(ActionCode actionCode, Object param) {
index acb59c1..ba28742 100644 (file)
@@ -52,6 +52,8 @@ import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.http.ServerCookie;
 import org.apache.tomcat.util.net.URL;
+import org.apache.coyote.ActionCode;
+import org.apache.tomcat.util.MutableBoolean;
 
 /**
  * Wrapper object for the Coyote response.
@@ -533,7 +535,9 @@ public class Response
      * Return true if bytes are available.
      */
     public boolean isWriteable() {
-        return (outputBuffer.lastWrite() > 0);
+        MutableBoolean bool = new MutableBoolean(false);
+        coyoteResponse.action(ActionCode.ACTION_COMET_WRITEABLE,bool);
+        return bool.get();    
     }
 
     
index 8b47725..d46f8ad 100644 (file)
@@ -167,11 +167,14 @@ public final class ActionCode {
     public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26);
     
     /**
-     * Unregister for notifications for a comet connection
+     * Action for getting the readable status
      */
-    public static final ActionCode ACTION_COMET_UNREGISTER = new ActionCode(27);
-    
+    public static final ActionCode ACTION_COMET_READABLE = new ActionCode(28);
 
+    /**
+     * Action for getting the writeable status
+     */
+    public static final ActionCode ACTION_COMET_WRITEABLE = new ActionCode(29);
 
     // ----------------------------------------------------------- Constructors
     int code;
index 7fd2d29..d18ab97 100644 (file)
@@ -123,8 +123,6 @@ public final class Response {
     protected String errorURI = null;
 
     protected Request req;
-
-    protected int lastWrite = 1;
     
     // ------------------------------------------------------------- Properties
 
@@ -190,16 +188,6 @@ public final class Response {
     // -------------------- State --------------------
 
 
-    public int getLastWrite() {
-        return lastWrite;
-    }
-
-    
-    public void setLastWrite(int lastWrite) {
-        this.lastWrite = lastWrite;
-    }
-
-
     public int getStatus() {
         return status;
     }
@@ -591,7 +579,6 @@ public final class Response {
         headers.clear();
 
         // update counters
-        lastWrite = 1;
         bytesWritten=0;
     }
 
index a068dd4..a503e74 100644 (file)
@@ -54,6 +54,7 @@ import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.net.PollerInterest;
+import org.apache.tomcat.util.MutableBoolean;
 
 
 /**
@@ -91,12 +92,12 @@ public class Http11NioProcessor implements ActionHook {
 
         request = new Request();
         int readTimeout = endpoint.getSoTimeout();
-        inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout);
+        inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize);
         request.setInputBuffer(inputBuffer);
 
         response = new Response();
         response.setHook(this);
-        outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout);
+        outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize);
         response.setOutputBuffer(outputBuffer);
         request.setResponse(response);
 
@@ -819,7 +820,6 @@ public class Http11NioProcessor implements ActionHook {
             try {
                 if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
                     socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
                 if (!inputBuffer.parseRequestLine(keptAlive)) {
                     //no data available yet, since we might have read part
@@ -839,7 +839,6 @@ public class Http11NioProcessor implements ActionHook {
                 request.setStartTime(System.currentTimeMillis());
                 if (!disableUploadTimeout) { //only for body, not for request headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
-                    inputBuffer.readTimeout = soTimeout;
                 }
             } catch (IOException e) {
                 error = true;
@@ -1223,20 +1222,22 @@ public class Http11NioProcessor implements ActionHook {
         } else if (actionCode == ActionCode.ACTION_COMET_REGISTER) {
             int interest = getPollerInterest(param);
             NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-            attach.setCometOps(attach.getCometOps()|interest);
-            //notify poller if not on a tomcat thread
-            RequestInfo rp = request.getRequestProcessor();
-            if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
-                socket.getPoller().cometInterest(socket);
-        } else if (actionCode == ActionCode.ACTION_COMET_UNREGISTER) {
-            int interest = getPollerInterest(param);
-            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
-            attach.setCometOps(attach.getCometOps()& (~interest));
+            attach.setCometOps(interest);
             //notify poller if not on a tomcat thread
             RequestInfo rp = request.getRequestProcessor();
             if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
                 socket.getPoller().cometInterest(socket);
         } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE) {
+        } else if (actionCode == ActionCode.ACTION_COMET_READABLE) {
+            MutableBoolean bool = (MutableBoolean)param;
+            try {
+                bool.set(inputBuffer.isReadable());
+            }catch ( IOException x ) {
+                throw new RuntimeException(x);
+            }
+        } else if (actionCode == ActionCode.ACTION_COMET_WRITEABLE) {
+            MutableBoolean bool = (MutableBoolean)param;
+            bool.set(outputBuffer.isWritable());
         }
 
     }
index 0861d53..013f4fb 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.net.NioEndpoint;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -51,8 +52,7 @@ public class InternalNioInputBuffer implements InputBuffer {
     /**
      * Alternate constructor.
      */
-    public InternalNioInputBuffer(Request request, int headerBufferSize, 
-                                  long readTimeout) {
+    public InternalNioInputBuffer(Request request, int headerBufferSize) {
 
         this.request = request;
         headers = request.getMimeHeaders();
@@ -80,12 +80,6 @@ public class InternalNioInputBuffer implements InputBuffer {
         headerData.recycle();
         swallowInput = true;
 
-        if (readTimeout < 0) {
-            this.readTimeout = -1;
-        } else {
-            this.readTimeout = readTimeout;
-        }
-
     }
 
 
@@ -195,12 +189,6 @@ public class InternalNioInputBuffer implements InputBuffer {
     protected int lastActiveFilter;
 
 
-    /**
-     * The socket timeout used when reading the first block of the request
-     * header.
-     */
-    protected long readTimeout;
-
     // ------------------------------------------------------------- Properties
 
 
@@ -296,7 +284,23 @@ public class InternalNioInputBuffer implements InputBuffer {
     }
 
     // --------------------------------------------------------- Public Methods
-
+    /**
+     * Returns true if there are bytes available from the socket layer
+     * @return boolean
+     * @throws IOException
+     */
+    public boolean isReadable() throws IOException {
+        return (pos < lastValid) || (nbRead()>0);
+    }
+    
+    /**
+     * Issues a non blocking read
+     * @return int
+     * @throws IOException
+     */
+    public int nbRead() throws IOException {
+        return readSocket(true,false);
+    }
 
     /**
      * Recycle the input buffer. This should be called when closing the 
@@ -413,13 +417,8 @@ public class InternalNioInputBuffer implements InputBuffer {
                     if (useAvailableData) {
                         return false;
                     }
-                    if (readTimeout == -1) {
-                        if (!fill(false,true)) //request line parsing
-                            throw new EOFException(sm.getString("iib.eof.error"));
-                    } else {
-                        // Do a simple read with a short timeout
-                        if ( !readSocket(true, false) ) return false;
-                    }
+                    // Do a simple read with a short timeout
+                    if ( readSocket(true, false)==0 ) return false;
                 }
                 chr = buf[pos++];
             } while ((chr == Constants.CR) || (chr == Constants.LF));
@@ -434,13 +433,8 @@ public class InternalNioInputBuffer implements InputBuffer {
                 if (useAvailableData) {
                     return false;
                 }
-                if (readTimeout == -1) {
-                    if (!fill(false,false)) //request line parsing
-                        return false;
-                } else {
-                    // Do a simple read with a short timeout
-                    if ( !readSocket(true, false) ) return false;
-                }
+                // Do a simple read with a short timeout
+                if ( readSocket(true, false)==0 ) return false;
             }
             parsingRequestLinePhase = 2;
         }
@@ -552,6 +546,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             tmp = null;
         }
     }
+    
     /**
      * Perform blocking read with a timeout if desired
      * @param timeout boolean - if we want to use the timeout data
@@ -560,15 +555,16 @@ public class InternalNioInputBuffer implements InputBuffer {
      * @throws IOException if a socket exception occurs
      * @throws EOFException if end of stream is reached
      */
-    private boolean readSocket(boolean timeout, boolean block) throws IOException {
+    private int readSocket(boolean timeout, boolean block) throws IOException {
         int nRead = 0;
-        long rto = timeout?this.readTimeout:-1;
         socket.getBufHandler().getReadBuffer().clear();
         if ( block ) {
             Selector selector = null;
             try { selector = getSelectorPool().get(); }catch ( IOException x ) {}
             try {
-                nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto);
+                NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                if ( att == null ) throw new IOException("Key must be cancelled.");
+                nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
             } catch ( EOFException eof ) {
                 nRead = -1;
             } finally { 
@@ -583,12 +579,12 @@ public class InternalNioInputBuffer implements InputBuffer {
             expand(nRead + pos);
             socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
             lastValid = pos + nRead;
-            return true;
+            return nRead;
         } else if (nRead == -1) {
             //return false;
             throw new EOFException(sm.getString("iib.eof.error"));
         } else {
-            return false;
+            return 0;
         }
     }
 
@@ -852,7 +848,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(timeout,block);
+            read = readSocket(timeout,block)>0;
         } else {
 
             if (buf.length - end < 4500) {
@@ -865,7 +861,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(timeout, block);
+            read = readSocket(timeout, block)>0;
         }
         return read;
     }
index 9891ab1..70a4892 100644 (file)
@@ -34,6 +34,8 @@ import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
+import java.io.EOFException;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * Output buffer.
@@ -56,14 +58,14 @@ public class InternalNioOutputBuffer
      * Default constructor.
      */
     public InternalNioOutputBuffer(Response response) {
-        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
+        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
     }
 
 
     /**
      * Alternate constructor.
      */
-    public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
+    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
 
         this.response = response;
         headers = response.getMimeHeaders();
@@ -86,8 +88,6 @@ public class InternalNioOutputBuffer
         committed = false;
         finished = false;
         
-        this.writeTimeout = writeTimeout;
-
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
 
@@ -142,6 +142,10 @@ public class InternalNioOutputBuffer
      */
     protected int pos;
 
+    /**
+     * Number of bytes last written
+     */
+    protected MutableInteger lastWrite = new MutableInteger(1);
 
     /**
      * Underlying socket.
@@ -179,12 +183,6 @@ public class InternalNioOutputBuffer
      */
     protected int lastActiveFilter;
     
-    /**
-     * Write time out in milliseconds
-     */
-    protected long writeTimeout = -1;
-
-
     // ------------------------------------------------------------- Properties
 
 
@@ -195,10 +193,6 @@ public class InternalNioOutputBuffer
         this.socket = socket;
     }
 
-    public void setWriteTimeout(long writeTimeout) {
-        this.writeTimeout = writeTimeout;
-    }
-
     /**
      * Get the underlying socket input stream.
      */
@@ -206,10 +200,6 @@ public class InternalNioOutputBuffer
         return socket;
     }
 
-    public long getWriteTimeout() {
-        return writeTimeout;
-    }
-
     public void setSelectorPool(NioSelectorPool pool) { 
         this.pool = pool;
     }
@@ -324,7 +314,6 @@ public class InternalNioOutputBuffer
 
         // Recycle Request object
         response.recycle();
-
     }
 
 
@@ -343,6 +332,7 @@ public class InternalNioOutputBuffer
         lastActiveFilter = -1;
         committed = false;
         finished = false;
+        lastWrite.set(1);
 
     }
 
@@ -401,7 +391,9 @@ public class InternalNioOutputBuffer
 
     }
 
-
+    public boolean isWritable() {
+        return lastWrite.get()>0;
+    }
     // ------------------------------------------------ HTTP/1.1 Output Methods
 
 
@@ -414,15 +406,25 @@ public class InternalNioOutputBuffer
         if (!committed) {
             //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
             ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);    
-            writeToSocket(buf,false);
+            writeToSocket(buf,false,true);
         }
 
     }
 
-    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
+    /**
+     * 
+     * @param bytebuffer ByteBuffer
+     * @param flip boolean
+     * @return int
+     * @throws IOException
+     */
+    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip, boolean block) throws IOException {
         //int limit = bytebuffer.position();
         if ( flip ) bytebuffer.flip();
         int written = 0;
+        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+        if ( att == null ) throw new IOException("Key must be cancelled");
+        long writeTimeout = att.getTimeout();
         Selector selector = null;
         try {
             selector = getSelectorPool().get();
@@ -430,10 +432,10 @@ public class InternalNioOutputBuffer
             //ignore
         }
         try {
-            written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout);
+            written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite);
             //make sure we are flushed 
             do {
-                if (socket.flush(true,selector,writeTimeout)) break;
+                if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
             }while ( true );
         }finally { 
             if ( selector != null ) getSelectorPool().put(selector);
@@ -759,7 +761,7 @@ public class InternalNioOutputBuffer
 
         //write to the socket, if there is anything to write
         if (socket.getBufHandler().getWriteBuffer().position() > 0) {
-            writeToSocket(socket.getBufHandler().getWriteBuffer(),true);
+            writeToSocket(socket.getBufHandler().getWriteBuffer(),true,true);
         }
     }
 
diff --git a/java/org/apache/tomcat/util/MutableBoolean.java b/java/org/apache/tomcat/util/MutableBoolean.java
new file mode 100644 (file)
index 0000000..b7ea90a
--- /dev/null
@@ -0,0 +1,29 @@
+/*\r
+ *  Licensed to the Apache Software Foundation (ASF) under one or more\r
+ *  contributor license agreements.  See the NOTICE file distributed with\r
+ *  this work for additional information regarding copyright ownership.\r
+ *  The ASF licenses this file to You under the Apache License, Version 2.0\r
+ *  (the "License"); you may not use this file except in compliance with\r
+ *  the License.  You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ */\r
+\r
+package org.apache.tomcat.util;\r
+\r
+public class MutableBoolean {\r
+    protected boolean value = false;\r
+    public MutableBoolean() {}\r
+    public MutableBoolean(boolean val) {\r
+        this.value = val;\r
+    }\r
+    \r
+    public boolean get() { return value;}\r
+    public void set(boolean val) {this.value = val;}\r
+}\r
diff --git a/java/org/apache/tomcat/util/MutableInteger.java b/java/org/apache/tomcat/util/MutableInteger.java
new file mode 100644 (file)
index 0000000..5f58171
--- /dev/null
@@ -0,0 +1,29 @@
+/*\r
+ *  Licensed to the Apache Software Foundation (ASF) under one or more\r
+ *  contributor license agreements.  See the NOTICE file distributed with\r
+ *  this work for additional information regarding copyright ownership.\r
+ *  The ASF licenses this file to You under the Apache License, Version 2.0\r
+ *  (the "License"); you may not use this file except in compliance with\r
+ *  the License.  You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ */\r
+\r
+package org.apache.tomcat.util;\r
+\r
+public class MutableInteger {\r
+    protected int value = 0;\r
+    public MutableInteger() {}\r
+    public MutableInteger(int val) {\r
+        this.value = val;\r
+    }\r
+\r
+    public int get() { return value;}\r
+    public void set(int val) {this.value = val;}\r
+}\r
index 79027d8..f23e76d 100644 (file)
@@ -24,6 +24,7 @@ import java.nio.channels.SelectionKey;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.MutableInteger;
 
 public class NioBlockingSelector {
     public NioBlockingSelector() {
@@ -41,7 +42,7 @@ public class NioBlockingSelector {
      * @throws SocketTimeoutException if the write times out
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
-    public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException {
+    public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
         SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
         int written = 0;
         boolean timedout = false;
@@ -55,6 +56,7 @@ public class NioBlockingSelector {
             while ( (!timedout) && buf.hasRemaining()) {
                 if (keycount > 0) { //only write if we were registered for a write
                     int cnt = socket.write(buf); //write the data
+                    lastWrite.set(cnt);
                     if (cnt == -1)
                         throw new EOFException();
                     written += cnt;
index 901ef5f..ba90442 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import java.nio.channels.Selector;
 import java.nio.channels.SelectionKey;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -70,7 +71,8 @@ public class NioChannel implements ByteChannel{
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s,long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
+        if (lastWrite!=null) lastWrite.set(1);
         return true; //no network buffer in the regular channel
     }
 
index 727adcb..12a0443 100644 (file)
@@ -479,24 +479,13 @@ public class NioEndpoint {
     /**
      * The socket poller.
      */
-    protected Poller[] pollers = null;
-    protected int pollerRoundRobin = 0;
+    protected Poller poller = null;
     public Poller getPoller0() {
-        pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
-        Poller poller = pollers[pollerRoundRobin];
         return poller;
     }
 
-
-    /**
-     * The socket poller used for Comet support.
-     */
-    public Poller getCometPoller0() {
-        Poller poller = getPoller0();
-        return poller;
-    }
-
-
+    protected Poller readWritePoller = null;
+    
     /**
      * Dummy maxSpareThreads property.
      */
@@ -649,14 +638,10 @@ public class NioEndpoint {
      * Number of keepalive sockets.
      */
     public int getKeepAliveCount() {
-        if (pollers == null) {
+        if (poller == null) {
             return 0;
         } else {
-            int keepAliveCount = 0;
-            for (int i = 0; i < pollers.length; i++) {
-                keepAliveCount += pollers[i].getKeepAliveCount();
-            }
-            return keepAliveCount;
+                return poller.selector.keys().size();
         }
     }
 
@@ -793,16 +778,12 @@ public class NioEndpoint {
                 acceptorThread.start();
             }
 
-            // Start poller threads
-            pollers = new Poller[pollerThreadCount];
-            for (int i = 0; i < pollerThreadCount; i++) {
-                pollers[i] = new Poller();
-                pollers[i].init();
-                Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
-                pollerThread.setPriority(threadPriority);
-                pollerThread.setDaemon(true);
-                pollerThread.start();
-            }
+            // Start poller thread
+            poller = new Poller();
+            Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
+            pollerThread.setPriority(threadPriority);
+            pollerThread.setDaemon(true);
+            pollerThread.start();
         }
     }
 
@@ -836,10 +817,8 @@ public class NioEndpoint {
         if (running) {
             running = false;
             unlockAccept();
-            for (int i = 0; i < pollers.length; i++) {
-                pollers[i].destroy();
-            }
-            pollers = null;
+                poller.destroy();
+            poller = null;
         }
         eventCache.clear();
         keyCache.clear();
@@ -1118,6 +1097,8 @@ public class NioEndpoint {
     
     protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
         try {
+            KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
+            attachment.setCometNotify(false); //will get reset upon next reg
             if (executor == null) {
                 getWorkerThread().assign(socket, status);
             } else {
@@ -1248,8 +1229,9 @@ public class NioEndpoint {
                             interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
                             att.access();//to prevent timeout
                             //we are registering the key to start with, reset the fairness counter.
-                            att.interestOps(interestOps);
-                            key.interestOps(interestOps);
+                            int ops = key.interestOps() | interestOps;
+                            att.interestOps(ops);
+                            key.interestOps(ops);
                         } else {
                             cancel = true;
                         }
@@ -1269,6 +1251,7 @@ public class NioEndpoint {
             return super.toString()+"[intOps="+this.interestOps+"]";
         }
     }
+    
     /**
      * Poller class.
      */
@@ -1279,9 +1262,6 @@ public class NioEndpoint {
         
         protected boolean close = false;
         protected long nextExpiration = 0;//optimize expiration handling
-
-        protected int keepAliveCount = 0;
-        public int getKeepAliveCount() { return keepAliveCount; }
         
         protected AtomicLong wakeupCounter = new AtomicLong(0l);
         
@@ -1296,14 +1276,6 @@ public class NioEndpoint {
         public Selector getSelector() { return selector;}
 
         /**
-         * Create the poller. With some versions of APR, the maximum poller size will
-         * be 62 (reocmpiling APR is necessary to remove this limitation).
-         */
-        protected void init() {
-            keepAliveCount = 0;
-        }
-
-        /**
          * Destroy the poller.
          */
         protected void destroy() {
@@ -1379,7 +1351,7 @@ public class NioEndpoint {
             socket.setPoller(this);
             KeyAttachment key = keyCache.poll();
             final KeyAttachment ka = key!=null?key:new KeyAttachment();
-            ka.reset(this,socket);
+            ka.reset(this,socket,getSocketProperties().getSoTimeout());
             PollerEvent r = eventCache.poll();
             ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
             if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
@@ -1621,7 +1593,6 @@ public class NioEndpoint {
                     } else if ( ka.getError() ) {
                         cancelledKey(key, SocketStatus.ERROR,true);
                     } else if (ka.getComet() && ka.getCometNotify() ) {
-                        ka.setCometNotify(false);//this will get reset after invokation if callback is still in there
                         reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation
                         if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
                     }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
@@ -1656,13 +1627,13 @@ public class NioEndpoint {
         public KeyAttachment() {
             
         }
-        public void reset(Poller poller, NioChannel channel) {
+        public void reset(Poller poller, NioChannel channel, long soTimeout) {
             this.channel = channel;
             this.poller = poller;
             lastAccess = System.currentTimeMillis();
             currentAccess = false;
             comet = false;
-            timeout = -1;
+            timeout = soTimeout;
             error = false;
             lastRegistered = 0;
             sendfileData = null;
@@ -1676,7 +1647,7 @@ public class NioEndpoint {
         }
         
         public void reset() {
-            reset(null,null);
+            reset(null,null,-1);
         }
         
         public Poller getPoller() { return poller;}
index 5562b4d..1998105 100644 (file)
  */
 package org.apache.tomcat.util.net;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.channels.Selector;
+import java.io.EOFException;
 import java.io.IOException;
-import java.util.NoSuchElementException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.io.EOFException;
-import java.net.SocketTimeoutException;
+import java.nio.channels.Selector;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
+import java.util.Iterator;
 
 /**
  *
@@ -37,20 +40,23 @@ import org.apache.juli.logging.LogFactory;
  */
 
 public class NioSelectorPool {
+    protected static int threadCount = 0;
+    
     protected static Log log = LogFactory.getLog(NioSelectorPool.class);
 
     protected final static boolean SHARED =
         Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
-    protected static Selector SHARED_SELECTOR;
+    protected Selector SHARED_SELECTOR;
     
     protected int maxSelectors = 200;
+    protected long sharedSelectorTimeout = 30000;
     protected int maxSpareSelectors = -1;
     protected boolean enabled = true;
     protected AtomicInteger active = new AtomicInteger(0);
     protected AtomicInteger spare = new AtomicInteger(0);
     protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
 
-    protected static Selector getSharedSelector() throws IOException {
+    protected Selector getSharedSelector() throws IOException {
         if (SHARED && SHARED_SELECTOR == null) {
             synchronized ( NioSelectorPool.class ) {
                 if ( SHARED_SELECTOR == null )  {
@@ -127,12 +133,13 @@ public class NioSelectorPool {
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException {
-        return write(buf,socket,selector,writeTimeout,true);
+        return write(buf,socket,selector,writeTimeout,true,null);
     }
     
-    public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
-            return NioBlockingSelector.write(buf,socket,writeTimeout);
+    public int write(ByteBuffer buf, NioChannel socket, Selector selector, 
+                     long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
+        if ( SHARED && block ) {
+            return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
         }
         SelectionKey key = null;
         int written = 0;
@@ -148,7 +155,9 @@ public class NioSelectorPool {
                 int cnt = 0;
                 if ( keycount > 0 ) { //only write if we were registered for a write
                     cnt = socket.write(buf); //write the data
+                    if (lastWrite!=null) lastWrite.set(cnt);
                     if (cnt == -1) throw new EOFException();
+                    
                     written += cnt;
                     if (cnt > 0) {
                         time = System.currentTimeMillis(); //reset our timeout timer
@@ -206,7 +215,7 @@ public class NioSelectorPool {
      * @throws IOException if an IO Exception occurs in the underlying socket logic
      */
     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
-        if ( SHARED && block) {
+        if ( SHARED && block ) {
             return NioBlockingSelector.read(buf,socket,readTimeout);
         }
         SelectionKey key = null;
@@ -254,6 +263,10 @@ public class NioSelectorPool {
         this.enabled = enabled;
     }
 
+    public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
+        this.sharedSelectorTimeout = sharedSelectorTimeout;
+    }
+
     public int getMaxSelectors() {
         return maxSelectors;
     }
@@ -265,4 +278,16 @@ public class NioSelectorPool {
     public boolean isEnabled() {
         return enabled;
     }
+
+    public long getSharedSelectorTimeout() {
+        return sharedSelectorTimeout;
+    }
+
+    public ConcurrentLinkedQueue getSelectors() {
+        return selectors;
+    }
+
+    public AtomicInteger getSpare() {
+        return spare;
+    }
 }
\ No newline at end of file
index 2463f60..3fbcc69 100644 (file)
@@ -25,6 +25,7 @@ import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import java.nio.channels.Selector;
+import org.apache.tomcat.util.MutableInteger;
 
 /**
  * 
@@ -102,11 +103,11 @@ public class SecureNioChannel extends NioChannel  {
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush(boolean block, Selector s, long timeout) throws IOException {
+    public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
         if (!block) {
             flush(netOutBuffer);
         } else {
-            pool.write(netOutBuffer, this, s, timeout);
+            pool.write(netOutBuffer, this, s, timeout,block,lastWrite);
         }
         return !netOutBuffer.hasRemaining();
     }