import org.apache.catalina.tribes.transport.AbstractSender;\r
import org.apache.catalina.tribes.transport.DataSender;\r
import org.apache.catalina.tribes.RemoteProcessException;\r
+import java.io.EOFException;\r
\r
/**\r
* This class is NOT thread safe and should never be used with more than one thread at a time\r
//weve written everything, or we are starting a new package\r
//protect against buffer overwrite\r
int byteswritten = socketChannel.write(writebuf);\r
+ if (byteswritten == -1 ) throw new EOFException();\r
remaining -= byteswritten;\r
//if the entire message was written from the buffer\r
//reset the position counter\r
response = new Response();
response.setHook(this);
- outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
+ outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize,readTimeout);
response.setOutputBuffer(outputBuffer);
request.setResponse(response);
this.socket = socket;
inputBuffer.setSocket(socket);
outputBuffer.setSocket(socket);
+ inputBuffer.setSelectorPool(endpoint.getSelectorPool());
+ outputBuffer.setSelectorPool(endpoint.getSelectorPool());
// Error flag
error = false;
import org.apache.tomcat.util.net.NioEndpoint.Poller;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import java.nio.channels.Selector;
/**
* Implementation of InputBuffer which provides HTTP request header parsing as
* Underlying socket.
*/
protected NioChannel socket;
-
+
+ /**
+ * Selector pool, for blocking reads and blocking writes
+ */
+ protected NioSelectorPool pool;
+
/**
* Underlying input buffer.
public void setSocket(NioChannel socket) {
this.socket = socket;
}
-
-
+
/**
* Get the underlying socket input stream.
*/
return socket;
}
+ public void setSelectorPool(NioSelectorPool pool) {
+ this.pool = pool;
+ }
+
+ public NioSelectorPool getSelectorPool() {
+ return pool;
+ }
+
+
/**
* Add an input filter to the filter library.
*/
*/
private boolean readSocket(boolean timeout, boolean block) throws IOException {
int nRead = 0;
- long start = System.currentTimeMillis();
- boolean timedOut = false;
- do {
-
- socket.getBufHandler().getReadBuffer().clear();
+ long rto = timeout?this.readTimeout:-1;
+ socket.getBufHandler().getReadBuffer().clear();
+ if ( block ) {
+ Selector selector = null;
+ try { selector = getSelectorPool().get(); }catch ( IOException x ) {}
+ try {
+ nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket.getIOChannel(),selector,rto);
+ } catch ( EOFException eof ) {
+ nRead = -1;
+ } finally {
+ if ( selector != null ) getSelectorPool().put(selector);
+ }
+ } else {
nRead = socket.read(socket.getBufHandler().getReadBuffer());
- if (nRead > 0) {
- socket.getBufHandler().getReadBuffer().flip();
- socket.getBufHandler().getReadBuffer().limit(nRead);
- expand(nRead + pos);
- socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
- lastValid = pos + nRead;
- return true;
- } else if (nRead == -1) {
- //return false;
- throw new EOFException(sm.getString("iib.eof.error"));
- } else if ( !block ) {
- return false;
- } else {
- timedOut = timeout && (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
- if ( !timedOut && nRead == 0 ) {
- try {
- final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- final KeyAttachment att = (KeyAttachment)key.attachment();
- //to do, add in a check, we might have just timed out on the wait,
- //so there is no need to register us again.
- boolean addToQueue = false;
- try { addToQueue = ((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
- if ( addToQueue ) {
- synchronized (att.getMutex()) {
- addToReadQueue(key, att);
- att.getMutex().wait(readTimeout);
- }
- }//end if
- }catch ( Exception x ) {}
- }
- }
- }while ( nRead == 0 && (!timedOut) );
- //else throw new IOException(sm.getString("iib.failedread"));
- //return false; //timeout
- throw new IOException("read timed out.");
+ }
+ if (nRead > 0) {
+ socket.getBufHandler().getReadBuffer().flip();
+ socket.getBufHandler().getReadBuffer().limit(nRead);
+ expand(nRead + pos);
+ socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+ lastValid = pos + nRead;
+ return true;
+ } else if (nRead == -1) {
+ //return false;
+ throw new EOFException(sm.getString("iib.eof.error"));
+ } else {
+ return false;
+ }
}
private void addToReadQueue(final SelectionKey key, final KeyAttachment att) {
package org.apache.coyote.http11;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
/**
* Default constructor.
*/
public InternalNioOutputBuffer(Response response) {
- this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
+ this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
}
/**
* Alternate constructor.
*/
- public InternalNioOutputBuffer(Response response, int headerBufferSize) {
+ public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
this.response = response;
headers = response.getMimeHeaders();
committed = false;
finished = false;
+
+ this.writeTimeout = writeTimeout;
// Cause loading of HttpMessages
HttpMessages.getMessage(200);
* Underlying socket.
*/
protected NioChannel socket;
+
+ /**
+ * Selector pool, for blocking reads and blocking writes
+ */
+ protected NioSelectorPool pool;
+
/**
* Index of the last active filter.
*/
protected int lastActiveFilter;
-
+
+ /**
+ * Write time out in milliseconds
+ */
+ protected long writeTimeout = -1;
// ------------------------------------------------------------- Properties
this.socket = socket;
}
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
/**
* Get the underlying socket input stream.
*/
public NioChannel getSocket() {
return socket;
}
+
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ public void setSelectorPool(NioSelectorPool pool) {
+ this.pool = pool;
+ }
+
+ public NioSelectorPool getSelectorPool() {
+ return pool;
+ }
/**
* Set the socket buffer size.
*/
private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
//int limit = bytebuffer.position();
if ( flip ) bytebuffer.flip();
- while ( bytebuffer.hasRemaining() ) {
- int written = socket.write(bytebuffer);
+ int written = 0;
+ Selector selector = null;
+ try {
+ selector = getSelectorPool().get();
+ } catch ( IOException x ) {
+ //ignore
+ }
+ try {
+ written = getSelectorPool().write(bytebuffer, socket.getIOChannel(), selector, writeTimeout);
+ //make sure we are flushed
+ do {
+ if (socket.flush(selector)) break;
+ }while ( true );
+ }finally {
+ if ( selector != null ) getSelectorPool().put(selector);
}
- //make sure we are flushed
- do {
- if (socket.flush()) break;
- }while ( true );
-
socket.getBufHandler().getWriteBuffer().clear();
this.total = 0;
}
/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- *\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements. See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License. You may obtain a copy of the License at\r
+ * \r
* http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
*/\r
+\r
+\r
package org.apache.tomcat.util.net;\r
\r
import java.io.IOException;\r
\r
import org.apache.tomcat.util.net.NioEndpoint.Poller;\r
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;\r
+import java.nio.channels.Selector;\r
\r
/**\r
* \r
* @version 1.0\r
*/\r
public class NioChannel implements ByteChannel{\r
- \r
+\r
protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);\r
\r
protected SocketChannel sc = null;\r
\r
protected ApplicationBufferHandler bufHandler;\r
- \r
+\r
protected Poller poller;\r
\r
public NioChannel(SocketChannel channel, ApplicationBufferHandler bufHandler) throws IOException {\r
this.sc = channel;\r
this.bufHandler = bufHandler;\r
}\r
- \r
+\r
public void reset() throws IOException {\r
bufHandler.getReadBuffer().clear();\r
bufHandler.getWriteBuffer().clear();\r
* been flushed out and is empty\r
* @return boolean\r
*/\r
- public boolean flush() throws IOException {\r
+ public boolean flush(Selector s) throws IOException {\r
return true; //no network buffer in the regular channel\r
}\r
\r
public boolean isInitHandshakeComplete() {\r
return true;\r
}\r
- \r
+\r
public int handshake(boolean read, boolean write) throws IOException {\r
return 0;\r
}\r
return super.toString()+":"+this.sc.toString();\r
}\r
\r
-}
\ No newline at end of file
+}\r
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
protected int readBufSize = 8192;
protected int writeBufSize = 8192;
+ protected NioSelectorPool selectorPool = new NioSelectorPool();;
+
/**
* Server socket "pointer".
*/
this.readBufSize = readBufSize;
}
+ public void setSelectorPool(NioSelectorPool selectorPool) {
+ this.selectorPool = selectorPool;
+ }
+
protected SSLContext sslContext = null;
public SSLContext getSSLContext() { return sslContext;}
public void setSSLContext(SSLContext c) { sslContext = c;}
running = true;
paused = false;
+ selectorPool.setMaxSelectors(maxThreads);
+ selectorPool.setMaxSpareSelectors(-1);
+ selectorPool.open();
// Create worker collection
if (executor == null) {
}
pollers = null;
}
+ try {selectorPool.close();}catch (IOException x){}
nioChannels.clear();
}
return readBufSize;
}
+ public NioSelectorPool getSelectorPool() {
+ return selectorPool;
+ }
+
/**
- * Unlock the server socket accept using a bugus connection.
+ * Unlock the server socket accept using a bogus connection.
*/
protected void unlockAccept() {
java.net.Socket s = null;
int appbufsize = engine.getSession().getApplicationBufferSize();
int bufsize = Math.max(Math.max(getReadBufSize(), getWriteBufSize()), appbufsize);
NioBufferHandler bufhandler = new NioBufferHandler(bufsize, bufsize);
- channel = new SecureNioChannel(socket, engine, bufhandler);
+ channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
} else {
NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(), getWriteBufSize());
channel = new NioChannel(socket, bufhandler);
--- /dev/null
+/*
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.channels.Selector;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ *
+ * Thread safe non blocking selector pool
+ * @author Filip Hanik
+ * @version 1.0
+ * @since 6.0
+ */
+
+public class NioSelectorPool {
+ protected int maxSelectors = 100;
+ protected int maxSpareSelectors = -1;
+ protected boolean enabled = true;
+ protected AtomicInteger active = new AtomicInteger(0);
+ protected AtomicInteger spare = new AtomicInteger(0);
+ //protected LinkedList<Selector> selectors = new LinkedList<Selector>();
+ protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
+
+ public Selector get() throws IOException{
+ if ( (!enabled) || active.incrementAndGet() >= maxSelectors ) {
+ active.decrementAndGet();
+ return null;
+ }
+ Selector s = null;
+ try {
+ s = selectors.size()>0?selectors.poll():null;
+ if (s == null) s = Selector.open();
+ else spare.decrementAndGet();
+
+ }catch (NoSuchElementException x ) {
+ try {s = Selector.open();}catch (IOException iox){}
+ } finally {
+ if ( s == null ) active.decrementAndGet();//we were unable to find a selector
+ }
+ return s;
+ }
+
+
+
+ public void put(Selector s) throws IOException {
+ active.decrementAndGet();
+ if ( enabled && (maxSpareSelectors==-1 || spare.get() < Math.min(maxSpareSelectors,maxSelectors)) ) {
+ spare.incrementAndGet();
+ selectors.offer(s);
+ }
+ else s.close();
+ }
+
+ public void close() throws IOException {
+ enabled = false;
+ Selector s;
+ while ( (s = selectors.poll()) != null ) s.close();
+ spare.set(0);
+ }
+
+ public void open(){
+ enabled = true;
+ }
+
+ /**
+ * Performs a blocking write using the bytebuffer for data to be written and a selector to block.
+ * If the <code>selector</code> parameter is null, then it will perform a busy write that could
+ * take up a lot of CPU cycles.
+ * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
+ * @param socket SocketChannel - the socket to write data to
+ * @param selector Selector - the selector to use for blocking, if null then a busy write will be initiated
+ * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout
+ * @return int - returns the number of bytes written
+ * @throws EOFException if write returns -1
+ * @throws SocketTimeoutException if the write times out
+ * @throws IOException if an IO Exception occurs in the underlying socket logic
+ */
+ public int write(ByteBuffer buf, SocketChannel socket, Selector selector, long writeTimeout) throws IOException {
+ SelectionKey key = null;
+ int written = 0;
+ boolean timedout = false;
+ int keycount = 1; //assume we can write
+ long time = System.currentTimeMillis(); //start the timeout timer
+ try {
+ while ( (!timedout) && buf.hasRemaining() ) {
+ if ( keycount > 0 ) { //only write if we were registered for a write
+ int cnt = socket.write(buf); //write the data
+ if (cnt == -1) throw new EOFException();
+ written += cnt;
+ if (cnt > 0) {
+ time = System.currentTimeMillis(); //reset our timeout timer
+ continue; //we successfully wrote, try again without a selector
+ }
+ }
+ if ( selector != null ) {
+ //register OP_WRITE to the selector
+ if (key==null) key = socket.register(selector, SelectionKey.OP_WRITE);
+ else key.interestOps(SelectionKey.OP_WRITE);
+ keycount = selector.select(writeTimeout);
+ }
+ if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
+ }//while
+ if ( timedout ) throw new SocketTimeoutException();
+ } finally {
+ if (key != null) key.cancel();
+ if (selector != null) selector.selectNow();
+ }
+ return written;
+ }
+
+ /**
+ * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
+ * If the <code>selector</code> parameter is null, then it will perform a busy read that could
+ * take up a lot of CPU cycles.
+ * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
+ * @param socket SocketChannel - the socket to write data to
+ * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
+ * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
+ * @return int - returns the number of bytes read
+ * @throws EOFException if read returns -1
+ * @throws SocketTimeoutException if the read times out
+ * @throws IOException if an IO Exception occurs in the underlying socket logic
+ */
+ public int read(ByteBuffer buf, SocketChannel socket, Selector selector, long readTimeout) throws IOException {
+ SelectionKey key = null;
+ int read = 0;
+ boolean timedout = false;
+ int keycount = 1; //assume we can write
+ long time = System.currentTimeMillis(); //start the timeout timer
+ try {
+ while ( (!timedout) && read == 0 ) {
+ if ( keycount > 0 ) { //only read if we were registered for a read
+ int cnt = socket.read(buf);
+ if (cnt == -1) throw new EOFException();
+ read += cnt;
+ if (cnt > 0) break;
+ }
+ if ( selector != null ) {
+ //register OP_WRITE to the selector
+ if (key==null) key = socket.register(selector, SelectionKey.OP_READ);
+ else key.interestOps(SelectionKey.OP_READ);
+ keycount = selector.select(readTimeout);
+ }
+ if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout;
+ }//while
+ if ( timedout ) throw new SocketTimeoutException();
+ } finally {
+ if (key != null) key.cancel();
+ if (selector != null) selector.selectNow();
+ }
+ return read;
+ }
+
+ public void setMaxSelectors(int maxSelectors) {
+ this.maxSelectors = maxSelectors;
+ }
+
+ public void setMaxSpareSelectors(int maxSpareSelectors) {
+ this.maxSpareSelectors = maxSpareSelectors;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getMaxSelectors() {
+ return maxSelectors;
+ }
+
+ public int getMaxSpareSelectors() {
+ return maxSpareSelectors;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+}
\ No newline at end of file
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
+import java.nio.channels.Selector;
/**
*
protected boolean closed = false;
protected boolean closing = false;
- public SecureNioChannel(SocketChannel channel, SSLEngine engine, ApplicationBufferHandler bufHandler) throws IOException {
+ protected NioSelectorPool pool;
+
+ public SecureNioChannel(SocketChannel channel, SSLEngine engine,
+ ApplicationBufferHandler bufHandler, NioSelectorPool pool) throws IOException {
super(channel,bufHandler);
this.sslEngine = engine;
int appBufSize = sslEngine.getSession().getApplicationBufferSize();
//allocate network buffers - TODO, add in optional direct non-direct buffers
if ( netInBuffer == null ) netInBuffer = ByteBuffer.allocateDirect(netBufSize);
if ( netOutBuffer == null ) netOutBuffer = ByteBuffer.allocateDirect(netBufSize);
-
+
+ //selector pool for blocking operations
+ this.pool = pool;
+
//ensure that the application has a large enough read/write buffers
//by doing this, we should not encounter any buffer overflow errors
bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
* been flushed out and is empty
* @return boolean
*/
- public boolean flush() throws IOException {
- return flush(netOutBuffer);
+ public boolean flush(Selector s, long timeout) throws IOException {
+ pool.write(netOutBuffer,sc,s,timeout);
+ return !netOutBuffer.hasRemaining();
}
/**
- * Flushes the buffer to the network
+ * Flushes the buffer to the network, non blocking
* @param buf ByteBuffer
* @return boolean true if the buffer has been emptied out, false otherwise
* @throws IOException