From: fhanik Date: Thu, 31 May 2007 19:32:33 +0000 (+0000) Subject: 1. Timeouts are now per connection, not using fixed timeouts anywhere. by default... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=e8b0e794775b3d7e8517d21982d6ed772f2184ea;p=tomcat7.0 1. Timeouts are now per connection, not using fixed timeouts anywhere. by default the connection gets the timeout defined in server.xml 2. Implemented all Comet operations, including the ability to have none 3. Implemented CometEvent.isReadable and isWriteable isAvailable - means data is available to the servlet isReadable - means there is data from the socket also checks the socket, by doing a read, in a non blocking fashion to verify this to be true isWriteable - the last write attempted on this socket was 0, hence we are probably blocking 4. simplified CometEvent.register/unregister, they are now just one call and no syncs 5. After each event, the connection is registered with the same operations it had before 6. CoyoteAdapter respects when the servlet doesn't want to be notified of the READ event, hence it doesn't invoke it automatically 7. Let me know if MutableBoolean and MutableInteger should be elsewhere(in terms of package), they are used since ActionHook doesn't have a return value and also valuable in the output buffers since SSL writing is two steps, one through the engine and the other to the socket I'm pretty happy with how isReadable,isWriteable works, they are completly non blocking and very accurate True non blocking in the buffers and filters seems like a major surgery, still holding off on that. Need to fix the NioBlockingSelector as it is almost impossible to make the poller interest declaration thread safe git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@543226 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/connector/CometEventImpl.java b/java/org/apache/catalina/connector/CometEventImpl.java index 728f054d1..26832ddb9 100644 --- a/java/org/apache/catalina/connector/CometEventImpl.java +++ b/java/org/apache/catalina/connector/CometEventImpl.java @@ -29,6 +29,7 @@ import org.apache.catalina.util.StringManager; import org.apache.coyote.ActionCode; import org.apache.tomcat.util.net.PollerInterest; import java.util.Arrays; +import org.apache.tomcat.util.MutableBoolean; public class CometEventImpl implements CometEvent { @@ -142,12 +143,16 @@ public class CometEventImpl implements CometEvent { } public boolean isReadable() { - return request.isReadable(); + return request.isAvailable() || request.isReadable(); } public boolean isWriteable() { return response.isWriteable(); } + public boolean hasOp(CometEvent.CometOperation op) { + return cometOperations.contains(op); + } + public void configure(CometEvent.CometConfiguration... options) throws IOException, IllegalStateException { checkWorkerThread(); @@ -169,7 +174,7 @@ public class CometEventImpl implements CometEvent { throws IOException, IllegalStateException { //remove from the registered set cometOperations.removeAll(Arrays.asList(operations)); - request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0]))); + request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0]))); } public CometConfiguration[] getConfiguration() { diff --git a/java/org/apache/catalina/connector/CoyoteAdapter.java b/java/org/apache/catalina/connector/CoyoteAdapter.java index 61c41b8c6..0a4e8a541 100644 --- a/java/org/apache/catalina/connector/CoyoteAdapter.java +++ b/java/org/apache/catalina/connector/CoyoteAdapter.java @@ -227,7 +227,7 @@ public class CoyoteAdapter } if (response.isClosed() || !request.isComet()) { res.action(ActionCode.ACTION_COMET_END, null); - } else if (!error && read && request.isReadable()) { + } else if (!error && read && request.isAvailable()) { // If this was a read and not all bytes have been read, or if no data // was read from the connector, then it is an error error = true; @@ -312,7 +312,7 @@ public class CoyoteAdapter if (request.isComet()) { if (!response.isClosed() && !response.isError()) { - if (request.isReadable()) { + if (request.isAvailable() && request.hasOp(CometEvent.CometOperation.OP_READ)) { // Invoke a read event right away if there are available bytes if (event(req, res, SocketStatus.OPEN_READ)) { comet = true; diff --git a/java/org/apache/catalina/connector/OutputBuffer.java b/java/org/apache/catalina/connector/OutputBuffer.java index c7e98da3c..634a5b77a 100644 --- a/java/org/apache/catalina/connector/OutputBuffer.java +++ b/java/org/apache/catalina/connector/OutputBuffer.java @@ -324,13 +324,6 @@ public class OutputBuffer extends Writer } - /** - * Return the amount of bytes written by the lower layer. - */ - protected int lastWrite() { - return coyoteResponse.getLastWrite(); - } - // ------------------------------------------------- Bytes Handling Methods diff --git a/java/org/apache/catalina/connector/Request.java b/java/org/apache/catalina/connector/Request.java index 73efb8e00..9e3dac684 100644 --- a/java/org/apache/catalina/connector/Request.java +++ b/java/org/apache/catalina/connector/Request.java @@ -70,6 +70,8 @@ import org.apache.catalina.util.ParameterMap; import org.apache.catalina.util.RequestUtil; import org.apache.catalina.util.StringManager; import org.apache.catalina.util.StringParser; +import org.apache.tomcat.util.MutableBoolean; +import org.apache.catalina.CometEvent; /** @@ -2250,13 +2252,26 @@ public class Request /** - * Return true if bytes are available. + * Return true if bytes are available at the servlet layer */ - public boolean isReadable() { + public boolean isAvailable() { return (inputBuffer.available() > 0); } - + /** + * returns true if we read data from the socket + * @return boolean + */ + public boolean isReadable() { + MutableBoolean bool = new MutableBoolean(false); + action(ActionCode.ACTION_COMET_READABLE,bool); + return bool.get(); + } + + public boolean hasOp(CometEvent.CometOperation op) { + if ( !comet || getEvent()==null ) return false; + return event.hasOp(op); + } // ------------------------------------------------------ Protected Methods protected void action(ActionCode actionCode, Object param) { diff --git a/java/org/apache/catalina/connector/Response.java b/java/org/apache/catalina/connector/Response.java index acb59c100..ba28742c9 100644 --- a/java/org/apache/catalina/connector/Response.java +++ b/java/org/apache/catalina/connector/Response.java @@ -52,6 +52,8 @@ import org.apache.tomcat.util.http.FastHttpDateFormat; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.http.ServerCookie; import org.apache.tomcat.util.net.URL; +import org.apache.coyote.ActionCode; +import org.apache.tomcat.util.MutableBoolean; /** * Wrapper object for the Coyote response. @@ -533,7 +535,9 @@ public class Response * Return true if bytes are available. */ public boolean isWriteable() { - return (outputBuffer.lastWrite() > 0); + MutableBoolean bool = new MutableBoolean(false); + coyoteResponse.action(ActionCode.ACTION_COMET_WRITEABLE,bool); + return bool.get(); } diff --git a/java/org/apache/coyote/ActionCode.java b/java/org/apache/coyote/ActionCode.java index 8b47725da..d46f8ad55 100644 --- a/java/org/apache/coyote/ActionCode.java +++ b/java/org/apache/coyote/ActionCode.java @@ -167,11 +167,14 @@ public final class ActionCode { public static final ActionCode ACTION_COMET_REGISTER = new ActionCode(26); /** - * Unregister for notifications for a comet connection + * Action for getting the readable status */ - public static final ActionCode ACTION_COMET_UNREGISTER = new ActionCode(27); - + public static final ActionCode ACTION_COMET_READABLE = new ActionCode(28); + /** + * Action for getting the writeable status + */ + public static final ActionCode ACTION_COMET_WRITEABLE = new ActionCode(29); // ----------------------------------------------------------- Constructors int code; diff --git a/java/org/apache/coyote/Response.java b/java/org/apache/coyote/Response.java index 7fd2d2933..d18ab97d2 100644 --- a/java/org/apache/coyote/Response.java +++ b/java/org/apache/coyote/Response.java @@ -123,8 +123,6 @@ public final class Response { protected String errorURI = null; protected Request req; - - protected int lastWrite = 1; // ------------------------------------------------------------- Properties @@ -190,16 +188,6 @@ public final class Response { // -------------------- State -------------------- - public int getLastWrite() { - return lastWrite; - } - - - public void setLastWrite(int lastWrite) { - this.lastWrite = lastWrite; - } - - public int getStatus() { return status; } @@ -591,7 +579,6 @@ public final class Response { headers.clear(); // update counters - lastWrite = 1; bytesWritten=0; } diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index a068dd482..a503e7403 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -54,6 +54,7 @@ import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; import org.apache.tomcat.util.net.PollerInterest; +import org.apache.tomcat.util.MutableBoolean; /** @@ -91,12 +92,12 @@ public class Http11NioProcessor implements ActionHook { request = new Request(); int readTimeout = endpoint.getSoTimeout(); - inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize,readTimeout); + inputBuffer = new InternalNioInputBuffer(request, maxHttpHeaderSize); request.setInputBuffer(inputBuffer); response = new Response(); response.setHook(this); - outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize,readTimeout); + outputBuffer = new InternalNioOutputBuffer(response, maxHttpHeaderSize); response.setOutputBuffer(outputBuffer); request.setResponse(response); @@ -819,7 +820,6 @@ public class Http11NioProcessor implements ActionHook { try { if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { socket.getIOChannel().socket().setSoTimeout((int)soTimeout); - inputBuffer.readTimeout = soTimeout; } if (!inputBuffer.parseRequestLine(keptAlive)) { //no data available yet, since we might have read part @@ -839,7 +839,6 @@ public class Http11NioProcessor implements ActionHook { request.setStartTime(System.currentTimeMillis()); if (!disableUploadTimeout) { //only for body, not for request headers socket.getIOChannel().socket().setSoTimeout((int)timeout); - inputBuffer.readTimeout = soTimeout; } } catch (IOException e) { error = true; @@ -1223,20 +1222,22 @@ public class Http11NioProcessor implements ActionHook { } else if (actionCode == ActionCode.ACTION_COMET_REGISTER) { int interest = getPollerInterest(param); NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - attach.setCometOps(attach.getCometOps()|interest); - //notify poller if not on a tomcat thread - RequestInfo rp = request.getRequestProcessor(); - if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) - socket.getPoller().cometInterest(socket); - } else if (actionCode == ActionCode.ACTION_COMET_UNREGISTER) { - int interest = getPollerInterest(param); - NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)socket.getAttachment(false); - attach.setCometOps(attach.getCometOps()& (~interest)); + attach.setCometOps(interest); //notify poller if not on a tomcat thread RequestInfo rp = request.getRequestProcessor(); if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) socket.getPoller().cometInterest(socket); } else if (actionCode == ActionCode.ACTION_COMET_CONFIGURE) { + } else if (actionCode == ActionCode.ACTION_COMET_READABLE) { + MutableBoolean bool = (MutableBoolean)param; + try { + bool.set(inputBuffer.isReadable()); + }catch ( IOException x ) { + throw new RuntimeException(x); + } + } else if (actionCode == ActionCode.ACTION_COMET_WRITEABLE) { + MutableBoolean bool = (MutableBoolean)param; + bool.set(outputBuffer.isWritable()); } } diff --git a/java/org/apache/coyote/http11/InternalNioInputBuffer.java b/java/org/apache/coyote/http11/InternalNioInputBuffer.java index 0861d53b8..013f4fbe1 100644 --- a/java/org/apache/coyote/http11/InternalNioInputBuffer.java +++ b/java/org/apache/coyote/http11/InternalNioInputBuffer.java @@ -30,6 +30,7 @@ import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.net.NioEndpoint; /** * Implementation of InputBuffer which provides HTTP request header parsing as @@ -51,8 +52,7 @@ public class InternalNioInputBuffer implements InputBuffer { /** * Alternate constructor. */ - public InternalNioInputBuffer(Request request, int headerBufferSize, - long readTimeout) { + public InternalNioInputBuffer(Request request, int headerBufferSize) { this.request = request; headers = request.getMimeHeaders(); @@ -80,12 +80,6 @@ public class InternalNioInputBuffer implements InputBuffer { headerData.recycle(); swallowInput = true; - if (readTimeout < 0) { - this.readTimeout = -1; - } else { - this.readTimeout = readTimeout; - } - } @@ -195,12 +189,6 @@ public class InternalNioInputBuffer implements InputBuffer { protected int lastActiveFilter; - /** - * The socket timeout used when reading the first block of the request - * header. - */ - protected long readTimeout; - // ------------------------------------------------------------- Properties @@ -296,7 +284,23 @@ public class InternalNioInputBuffer implements InputBuffer { } // --------------------------------------------------------- Public Methods - + /** + * Returns true if there are bytes available from the socket layer + * @return boolean + * @throws IOException + */ + public boolean isReadable() throws IOException { + return (pos < lastValid) || (nbRead()>0); + } + + /** + * Issues a non blocking read + * @return int + * @throws IOException + */ + public int nbRead() throws IOException { + return readSocket(true,false); + } /** * Recycle the input buffer. This should be called when closing the @@ -413,13 +417,8 @@ public class InternalNioInputBuffer implements InputBuffer { if (useAvailableData) { return false; } - if (readTimeout == -1) { - if (!fill(false,true)) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); - } else { - // Do a simple read with a short timeout - if ( !readSocket(true, false) ) return false; - } + // Do a simple read with a short timeout + if ( readSocket(true, false)==0 ) return false; } chr = buf[pos++]; } while ((chr == Constants.CR) || (chr == Constants.LF)); @@ -434,13 +433,8 @@ public class InternalNioInputBuffer implements InputBuffer { if (useAvailableData) { return false; } - if (readTimeout == -1) { - if (!fill(false,false)) //request line parsing - return false; - } else { - // Do a simple read with a short timeout - if ( !readSocket(true, false) ) return false; - } + // Do a simple read with a short timeout + if ( readSocket(true, false)==0 ) return false; } parsingRequestLinePhase = 2; } @@ -552,6 +546,7 @@ public class InternalNioInputBuffer implements InputBuffer { tmp = null; } } + /** * Perform blocking read with a timeout if desired * @param timeout boolean - if we want to use the timeout data @@ -560,15 +555,16 @@ public class InternalNioInputBuffer implements InputBuffer { * @throws IOException if a socket exception occurs * @throws EOFException if end of stream is reached */ - private boolean readSocket(boolean timeout, boolean block) throws IOException { + private int readSocket(boolean timeout, boolean block) throws IOException { int nRead = 0; - long rto = timeout?this.readTimeout:-1; socket.getBufHandler().getReadBuffer().clear(); if ( block ) { Selector selector = null; try { selector = getSelectorPool().get(); }catch ( IOException x ) {} try { - nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,rto); + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled."); + nRead = getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout()); } catch ( EOFException eof ) { nRead = -1; } finally { @@ -583,12 +579,12 @@ public class InternalNioInputBuffer implements InputBuffer { expand(nRead + pos); socket.getBufHandler().getReadBuffer().get(buf, pos, nRead); lastValid = pos + nRead; - return true; + return nRead; } else if (nRead == -1) { //return false; throw new EOFException(sm.getString("iib.eof.error")); } else { - return false; + return 0; } } @@ -852,7 +848,7 @@ public class InternalNioInputBuffer implements InputBuffer { } // Do a simple read with a short timeout - read = readSocket(timeout,block); + read = readSocket(timeout,block)>0; } else { if (buf.length - end < 4500) { @@ -865,7 +861,7 @@ public class InternalNioInputBuffer implements InputBuffer { pos = end; lastValid = pos; // Do a simple read with a short timeout - read = readSocket(timeout, block); + read = readSocket(timeout, block)>0; } return read; } diff --git a/java/org/apache/coyote/http11/InternalNioOutputBuffer.java b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java index 9891ab122..70a48925d 100644 --- a/java/org/apache/coyote/http11/InternalNioOutputBuffer.java +++ b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java @@ -34,6 +34,8 @@ import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.res.StringManager; +import java.io.EOFException; +import org.apache.tomcat.util.MutableInteger; /** * Output buffer. @@ -56,14 +58,14 @@ public class InternalNioOutputBuffer * Default constructor. */ public InternalNioOutputBuffer(Response response) { - this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000); + this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE); } /** * Alternate constructor. */ - public InternalNioOutputBuffer(Response response, int headerBufferSize, long writeTimeout) { + public InternalNioOutputBuffer(Response response, int headerBufferSize) { this.response = response; headers = response.getMimeHeaders(); @@ -86,8 +88,6 @@ public class InternalNioOutputBuffer committed = false; finished = false; - this.writeTimeout = writeTimeout; - // Cause loading of HttpMessages HttpMessages.getMessage(200); @@ -142,6 +142,10 @@ public class InternalNioOutputBuffer */ protected int pos; + /** + * Number of bytes last written + */ + protected MutableInteger lastWrite = new MutableInteger(1); /** * Underlying socket. @@ -179,12 +183,6 @@ public class InternalNioOutputBuffer */ protected int lastActiveFilter; - /** - * Write time out in milliseconds - */ - protected long writeTimeout = -1; - - // ------------------------------------------------------------- Properties @@ -195,10 +193,6 @@ public class InternalNioOutputBuffer this.socket = socket; } - public void setWriteTimeout(long writeTimeout) { - this.writeTimeout = writeTimeout; - } - /** * Get the underlying socket input stream. */ @@ -206,10 +200,6 @@ public class InternalNioOutputBuffer return socket; } - public long getWriteTimeout() { - return writeTimeout; - } - public void setSelectorPool(NioSelectorPool pool) { this.pool = pool; } @@ -324,7 +314,6 @@ public class InternalNioOutputBuffer // Recycle Request object response.recycle(); - } @@ -343,6 +332,7 @@ public class InternalNioOutputBuffer lastActiveFilter = -1; committed = false; finished = false; + lastWrite.set(1); } @@ -401,7 +391,9 @@ public class InternalNioOutputBuffer } - + public boolean isWritable() { + return lastWrite.get()>0; + } // ------------------------------------------------ HTTP/1.1 Output Methods @@ -414,15 +406,25 @@ public class InternalNioOutputBuffer if (!committed) { //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0 ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length); - writeToSocket(buf,false); + writeToSocket(buf,false,true); } } - private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip) throws IOException { + /** + * + * @param bytebuffer ByteBuffer + * @param flip boolean + * @return int + * @throws IOException + */ + private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean flip, boolean block) throws IOException { //int limit = bytebuffer.position(); if ( flip ) bytebuffer.flip(); int written = 0; + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled"); + long writeTimeout = att.getTimeout(); Selector selector = null; try { selector = getSelectorPool().get(); @@ -430,10 +432,10 @@ public class InternalNioOutputBuffer //ignore } try { - written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout); + written = getSelectorPool().write(bytebuffer, socket, selector, writeTimeout, block,lastWrite); //make sure we are flushed do { - if (socket.flush(true,selector,writeTimeout)) break; + if (socket.flush(true,selector,writeTimeout,lastWrite)) break; }while ( true ); }finally { if ( selector != null ) getSelectorPool().put(selector); @@ -759,7 +761,7 @@ public class InternalNioOutputBuffer //write to the socket, if there is anything to write if (socket.getBufHandler().getWriteBuffer().position() > 0) { - writeToSocket(socket.getBufHandler().getWriteBuffer(),true); + writeToSocket(socket.getBufHandler().getWriteBuffer(),true,true); } } diff --git a/java/org/apache/tomcat/util/MutableBoolean.java b/java/org/apache/tomcat/util/MutableBoolean.java new file mode 100644 index 000000000..b7ea90a60 --- /dev/null +++ b/java/org/apache/tomcat/util/MutableBoolean.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util; + +public class MutableBoolean { + protected boolean value = false; + public MutableBoolean() {} + public MutableBoolean(boolean val) { + this.value = val; + } + + public boolean get() { return value;} + public void set(boolean val) {this.value = val;} +} diff --git a/java/org/apache/tomcat/util/MutableInteger.java b/java/org/apache/tomcat/util/MutableInteger.java new file mode 100644 index 000000000..5f5817152 --- /dev/null +++ b/java/org/apache/tomcat/util/MutableInteger.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util; + +public class MutableInteger { + protected int value = 0; + public MutableInteger() {} + public MutableInteger(int val) { + this.value = val; + } + + public int get() { return value;} + public void set(int val) {this.value = val;} +} diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java b/java/org/apache/tomcat/util/net/NioBlockingSelector.java index 79027d8bd..f23e76d4f 100644 --- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java +++ b/java/org/apache/tomcat/util/net/NioBlockingSelector.java @@ -24,6 +24,7 @@ import java.nio.channels.SelectionKey; import java.util.concurrent.TimeUnit; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import org.apache.tomcat.util.MutableInteger; public class NioBlockingSelector { public NioBlockingSelector() { @@ -41,7 +42,7 @@ public class NioBlockingSelector { * @throws SocketTimeoutException if the write times out * @throws IOException if an IO Exception occurs in the underlying socket logic */ - public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout) throws IOException { + public static int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int written = 0; boolean timedout = false; @@ -55,6 +56,7 @@ public class NioBlockingSelector { while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write int cnt = socket.write(buf); //write the data + lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; diff --git a/java/org/apache/tomcat/util/net/NioChannel.java b/java/org/apache/tomcat/util/net/NioChannel.java index 901ef5f66..ba90442b6 100644 --- a/java/org/apache/tomcat/util/net/NioChannel.java +++ b/java/org/apache/tomcat/util/net/NioChannel.java @@ -27,6 +27,7 @@ import org.apache.tomcat.util.net.NioEndpoint.Poller; import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; import java.nio.channels.Selector; import java.nio.channels.SelectionKey; +import org.apache.tomcat.util.MutableInteger; /** * @@ -70,7 +71,8 @@ public class NioChannel implements ByteChannel{ * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s,long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { + if (lastWrite!=null) lastWrite.set(1); return true; //no network buffer in the regular channel } diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 727adcba6..12a04434a 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -479,24 +479,13 @@ public class NioEndpoint { /** * The socket poller. */ - protected Poller[] pollers = null; - protected int pollerRoundRobin = 0; + protected Poller poller = null; public Poller getPoller0() { - pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; - Poller poller = pollers[pollerRoundRobin]; return poller; } - - /** - * The socket poller used for Comet support. - */ - public Poller getCometPoller0() { - Poller poller = getPoller0(); - return poller; - } - - + protected Poller readWritePoller = null; + /** * Dummy maxSpareThreads property. */ @@ -649,14 +638,10 @@ public class NioEndpoint { * Number of keepalive sockets. */ public int getKeepAliveCount() { - if (pollers == null) { + if (poller == null) { return 0; } else { - int keepAliveCount = 0; - for (int i = 0; i < pollers.length; i++) { - keepAliveCount += pollers[i].getKeepAliveCount(); - } - return keepAliveCount; + return poller.selector.keys().size(); } } @@ -793,16 +778,12 @@ public class NioEndpoint { acceptorThread.start(); } - // Start poller threads - pollers = new Poller[pollerThreadCount]; - for (int i = 0; i < pollerThreadCount; i++) { - pollers[i] = new Poller(); - pollers[i].init(); - Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); - pollerThread.setPriority(threadPriority); - pollerThread.setDaemon(true); - pollerThread.start(); - } + // Start poller thread + poller = new Poller(); + Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); } } @@ -836,10 +817,8 @@ public class NioEndpoint { if (running) { running = false; unlockAccept(); - for (int i = 0; i < pollers.length; i++) { - pollers[i].destroy(); - } - pollers = null; + poller.destroy(); + poller = null; } eventCache.clear(); keyCache.clear(); @@ -1118,6 +1097,8 @@ public class NioEndpoint { protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { + KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); + attachment.setCometNotify(false); //will get reset upon next reg if (executor == null) { getWorkerThread().assign(socket, status); } else { @@ -1248,8 +1229,9 @@ public class NioEndpoint { interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag att.access();//to prevent timeout //we are registering the key to start with, reset the fairness counter. - att.interestOps(interestOps); - key.interestOps(interestOps); + int ops = key.interestOps() | interestOps; + att.interestOps(ops); + key.interestOps(ops); } else { cancel = true; } @@ -1269,6 +1251,7 @@ public class NioEndpoint { return super.toString()+"[intOps="+this.interestOps+"]"; } } + /** * Poller class. */ @@ -1279,9 +1262,6 @@ public class NioEndpoint { protected boolean close = false; protected long nextExpiration = 0;//optimize expiration handling - - protected int keepAliveCount = 0; - public int getKeepAliveCount() { return keepAliveCount; } protected AtomicLong wakeupCounter = new AtomicLong(0l); @@ -1296,14 +1276,6 @@ public class NioEndpoint { public Selector getSelector() { return selector;} /** - * Create the poller. With some versions of APR, the maximum poller size will - * be 62 (reocmpiling APR is necessary to remove this limitation). - */ - protected void init() { - keepAliveCount = 0; - } - - /** * Destroy the poller. */ protected void destroy() { @@ -1379,7 +1351,7 @@ public class NioEndpoint { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(); - ka.reset(this,socket); + ka.reset(this,socket,getSocketProperties().getSoTimeout()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); @@ -1621,7 +1593,6 @@ public class NioEndpoint { } else if ( ka.getError() ) { cancelledKey(key, SocketStatus.ERROR,true); } else if (ka.getComet() && ka.getCometNotify() ) { - ka.setCometNotify(false);//this will get reset after invokation if callback is still in there reg(key,ka,0);//avoid multiple calls, this gets reregistered after invokation if (!processSocket(ka.getChannel(), SocketStatus.OPEN_CALLBACK)) processSocket(ka.getChannel(), SocketStatus.DISCONNECT); }else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) { @@ -1656,13 +1627,13 @@ public class NioEndpoint { public KeyAttachment() { } - public void reset(Poller poller, NioChannel channel) { + public void reset(Poller poller, NioChannel channel, long soTimeout) { this.channel = channel; this.poller = poller; lastAccess = System.currentTimeMillis(); currentAccess = false; comet = false; - timeout = -1; + timeout = soTimeout; error = false; lastRegistered = 0; sendfileData = null; @@ -1676,7 +1647,7 @@ public class NioEndpoint { } public void reset() { - reset(null,null); + reset(null,null,-1); } public Poller getPoller() { return poller;} diff --git a/java/org/apache/tomcat/util/net/NioSelectorPool.java b/java/org/apache/tomcat/util/net/NioSelectorPool.java index 5562b4d9c..19981053a 100644 --- a/java/org/apache/tomcat/util/net/NioSelectorPool.java +++ b/java/org/apache/tomcat/util/net/NioSelectorPool.java @@ -16,17 +16,20 @@ */ package org.apache.tomcat.util.net; -import java.util.concurrent.atomic.AtomicInteger; -import java.nio.channels.Selector; +import java.io.EOFException; import java.io.IOException; -import java.util.NoSuchElementException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.io.EOFException; -import java.net.SocketTimeoutException; +import java.nio.channels.Selector; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.MutableInteger; +import java.util.Iterator; /** * @@ -37,20 +40,23 @@ import org.apache.juli.logging.LogFactory; */ public class NioSelectorPool { + protected static int threadCount = 0; + protected static Log log = LogFactory.getLog(NioSelectorPool.class); protected final static boolean SHARED = Boolean.valueOf(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true")).booleanValue(); - protected static Selector SHARED_SELECTOR; + protected Selector SHARED_SELECTOR; protected int maxSelectors = 200; + protected long sharedSelectorTimeout = 30000; protected int maxSpareSelectors = -1; protected boolean enabled = true; protected AtomicInteger active = new AtomicInteger(0); protected AtomicInteger spare = new AtomicInteger(0); protected ConcurrentLinkedQueue selectors = new ConcurrentLinkedQueue(); - protected static Selector getSharedSelector() throws IOException { + protected Selector getSharedSelector() throws IOException { if (SHARED && SHARED_SELECTOR == null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR == null ) { @@ -127,12 +133,13 @@ public class NioSelectorPool { * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException { - return write(buf,socket,selector,writeTimeout,true); + return write(buf,socket,selector,writeTimeout,true,null); } - public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException { - if ( SHARED && block) { - return NioBlockingSelector.write(buf,socket,writeTimeout); + public int write(ByteBuffer buf, NioChannel socket, Selector selector, + long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException { + if ( SHARED && block ) { + return NioBlockingSelector.write(buf,socket,writeTimeout,lastWrite); } SelectionKey key = null; int written = 0; @@ -148,7 +155,9 @@ public class NioSelectorPool { int cnt = 0; if ( keycount > 0 ) { //only write if we were registered for a write cnt = socket.write(buf); //write the data + if (lastWrite!=null) lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); + written += cnt; if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer @@ -206,7 +215,7 @@ public class NioSelectorPool { * @throws IOException if an IO Exception occurs in the underlying socket logic */ public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException { - if ( SHARED && block) { + if ( SHARED && block ) { return NioBlockingSelector.read(buf,socket,readTimeout); } SelectionKey key = null; @@ -254,6 +263,10 @@ public class NioSelectorPool { this.enabled = enabled; } + public void setSharedSelectorTimeout(long sharedSelectorTimeout) { + this.sharedSelectorTimeout = sharedSelectorTimeout; + } + public int getMaxSelectors() { return maxSelectors; } @@ -265,4 +278,16 @@ public class NioSelectorPool { public boolean isEnabled() { return enabled; } + + public long getSharedSelectorTimeout() { + return sharedSelectorTimeout; + } + + public ConcurrentLinkedQueue getSelectors() { + return selectors; + } + + public AtomicInteger getSpare() { + return spare; + } } \ No newline at end of file diff --git a/java/org/apache/tomcat/util/net/SecureNioChannel.java b/java/org/apache/tomcat/util/net/SecureNioChannel.java index 2463f60f2..3fbcc6994 100644 --- a/java/org/apache/tomcat/util/net/SecureNioChannel.java +++ b/java/org/apache/tomcat/util/net/SecureNioChannel.java @@ -25,6 +25,7 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import java.nio.channels.Selector; +import org.apache.tomcat.util.MutableInteger; /** * @@ -102,11 +103,11 @@ public class SecureNioChannel extends NioChannel { * been flushed out and is empty * @return boolean */ - public boolean flush(boolean block, Selector s, long timeout) throws IOException { + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { if (!block) { flush(netOutBuffer); } else { - pool.write(netOutBuffer, this, s, timeout); + pool.write(netOutBuffer, this, s, timeout,block,lastWrite); } return !netOutBuffer.hasRemaining(); }