Added in blocking logic to the NIO connector. This logic ensures that if there is...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 23 Oct 2006 17:39:28 +0000 (17:39 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 23 Oct 2006 17:39:28 +0000 (17:39 +0000)
Ideas for this implementation can be credited to the Tribes implementation where we have a pool of selectors, so that each sending thread uses its own selector and from Jeanfrancois Arcand's blog about wrapping a NIO channel in a blocking inputstream.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@467044 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/transport/nio/NioSender.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/coyote/http11/InternalNioInputBuffer.java
java/org/apache/coyote/http11/InternalNioOutputBuffer.java
java/org/apache/tomcat/util/net/NioChannel.java
java/org/apache/tomcat/util/net/NioEndpoint.java
java/org/apache/tomcat/util/net/NioSelectorPool.java [new file with mode: 0644]
java/org/apache/tomcat/util/net/SecureNioChannel.java

index 8fdbd94..38f2364 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.transport.AbstractSender;\r
 import org.apache.catalina.tribes.transport.DataSender;\r
 import org.apache.catalina.tribes.RemoteProcessException;\r
+import java.io.EOFException;\r
 \r
 /**\r
  * This class is NOT thread safe and should never be used with more than one thread at a time\r
@@ -177,6 +178,7 @@ public class NioSender extends AbstractSender implements DataSender{
                 //weve written everything, or we are starting a new package\r
                 //protect against buffer overwrite\r
                 int byteswritten = socketChannel.write(writebuf);\r
+                if (byteswritten == -1 ) throw new EOFException();\r
                 remaining -= byteswritten;\r
                 //if the entire message was written from the buffer\r
                 //reset the position counter\r
index b753e60..b130b3b 100644 (file)
@@ -100,7 +100,7 @@ public class Http11NioProcessor implements ActionHook {
 
         response = new Response();
         response.setHook(this);
-        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
+        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize,readTimeout);
         response.setOutputBuffer(outputBuffer);
         request.setResponse(response);
 
@@ -806,6 +806,8 @@ public class Http11NioProcessor implements ActionHook {
         this.socket = socket;
         inputBuffer.setSocket(socket);
         outputBuffer.setSocket(socket);
+        inputBuffer.setSelectorPool(endpoint.getSelectorPool());
+        outputBuffer.setSelectorPool(endpoint.getSelectorPool());
 
         // Error flag
         error = false;
index 43019e9..a7eaaa3 100644 (file)
@@ -33,6 +33,8 @@ import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import java.nio.channels.Selector;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -157,7 +159,12 @@ public class InternalNioInputBuffer implements InputBuffer {
      * Underlying socket.
      */
     protected NioChannel socket;
-
+    
+    /**
+     * Selector pool, for blocking reads and blocking writes
+     */
+    protected NioSelectorPool pool;
+    
 
     /**
      * Underlying input buffer.
@@ -199,8 +206,7 @@ public class InternalNioInputBuffer implements InputBuffer {
     public void setSocket(NioChannel socket) {
         this.socket = socket;
     }
-
-
+    
     /**
      * Get the underlying socket input stream.
      */
@@ -208,6 +214,15 @@ public class InternalNioInputBuffer implements InputBuffer {
         return socket;
     }
 
+    public void setSelectorPool(NioSelectorPool pool) { 
+        this.pool = pool;
+    }
+    
+    public NioSelectorPool getSelectorPool() {
+        return pool;
+    }
+
+
     /**
      * Add an input filter to the filter library.
      */
@@ -549,47 +564,34 @@ public class InternalNioInputBuffer implements InputBuffer {
      */
     private boolean readSocket(boolean timeout, boolean block) throws IOException {
         int nRead = 0;
-        long start = System.currentTimeMillis();
-        boolean timedOut = false;
-        do {
-            
-            socket.getBufHandler().getReadBuffer().clear();
+        long rto = timeout?this.readTimeout:-1;
+        socket.getBufHandler().getReadBuffer().clear();
+        if ( block ) {
+            Selector selector = null;
+            try { selector = getSelectorPool().get(); }catch ( IOException x ) {}
+            try {
+                nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket.getIOChannel(),selector,rto);
+            } catch ( EOFException eof ) {
+                nRead = -1;
+            } finally { 
+                if ( selector != null ) getSelectorPool().put(selector);
+            }
+        } else {
             nRead = socket.read(socket.getBufHandler().getReadBuffer());
-            if (nRead > 0) {
-                socket.getBufHandler().getReadBuffer().flip();
-                socket.getBufHandler().getReadBuffer().limit(nRead);
-                expand(nRead + pos);
-                socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
-                lastValid = pos + nRead;
-                return true;
-            } else if (nRead == -1) {
-                //return false;
-                throw new EOFException(sm.getString("iib.eof.error"));
-            } else if ( !block ) {
-                return false;
-            } else {
-                timedOut = timeout && (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
-                if ( !timedOut && nRead == 0 )  {
-                    try {
-                        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                        final KeyAttachment att = (KeyAttachment)key.attachment();
-                        //to do, add in a check, we might have just timed out on the wait,
-                        //so there is no need to register us again.
-                        boolean addToQueue = false;
-                        try { addToQueue = ((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
-                        if ( addToQueue ) {
-                            synchronized (att.getMutex()) {
-                                addToReadQueue(key, att);
-                                att.getMutex().wait(readTimeout);
-                            }
-                        }//end if
-                    }catch ( Exception x ) {}
-                }
-             }
-        }while ( nRead == 0 && (!timedOut) );
-        //else throw new IOException(sm.getString("iib.failedread"));
-        //return false; //timeout
-        throw new IOException("read timed out.");
+        }
+        if (nRead > 0) {
+            socket.getBufHandler().getReadBuffer().flip();
+            socket.getBufHandler().getReadBuffer().limit(nRead);
+            expand(nRead + pos);
+            socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+            lastValid = pos + nRead;
+            return true;
+        } else if (nRead == -1) {
+            //return false;
+            throw new EOFException(sm.getString("iib.eof.error"));
+        } else {
+            return false;
+        }
     }
 
     private void addToReadQueue(final SelectionKey key, final KeyAttachment att) {
index b3c3ae7..09f87e7 100644 (file)
 
 package org.apache.coyote.http11;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.OutputBuffer;
@@ -31,6 +33,7 @@ import org.apache.tomcat.util.http.HttpMessages;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
 
 /**
@@ -54,14 +57,14 @@ public class InternalNioOutputBuffer
      * Default constructor.
      */
     public InternalNioOutputBuffer(Response response) {
-        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
+        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
     }
 
 
     /**
      * Alternate constructor.
      */
-    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
+    public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
 
         this.response = response;
         headers = response.getMimeHeaders();
@@ -83,6 +86,8 @@ public class InternalNioOutputBuffer
 
         committed = false;
         finished = false;
+        
+        this.writeTimeout = writeTimeout;
 
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
@@ -143,6 +148,12 @@ public class InternalNioOutputBuffer
      * Underlying socket.
      */
     protected NioChannel socket;
+    
+    /**
+     * Selector pool, for blocking reads and blocking writes
+     */
+    protected NioSelectorPool pool;
+
 
 
     /**
@@ -168,7 +179,11 @@ public class InternalNioOutputBuffer
      * Index of the last active filter.
      */
     protected int lastActiveFilter;
-
+    
+    /**
+     * Write time out in milliseconds
+     */
+    protected long writeTimeout = -1;
 
 
     // ------------------------------------------------------------- Properties
@@ -181,12 +196,28 @@ public class InternalNioOutputBuffer
         this.socket = socket;
     }
 
+    public void setWriteTimeout(long writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+
     /**
      * Get the underlying socket input stream.
      */
     public NioChannel getSocket() {
         return socket;
     }
+
+    public long getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    public void setSelectorPool(NioSelectorPool pool) { 
+        this.pool = pool;
+    }
+
+    public NioSelectorPool getSelectorPool() {
+        return pool;
+    }    
     /**
      * Set the socket buffer size.
      */
@@ -392,14 +423,22 @@ public class InternalNioOutputBuffer
     private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
         //int limit = bytebuffer.position();
         if ( flip ) bytebuffer.flip();
-        while ( bytebuffer.hasRemaining() ) {
-            int written = socket.write(bytebuffer);
+        int written = 0;
+        Selector selector = null;
+        try {
+            selector = getSelectorPool().get();
+        } catch ( IOException x ) {
+            //ignore
+        }
+        try {
+            written = getSelectorPool().write(bytebuffer, socket.getIOChannel(), selector, writeTimeout);
+            //make sure we are flushed 
+            do {
+                if (socket.flush(selector)) break;
+            }while ( true );
+        }finally { 
+            if ( selector != null ) getSelectorPool().put(selector);
         }
-        //make sure we are flushed 
-        do {
-            if (socket.flush()) break;
-        }while ( true );
-        
         socket.getBufHandler().getWriteBuffer().clear();
         this.total = 0;
     } 
index e839e6b..33b008f 100644 (file)
@@ -1,19 +1,21 @@
 /*\r
- *  Licensed to the Apache Software Foundation (ASF) under one or more\r
- *  contributor license agreements.  See the NOTICE file distributed with\r
- *  this work for additional information regarding copyright ownership.\r
- *  The ASF licenses this file to You under the Apache License, Version 2.0\r
- *  (the "License"); you may not use this file except in compliance with\r
- *  the License.  You may obtain a copy of the License at\r
- *\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements.  See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License.  You may obtain a copy of the License at\r
+ * \r
  *      http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- *  Unless required by applicable law or agreed to in writing, software\r
- *  distributed under the License is distributed on an "AS IS" BASIS,\r
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- *  See the License for the specific language governing permissions and\r
- *  limitations under the License.\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
  */\r
+\r
+\r
 package org.apache.tomcat.util.net;\r
 \r
 import java.io.IOException;\r
@@ -23,6 +25,7 @@ import java.nio.channels.SocketChannel;
 \r
 import org.apache.tomcat.util.net.NioEndpoint.Poller;\r
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;\r
+import java.nio.channels.Selector;\r
 \r
 /**\r
  * \r
@@ -34,20 +37,20 @@ import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
  * @version 1.0\r
  */\r
 public class NioChannel implements ByteChannel{\r
-    \r
+\r
     protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);\r
 \r
     protected SocketChannel sc = null;\r
 \r
     protected ApplicationBufferHandler bufHandler;\r
-    \r
+\r
     protected Poller poller;\r
 \r
     public NioChannel(SocketChannel channel, ApplicationBufferHandler bufHandler) throws IOException {\r
         this.sc = channel;\r
         this.bufHandler = bufHandler;\r
     }\r
-    \r
+\r
     public void reset() throws IOException {\r
         bufHandler.getReadBuffer().clear();\r
         bufHandler.getWriteBuffer().clear();\r
@@ -58,7 +61,7 @@ public class NioChannel implements ByteChannel{
      * been flushed out and is empty\r
      * @return boolean\r
      */\r
-    public boolean flush() throws IOException {\r
+    public boolean flush(Selector s) throws IOException {\r
         return true; //no network buffer in the regular channel\r
     }\r
 \r
@@ -154,7 +157,7 @@ public class NioChannel implements ByteChannel{
     public boolean isInitHandshakeComplete() {\r
         return true;\r
     }\r
-    \r
+\r
     public int handshake(boolean read, boolean write) throws IOException {\r
         return 0;\r
     }\r
@@ -171,4 +174,4 @@ public class NioChannel implements ByteChannel{
         return super.toString()+":"+this.sc.toString();\r
     }\r
 \r
-}
\ No newline at end of file
+}\r
index 79e2343..268bbe3 100644 (file)
@@ -35,7 +35,6 @@ import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -143,6 +142,8 @@ public class NioEndpoint {
     protected int readBufSize = 8192;
     protected int writeBufSize = 8192;
     
+    protected NioSelectorPool selectorPool = new NioSelectorPool();;
+    
     /**
      * Server socket "pointer".
      */
@@ -418,6 +419,10 @@ public class NioEndpoint {
         this.readBufSize = readBufSize;
     }
 
+    public void setSelectorPool(NioSelectorPool selectorPool) {
+        this.selectorPool = selectorPool;
+    }
+
     protected SSLContext sslContext = null;
     public SSLContext getSSLContext() { return sslContext;}
     public void setSSLContext(SSLContext c) { sslContext = c;}
@@ -548,6 +553,9 @@ public class NioEndpoint {
             running = true;
             paused = false;
             
+            selectorPool.setMaxSelectors(maxThreads);
+            selectorPool.setMaxSpareSelectors(-1);
+            selectorPool.open();
             
             // Create worker collection
             if (executor == null) {
@@ -611,6 +619,7 @@ public class NioEndpoint {
             }
             pollers = null;
         }
+        try {selectorPool.close();}catch (IOException x){}
         nioChannels.clear();
     }
 
@@ -650,8 +659,12 @@ public class NioEndpoint {
         return readBufSize;
     }
 
+    public NioSelectorPool getSelectorPool() {
+        return selectorPool;
+    }
+
     /**
-     * Unlock the server socket accept using a bugus connection.
+     * Unlock the server socket accept using a bogus connection.
      */
     protected void unlockAccept() {
         java.net.Socket s = null;
@@ -709,7 +722,7 @@ public class NioEndpoint {
                     int appbufsize = engine.getSession().getApplicationBufferSize();
                     int bufsize = Math.max(Math.max(getReadBufSize(), getWriteBufSize()), appbufsize);
                     NioBufferHandler bufhandler = new NioBufferHandler(bufsize, bufsize);
-                    channel = new SecureNioChannel(socket, engine, bufhandler);
+                    channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
                 } else {
                     NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(), getWriteBufSize());
                     channel = new NioChannel(socket, bufhandler);
diff --git a/java/org/apache/tomcat/util/net/NioSelectorPool.java b/java/org/apache/tomcat/util/net/NioSelectorPool.java
new file mode 100644 (file)
index 0000000..2cc8568
--- /dev/null
@@ -0,0 +1,200 @@
+/*
+ *  Copyright 2005-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.channels.Selector;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 
+ * Thread safe non blocking selector pool
+ * @author Filip Hanik
+ * @version 1.0
+ * @since 6.0
+ */
+
+public class NioSelectorPool {
+    protected int maxSelectors = 100;
+    protected int maxSpareSelectors = -1;
+    protected boolean enabled = true;
+    protected AtomicInteger active = new AtomicInteger(0);
+    protected AtomicInteger spare = new AtomicInteger(0);
+    //protected LinkedList<Selector> selectors = new LinkedList<Selector>();
+    protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
+    
+    public Selector get() throws IOException{
+        if ( (!enabled) || active.incrementAndGet() >= maxSelectors ) {
+            active.decrementAndGet();
+            return null;
+        }
+        Selector s = null;
+        try {
+            s = selectors.size()>0?selectors.poll():null;
+            if (s == null) s = Selector.open();
+            else spare.decrementAndGet();
+            
+        }catch (NoSuchElementException x ) {
+            try {s = Selector.open();}catch (IOException iox){}
+        } finally {
+            if ( s == null ) active.decrementAndGet();//we were unable to find a selector
+        }
+        return s;            
+    }
+    
+    
+    
+    public void put(Selector s) throws IOException {
+        active.decrementAndGet();
+        if ( enabled && (maxSpareSelectors==-1 || spare.get() < Math.min(maxSpareSelectors,maxSelectors)) ) {
+            spare.incrementAndGet();
+            selectors.offer(s);
+        }
+        else s.close();
+    }
+    
+    public void close() throws IOException {
+        enabled = false;
+        Selector s;
+        while ( (s = selectors.poll()) != null ) s.close();
+        spare.set(0);
+    }
+    
+    public void open(){
+        enabled = true;
+    }
+    
+    /**
+     * Performs a blocking write using the bytebuffer for data to be written and a selector to block.
+     * If the <code>selector</code> parameter is null, then it will perform a busy write that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
+     * @param socket SocketChannel - the socket to write data to
+     * @param selector Selector - the selector to use for blocking, if null then a busy write will be initiated
+     * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes written
+     * @throws EOFException if write returns -1
+     * @throws SocketTimeoutException if the write times out
+     * @throws IOException if an IO Exception occurs in the underlying socket logic
+     */
+    public int write(ByteBuffer buf, SocketChannel socket, Selector selector, long writeTimeout) throws IOException {
+        SelectionKey key = null;
+        int written = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can write
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && buf.hasRemaining() ) {
+                if ( keycount > 0 ) { //only write if we were registered for a write
+                    int cnt = socket.write(buf); //write the data
+                    if (cnt == -1) throw new EOFException();
+                    written += cnt;
+                    if (cnt > 0) {
+                        time = System.currentTimeMillis(); //reset our timeout timer
+                        continue; //we successfully wrote, try again without a selector
+                    }
+                }
+                if ( selector != null ) {
+                    //register OP_WRITE to the selector
+                    if (key==null) key = socket.register(selector, SelectionKey.OP_WRITE);
+                    else key.interestOps(SelectionKey.OP_WRITE);
+                    keycount = selector.select(writeTimeout);
+                }                
+                if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
+            }//while
+            if ( timedout ) throw new SocketTimeoutException();
+        } finally {
+            if (key != null) key.cancel();
+            if (selector != null) selector.selectNow();
+        }
+        return written;
+    }
+    
+    /**
+     * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
+     * If the <code>selector</code> parameter is null, then it will perform a busy read that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
+     * @param socket SocketChannel - the socket to write data to
+     * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
+     * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes read
+     * @throws EOFException if read returns -1
+     * @throws SocketTimeoutException if the read times out
+     * @throws IOException if an IO Exception occurs in the underlying socket logic
+     */
+    public int read(ByteBuffer buf, SocketChannel socket, Selector selector, long readTimeout) throws IOException {
+        SelectionKey key = null;
+        int read = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can write
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && read == 0 ) {
+                if ( keycount > 0 ) { //only read if we were registered for a read
+                    int cnt = socket.read(buf);
+                    if (cnt == -1) throw new EOFException();
+                    read += cnt;
+                    if (cnt > 0) break;
+                }
+                if ( selector != null ) {
+                    //register OP_WRITE to the selector
+                    if (key==null) key = socket.register(selector, SelectionKey.OP_READ);
+                    else key.interestOps(SelectionKey.OP_READ);
+                    keycount = selector.select(readTimeout);
+                }                
+                if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout;
+            }//while
+            if ( timedout ) throw new SocketTimeoutException();
+        } finally {
+            if (key != null) key.cancel();
+            if (selector != null) selector.selectNow();
+        }
+        return read;
+    }
+    
+    public void setMaxSelectors(int maxSelectors) {
+        this.maxSelectors = maxSelectors;
+    }
+
+    public void setMaxSpareSelectors(int maxSpareSelectors) {
+        this.maxSpareSelectors = maxSpareSelectors;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    public int getMaxSelectors() {
+        return maxSelectors;
+    }
+
+    public int getMaxSpareSelectors() {
+        return maxSpareSelectors;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+}
\ No newline at end of file
index 980be1b..c7981cb 100644 (file)
@@ -8,6 +8,7 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
+import java.nio.channels.Selector;
 
 /**
  * 
@@ -29,7 +30,10 @@ public class SecureNioChannel extends NioChannel  {
     protected boolean closed = false;
     protected boolean closing = false;
     
-    public SecureNioChannel(SocketChannel channel, SSLEngine engine, ApplicationBufferHandler bufHandler) throws IOException {
+    protected NioSelectorPool pool;
+    
+    public SecureNioChannel(SocketChannel channel, SSLEngine engine, 
+                            ApplicationBufferHandler bufHandler, NioSelectorPool pool) throws IOException {
         super(channel,bufHandler);
         this.sslEngine = engine;
         int appBufSize = sslEngine.getSession().getApplicationBufferSize();
@@ -37,7 +41,10 @@ public class SecureNioChannel extends NioChannel  {
         //allocate network buffers - TODO, add in optional direct non-direct buffers
         if ( netInBuffer == null ) netInBuffer = ByteBuffer.allocateDirect(netBufSize);
         if ( netOutBuffer == null ) netOutBuffer = ByteBuffer.allocateDirect(netBufSize);
-
+        
+        //selector pool for blocking operations
+        this.pool = pool;
+        
         //ensure that the application has a large enough read/write buffers
         //by doing this, we should not encounter any buffer overflow errors
         bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
@@ -72,12 +79,13 @@ public class SecureNioChannel extends NioChannel  {
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush() throws IOException {
-        return flush(netOutBuffer);
+    public boolean flush(Selector s, long timeout) throws IOException {
+        pool.write(netOutBuffer,sc,s,timeout);
+        return !netOutBuffer.hasRemaining();
     }
     
     /**
-     * Flushes the buffer to the network
+     * Flushes the buffer to the network, non blocking
      * @param buf ByteBuffer
      * @return boolean true if the buffer has been emptied out, false otherwise
      * @throws IOException