import org.apache.coyote.ActionCode;
import org.apache.tomcat.util.net.PollerInterest;
import java.util.Arrays;
+import org.apache.tomcat.util.MutableBoolean;
public class CometEventImpl implements CometEvent {
}
public boolean isReadable() {
- return request.isReadable();
+ return request.isAvailable() || request.isReadable();
}
public boolean isWriteable() {
return response.isWriteable();
}
+ public boolean hasOp(CometEvent.CometOperation op) {
+ return cometOperations.contains(op);
+ }
+
public void configure(CometEvent.CometConfiguration... options)
throws IOException, IllegalStateException {
checkWorkerThread();
throws IOException, IllegalStateException {
//remove from the registered set
cometOperations.removeAll(Arrays.asList(operations));
- request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0])));
+ request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0])));
}
public CometConfiguration[] getConfiguration() {
}
if (response.isClosed() || !request.isComet()) {
res.action(ActionCode.ACTION_COMET_END, null);
- } else if (!error && read && request.isReadable()) {
+ } else if (!error && read && request.isAvailable()) {
// If this was a read and not all bytes have been read, or if no data
// was read from the connector, then it is an error
error = true;
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
- if (request.isReadable()) {
+ if (request.isAvailable() && request.hasOp(CometEvent.CometOperation.OP_READ)) {
// Invoke a read event right away if there are available bytes
if (event(req, res, SocketStatus.OPEN_READ)) {
comet = true;
}
- /**
- * Return the amount of bytes written by the lower layer.
- */
- protected int lastWrite() {
- return coyoteResponse.getLastWrite();
- }
-
// ------------------------------------------------- Bytes Handling Methods
import org.apache.catalina.util.RequestUtil;
import org.apache.catalina.util.StringManager;
import org.apache.catalina.util.StringParser;
+import org.apache.tomcat.util.MutableBoolean;
+import org.apache.catalina.CometEvent;
/**
/**
- * Return true if bytes are available.
+ * Return true if bytes are available at the servlet layer
*/
- public boolean isReadable() {
+ public boolean isAvailable() {
return (inputBuffer.available() > 0);
}
-
+ /**
+ * returns true if we read data from the socket
+ * @return boolean
+ */
+ public boolean isReadable() {
+ MutableBoolean bool = new MutableBoolean(false);
+ action(ActionCode.ACTION_COMET_READABLE,bool);
+ return bool.get();
+ }
+
+ public boolean hasOp(CometEvent.CometOperation op) {
+ if ( !comet || getEvent()==null ) return false;
+ return event.hasOp(op);
+ }
// ------------------------------------------------------ Protected Methods
protected void action(ActionCode actionCode, Object param) {
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.http.ServerCookie;
import org.apache.tomcat.util.net.URL;
+import org.apache.coyote.ActionCode;
+import org.apache.tomcat.util.MutableBoolean;
/**
* Wrapper object for the Coyote response.
* Return true if bytes are available.
*/
public boolean isWriteable() {
- return (outputBuffer.lastWrite() > 0);
+ MutableBoolean bool = new MutableBoolean(false);
+ coyoteResponse.action(ActionCode.ACTION_COMET_WRITEABLE,bool);
+ return bool.get();
}
public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26);
/**
- * Unregister for notifications for a comet connection
+ * Action for getting the readable status
*/
- public static final ActionCode ACTION_COMET_UNREGISTER = new ActionCode(27);
-
+ public static final ActionCode ACTION_COMET_READABLE = new ActionCode(28);
+ /**
+ * Action for getting the writeable status
+ */
+ public static final ActionCode ACTION_COMET_WRITEABLE = new ActionCode(29);
// ----------------------------------------------------------- Constructors
int code;
protected String errorURI = null;
protected Request req;
-
- protected int lastWrite = 1;
// ------------------------------------------------------------- Properties
// -------------------- State --------------------
- public int getLastWrite() {
- return lastWrite;
- }
-
-
- public void setLastWrite(int lastWrite) {
- this.lastWrite = lastWrite;
- }
-
-
public int getStatus() {
return status;
}
headers.clear();
// update counters
- lastWrite = 1;
bytesWritten=0;
}
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
import org.apache.tomcat.util.net.PollerInterest;
+import org.apache.tomcat.util.MutableBoolean;
/**
request = new Request();
int readTimeout = endpoint.getSoTimeout();
- inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout);
+ inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize);
request.setInputBuffer(inputBuffer);
response = new Response();
response.setHook(this);
- outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout);
+ outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize);
response.setOutputBuffer(outputBuffer);
request.setResponse(response);
try {
if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) {
socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
- inputBuffer.readTimeout = soTimeout;
}
if (!inputBuffer.parseRequestLine(keptAlive)) {
//no data available yet, since we might have read part
request.setStartTime(System.currentTimeMillis());
if (!disableUploadTimeout) { //only for body, not for request headers
socket.getIOChannel().socket().setSoTimeout((int)timeout);
- inputBuffer.readTimeout = soTimeout;
}
} catch (IOException e) {
error = true;
} else if (actionCode == ActionCode.ACTION_COMET_REGISTER) {
int interest = getPollerInterest(param);
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- attach.setCometOps(attach.getCometOps()|interest);
- //notify poller if not on a tomcat thread
- RequestInfo rp = request.getRequestProcessor();
- if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
- socket.getPoller().cometInterest(socket);
- } else if (actionCode == ActionCode.ACTION_COMET_UNREGISTER) {
- int interest = getPollerInterest(param);
- NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- attach.setCometOps(attach.getCometOps()& (~interest));
+ attach.setCometOps(interest);
//notify poller if not on a tomcat thread
RequestInfo rp = request.getRequestProcessor();
if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE )
socket.getPoller().cometInterest(socket);
} else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE) {
+ } else if (actionCode == ActionCode.ACTION_COMET_READABLE) {
+ MutableBoolean bool = (MutableBoolean)param;
+ try {
+ bool.set(inputBuffer.isReadable());
+ }catch ( IOException x ) {
+ throw new RuntimeException(x);
+ }
+ } else if (actionCode == ActionCode.ACTION_COMET_WRITEABLE) {
+ MutableBoolean bool = (MutableBoolean)param;
+ bool.set(outputBuffer.isWritable());
}
}
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.net.NioEndpoint;
/**
* Implementation of InputBuffer which provides HTTP request header parsing as
/**
* Alternate constructor.
*/
- public InternalNioInputBuffer(Request request, int headerBufferSize,
- long readTimeout) {
+ public InternalNioInputBuffer(Request request, int headerBufferSize) {
this.request = request;
headers = request.getMimeHeaders();
headerData.recycle();
swallowInput = true;
- if (readTimeout < 0) {
- this.readTimeout = -1;
- } else {
- this.readTimeout = readTimeout;
- }
-
}
protected int lastActiveFilter;
- /**
- * The socket timeout used when reading the first block of the request
- * header.
- */
- protected long readTimeout;
-
// ------------------------------------------------------------- Properties
}
// --------------------------------------------------------- Public Methods
-
+ /**
+ * Returns true if there are bytes available from the socket layer
+ * @return boolean
+ * @throws IOException
+ */
+ public boolean isReadable() throws IOException {
+ return (pos < lastValid) || (nbRead()>0);
+ }
+
+ /**
+ * Issues a non blocking read
+ * @return int
+ * @throws IOException
+ */
+ public int nbRead() throws IOException {
+ return readSocket(true,false);
+ }
/**
* Recycle the input buffer. This should be called when closing the
if (useAvailableData) {
return false;
}
- if (readTimeout == -1) {
- if (!fill(false,true)) //request line parsing
- throw new EOFException(sm.getString("iib.eof.error"));
- } else {
- // Do a simple read with a short timeout
- if ( !readSocket(true, false) ) return false;
- }
+ // Do a simple read with a short timeout
+ if ( readSocket(true, false)==0 ) return false;
}
chr = buf[pos++];
} while ((chr == Constants.CR) || (chr == Constants.LF));
if (useAvailableData) {
return false;
}
- if (readTimeout == -1) {
- if (!fill(false,false)) //request line parsing
- return false;
- } else {
- // Do a simple read with a short timeout
- if ( !readSocket(true, false) ) return false;
- }
+ // Do a simple read with a short timeout
+ if ( readSocket(true, false)==0 ) return false;
}
parsingRequestLinePhase = 2;
}
tmp = null;
}
}
+
/**
* Perform blocking read with a timeout if desired
* @param timeout boolean - if we want to use the timeout data
* @throws IOException if a socket exception occurs
* @throws EOFException if end of stream is reached
*/
- private boolean readSocket(boolean timeout, boolean block) throws IOException {
+ private int readSocket(boolean timeout, boolean block) throws IOException {
int nRead = 0;
- long rto = timeout?this.readTimeout:-1;
socket.getBufHandler().getReadBuffer().clear();
if ( block ) {
Selector selector = null;
try { selector = getSelectorPool().get(); }catch ( IOException x ) {}
try {
- nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto);
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ if ( att == null ) throw new IOException("Key must be cancelled.");
+ nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
} catch ( EOFException eof ) {
nRead = -1;
} finally {
expand(nRead + pos);
socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
lastValid = pos + nRead;
- return true;
+ return nRead;
} else if (nRead == -1) {
//return false;
throw new EOFException(sm.getString("iib.eof.error"));
} else {
- return false;
+ return 0;
}
}
}
// Do a simple read with a short timeout
- read = readSocket(timeout,block);
+ read = readSocket(timeout,block)>0;
} else {
if (buf.length - end < 4500) {
pos = end;
lastValid = pos;
// Do a simple read with a short timeout
- read = readSocket(timeout, block);
+ read = readSocket(timeout, block)>0;
}
return read;
}
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
+import java.io.EOFException;
+import org.apache.tomcat.util.MutableInteger;
/**
* Output buffer.
* Default constructor.
*/
public InternalNioOutputBuffer(Response response) {
- this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
+ this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
}
/**
* Alternate constructor.
*/
- public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) {
+ public InternalNioOutputBuffer(Response response, int headerBufferSize) {
this.response = response;
headers = response.getMimeHeaders();
committed = false;
finished = false;
- this.writeTimeout = writeTimeout;
-
// Cause loading of HttpMessages
HttpMessages.getMessage(200);
*/
protected int pos;
+ /**
+ * Number of bytes last written
+ */
+ protected MutableInteger lastWrite = new MutableInteger(1);
/**
* Underlying socket.
*/
protected int lastActiveFilter;
- /**
- * Write time out in milliseconds
- */
- protected long writeTimeout = -1;
-
-
// ------------------------------------------------------------- Properties
this.socket = socket;
}
- public void setWriteTimeout(long writeTimeout) {
- this.writeTimeout = writeTimeout;
- }
-
/**
* Get the underlying socket input stream.
*/
return socket;
}
- public long getWriteTimeout() {
- return writeTimeout;
- }
-
public void setSelectorPool(NioSelectorPool pool) {
this.pool = pool;
}
// Recycle Request object
response.recycle();
-
}
lastActiveFilter = -1;
committed = false;
finished = false;
+ lastWrite.set(1);
}
}
-
+ public boolean isWritable() {
+ return lastWrite.get()>0;
+ }
// ------------------------------------------------ HTTP/1.1 Output Methods
if (!committed) {
//Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length);
- writeToSocket(buf,false);
+ writeToSocket(buf,false,true);
}
}
- private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException {
+ /**
+ *
+ * @param bytebuffer ByteBuffer
+ * @param flip boolean
+ * @return int
+ * @throws IOException
+ */
+ private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip, boolean block) throws IOException {
//int limit = bytebuffer.position();
if ( flip ) bytebuffer.flip();
int written = 0;
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ if ( att == null ) throw new IOException("Key must be cancelled");
+ long writeTimeout = att.getTimeout();
Selector selector = null;
try {
selector = getSelectorPool().get();
//ignore
}
try {
- written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout);
+ written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite);
//make sure we are flushed
do {
- if (socket.flush(true,selector,writeTimeout)) break;
+ if (socket.flush(true,selector,writeTimeout,lastWrite)) break;
}while ( true );
}finally {
if ( selector != null ) getSelectorPool().put(selector);
//write to the socket, if there is anything to write
if (socket.getBufHandler().getWriteBuffer().position() > 0) {
- writeToSocket(socket.getBufHandler().getWriteBuffer(),true);
+ writeToSocket(socket.getBufHandler().getWriteBuffer(),true,true);
}
}
--- /dev/null
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements. See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License. You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+\r
+package org.apache.tomcat.util;\r
+\r
+public class MutableBoolean {\r
+ protected boolean value = false;\r
+ public MutableBoolean() {}\r
+ public MutableBoolean(boolean val) {\r
+ this.value = val;\r
+ }\r
+ \r
+ public boolean get() { return value;}\r
+ public void set(boolean val) {this.value = val;}\r
+}\r
--- /dev/null
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements. See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License. You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+\r
+package org.apache.tomcat.util;\r
+\r
+public class MutableInteger {\r
+ protected int value = 0;\r
+ public MutableInteger() {}\r
+ public MutableInteger(int val) {\r
+ this.value = val;\r
+ }\r
+\r
+ public int get() { return value;}\r
+ public void set(int val) {this.value = val;}\r
+}\r
import java.util.concurrent.TimeUnit;
import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.MutableInteger;
public class NioBlockingSelector {
public NioBlockingSelector() {
* @throws SocketTimeoutException if the write times out
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
- public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException {
+ public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
int written = 0;
boolean timedout = false;
while ( (!timedout) && buf.hasRemaining()) {
if (keycount > 0) { //only write if we were registered for a write
int cnt = socket.write(buf); //write the data
+ lastWrite.set(cnt);
if (cnt == -1)
throw new EOFException();
written += cnt;
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
+import org.apache.tomcat.util.MutableInteger;
/**
*
* been flushed out and is empty
* @return boolean
*/
- public boolean flush(boolean block, Selector s,long timeout) throws IOException {
+ public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
+ if (lastWrite!=null) lastWrite.set(1);
return true; //no network buffer in the regular channel
}
/**
* The socket poller.
*/
- protected Poller[] pollers = null;
- protected int pollerRoundRobin = 0;
+ protected Poller poller = null;
public Poller getPoller0() {
- pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
- Poller poller = pollers[pollerRoundRobin];
return poller;
}
-
- /**
- * The socket poller used for Comet support.
- */
- public Poller getCometPoller0() {
- Poller poller = getPoller0();
- return poller;
- }
-
-
+ protected Poller readWritePoller = null;
+
/**
* Dummy maxSpareThreads property.
*/
* Number of keepalive sockets.
*/
public int getKeepAliveCount() {
- if (pollers == null) {
+ if (poller == null) {
return 0;
} else {
- int keepAliveCount = 0;
- for (int i = 0; i < pollers.length; i++) {
- keepAliveCount += pollers[i].getKeepAliveCount();
- }
- return keepAliveCount;
+ return poller.selector.keys().size();
}
}
acceptorThread.start();
}
- // Start poller threads
- pollers = new Poller[pollerThreadCount];
- for (int i = 0; i < pollerThreadCount; i++) {
- pollers[i] = new Poller();
- pollers[i].init();
- Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
- pollerThread.setPriority(threadPriority);
- pollerThread.setDaemon(true);
- pollerThread.start();
- }
+ // Start poller thread
+ poller = new Poller();
+ Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
}
}
if (running) {
running = false;
unlockAccept();
- for (int i = 0; i < pollers.length; i++) {
- pollers[i].destroy();
- }
- pollers = null;
+ poller.destroy();
+ poller = null;
}
eventCache.clear();
keyCache.clear();
protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
+ KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
+ attachment.setCometNotify(false); //will get reset upon next reg
if (executor == null) {
getWorkerThread().assign(socket, status);
} else {
interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
att.access();//to prevent timeout
//we are registering the key to start with, reset the fairness counter.
- att.interestOps(interestOps);
- key.interestOps(interestOps);
+ int ops = key.interestOps() | interestOps;
+ att.interestOps(ops);
+ key.interestOps(ops);
} else {
cancel = true;
}
return super.toString()+"[intOps="+this.interestOps+"]";
}
}
+
/**
* Poller class.
*/
protected boolean close = false;
protected long nextExpiration = 0;//optimize expiration handling
-
- protected int keepAliveCount = 0;
- public int getKeepAliveCount() { return keepAliveCount; }
protected AtomicLong wakeupCounter = new AtomicLong(0l);
public Selector getSelector() { return selector;}
/**
- * Create the poller. With some versions of APR, the maximum poller size will
- * be 62 (reocmpiling APR is necessary to remove this limitation).
- */
- protected void init() {
- keepAliveCount = 0;
- }
-
- /**
* Destroy the poller.
*/
protected void destroy() {
socket.setPoller(this);
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment();
- ka.reset(this,socket);
+ ka.reset(this,socket,getSocketProperties().getSoTimeout());
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
} else if ( ka.getError() ) {
cancelledKey(key, SocketStatus.ERROR,true);
} else if (ka.getComet() && ka.getCometNotify() ) {
- ka.setCometNotify(false);//this will get reset after invokation if callback is still in there
reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation
if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT);
}else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
public KeyAttachment() {
}
- public void reset(Poller poller, NioChannel channel) {
+ public void reset(Poller poller, NioChannel channel, long soTimeout) {
this.channel = channel;
this.poller = poller;
lastAccess = System.currentTimeMillis();
currentAccess = false;
comet = false;
- timeout = -1;
+ timeout = soTimeout;
error = false;
lastRegistered = 0;
sendfileData = null;
}
public void reset() {
- reset(null,null);
+ reset(null,null,-1);
}
public Poller getPoller() { return poller;}
*/
package org.apache.tomcat.util.net;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.nio.channels.Selector;
+import java.io.EOFException;
import java.io.IOException;
-import java.util.NoSuchElementException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.io.EOFException;
-import java.net.SocketTimeoutException;
+import java.nio.channels.Selector;
+import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
+import java.util.Iterator;
/**
*
*/
public class NioSelectorPool {
+ protected static int threadCount = 0;
+
protected static Log log = LogFactory.getLog(NioSelectorPool.class);
protected final static boolean SHARED =
Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue();
- protected static Selector SHARED_SELECTOR;
+ protected Selector SHARED_SELECTOR;
protected int maxSelectors = 200;
+ protected long sharedSelectorTimeout = 30000;
protected int maxSpareSelectors = -1;
protected boolean enabled = true;
protected AtomicInteger active = new AtomicInteger(0);
protected AtomicInteger spare = new AtomicInteger(0);
protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();
- protected static Selector getSharedSelector() throws IOException {
+ protected Selector getSharedSelector() throws IOException {
if (SHARED && SHARED_SELECTOR == null) {
synchronized ( NioSelectorPool.class ) {
if ( SHARED_SELECTOR == null ) {
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException {
- return write(buf,socket,selector,writeTimeout,true);
+ return write(buf,socket,selector,writeTimeout,true,null);
}
- public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException {
- if ( SHARED && block) {
- return NioBlockingSelector.write(buf,socket,writeTimeout);
+ public int write(ByteBuffer buf, NioChannel socket, Selector selector,
+ long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
+ if ( SHARED && block ) {
+ return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite);
}
SelectionKey key = null;
int written = 0;
int cnt = 0;
if ( keycount > 0 ) { //only write if we were registered for a write
cnt = socket.write(buf); //write the data
+ if (lastWrite!=null) lastWrite.set(cnt);
if (cnt == -1) throw new EOFException();
+
written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
* @throws IOException if an IO Exception occurs in the underlying socket logic
*/
public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
- if ( SHARED && block) {
+ if ( SHARED && block ) {
return NioBlockingSelector.read(buf,socket,readTimeout);
}
SelectionKey key = null;
this.enabled = enabled;
}
+ public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
+ this.sharedSelectorTimeout = sharedSelectorTimeout;
+ }
+
public int getMaxSelectors() {
return maxSelectors;
}
public boolean isEnabled() {
return enabled;
}
+
+ public long getSharedSelectorTimeout() {
+ return sharedSelectorTimeout;
+ }
+
+ public ConcurrentLinkedQueue getSelectors() {
+ return selectors;
+ }
+
+ public AtomicInteger getSpare() {
+ return spare;
+ }
}
\ No newline at end of file
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import java.nio.channels.Selector;
+import org.apache.tomcat.util.MutableInteger;
/**
*
* been flushed out and is empty
* @return boolean
*/
- public boolean flush(boolean block, Selector s, long timeout) throws IOException {
+ public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException {
if (!block) {
flush(netOutBuffer);
} else {
- pool.write(netOutBuffer, this, s, timeout);
+ pool.write(netOutBuffer, this, s, timeout,block,lastWrite);
}
return !netOutBuffer.hasRemaining();
}