From: markt Date: Mon, 16 May 2011 19:56:21 +0000 (+0000) Subject: Tweak processor blocking so that it is non-blocking while no message is being process... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=26f82147175d81ddd268e4d759c25da3cda6720a;p=tomcat7.0 Tweak processor blocking so that it is non-blocking while no message is being processed and blocking during the processing of a message Align ajp nio protocol implementation with http nio Still some TCK failures to resolve git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1103860 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/ajp/AjpNioProcessor.java b/java/org/apache/coyote/ajp/AjpNioProcessor.java index b8aeb28b8..8f0396fc1 100644 --- a/java/org/apache/coyote/ajp/AjpNioProcessor.java +++ b/java/org/apache/coyote/ajp/AjpNioProcessor.java @@ -196,29 +196,20 @@ public class AjpNioProcessor extends AbstractAjpProcessor { // Setting up the socket this.socket = socket; - int soTimeout = -1; - final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); - if (keepAliveTimeout > 0) { - ka.setTimeout(soTimeout); - } + long soTimeout = endpoint.getSoTimeout(); + int keepAliveTimeout = endpoint.getKeepAliveTimeout(); // Error flag error = false; - boolean keptAlive = false; - + final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); + while (!error && !endpoint.isPaused()) { - // Parsing the request header try { - // Set keep alive timeout if enabled - if (keepAliveTimeout > 0) { - ka.setTimeout(keepAliveTimeout); - } // Get first message of the request - int bytesRead = readMessage(requestHeaderMessage, !keptAlive); - if (!keptAlive && bytesRead == 0) { - // No bytes on a blocking read - connection timeout + int bytesRead = readMessage(requestHeaderMessage, false); + if (bytesRead == 0) { rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); break; } @@ -235,8 +226,6 @@ public class AjpNioProcessor extends AbstractAjpProcessor { } catch (IOException e) { error = true; } - // Should be unnecessary but just in case... - keptAlive = true; recycle(); continue; } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) { @@ -244,12 +233,9 @@ public class AjpNioProcessor extends AbstractAjpProcessor { if(log.isDebugEnabled()) { log.debug("Unexpected message: "+type); } - // Should be unnecessary but just in case... - keptAlive = true; recycle(); continue; } - request.setStartTime(System.currentTimeMillis()); } catch (IOException e) { error = true; @@ -324,7 +310,11 @@ public class AjpNioProcessor extends AbstractAjpProcessor { request.updateCounters(); rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); - keptAlive = true; + // Set keep alive timeout if enabled + if (keepAliveTimeout > 0) { + ka.setTimeout(keepAliveTimeout); + } + recycle(); } @@ -479,18 +469,23 @@ public class AjpNioProcessor extends AbstractAjpProcessor { /** * Read the specified amount of bytes, and place them in the input buffer. */ - protected int read(byte[] buf, int pos, int n, boolean block) + protected int read(byte[] buf, int pos, int n, boolean blockFirstRead) throws IOException { int read = 0; int res = 0; + boolean block = blockFirstRead; + while (read < n) { res = readSocket(buf, read + pos, n, block); if (res > 0) { read += res; + } else if (res == 0 && !block) { + break; } else { throw new IOException(sm.getString("ajpprotocol.failedread")); } + block = true; } return read; } @@ -596,14 +591,18 @@ public class AjpNioProcessor extends AbstractAjpProcessor { * @return The number of bytes read * @throws IOException any other failure, including incomplete reads */ - protected int readMessage(AjpMessage message, boolean block) + protected int readMessage(AjpMessage message, boolean blockFirstRead) throws IOException { byte[] buf = message.getBuffer(); int headerLength = message.getHeaderLength(); - int bytesRead = read(buf, 0, headerLength, block); + int bytesRead = read(buf, 0, headerLength, blockFirstRead); + if (bytesRead == 0) { + return 0; + } + int messageLength = message.processHeader(); if (messageLength < 0) { // Invalid AJP header signature diff --git a/java/org/apache/coyote/ajp/AjpNioProtocol.java b/java/org/apache/coyote/ajp/AjpNioProtocol.java index 9895041c7..13c3b03a8 100644 --- a/java/org/apache/coyote/ajp/AjpNioProtocol.java +++ b/java/org/apache/coyote/ajp/AjpNioProtocol.java @@ -17,7 +17,6 @@ package org.apache.coyote.ajp; -import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -113,7 +112,7 @@ public class AjpNioProtocol extends AbstractAjpProtocol { new ConcurrentHashMap(); protected ConcurrentLinkedQueue recycledProcessors = - new ConcurrentLinkedQueue() { + new ConcurrentLinkedQueue() { private static final long serialVersionUID = 1L; protected AtomicInteger size = new AtomicInteger(0); @Override @@ -213,7 +212,7 @@ public class AjpNioProtocol extends AbstractAjpProtocol { recycledProcessors.offer(processor); } - // FIXME: Support for this could be added in AJP as well + // FIXME: Support for Comet could be added in AJP as well @Override public SocketState event(NioChannel socket, SocketStatus status) { AjpNioProcessor processor = connections.get(socket); @@ -244,16 +243,7 @@ public class AjpNioProtocol extends AbstractAjpProtocol { socket.getPoller().add(socket); } } else if (state == SocketState.LONG) { - if (processor.isAsync()) { - att.setAsync(true); // Re-enable timeouts - } else { - // Comet - if (log.isDebugEnabled()) log.debug("Keeping processor["+processor); - // May receive more data from client - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - key.interestOps(SelectionKey.OP_READ); - att.interestOps(SelectionKey.OP_READ); - } + att.setAsync(true); // Re-enable timeouts } else { // state == SocketState.ASYNC_END // No further work required @@ -265,27 +255,37 @@ public class AjpNioProtocol extends AbstractAjpProtocol { @Override public SocketState process(NioChannel socket) { - AjpNioProcessor processor = recycledProcessors.poll(); + AjpNioProcessor processor = connections.remove(socket); try { if (processor == null) { + processor = recycledProcessors.poll(); + } + if (processor == null) { processor = createProcessor(); } SocketState state = processor.process(socket); if (state == SocketState.LONG) { - // Check if the post processing is going to change the state + // In the middle of processing a request/response. Keep the + // socket associated with the processor. + connections.put(socket, processor); + + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + att.setAsync(true); + // longPoll may change socket state (e.g. to trigger a + // complete or dispatch) state = processor.asyncPostProcess(); } if (state == SocketState.LONG || state == SocketState.ASYNC_END) { - // Need to make socket available for next processing cycle - // but no need for the poller - connections.put(socket, processor); - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment)socket.getAttachment(false); - att.setAsync(true); + // Already done all we need to do. + } else if (state == SocketState.OPEN){ + // In keep-alive but between requests. OK to recycle + // processor. Continue to poll for the next request. + release(socket, processor); + socket.getPoller().add(socket); } else { - processor.recycle(); - recycledProcessors.offer(processor); + // Connection closed. OK to recycle the processor. + release(socket, processor); } return state; @@ -308,8 +308,7 @@ public class AjpNioProtocol extends AbstractAjpProtocol { // less-than-verbose logs. log.error(sm.getString("ajpprotocol.proto.error"), e); } - processor.recycle(); - recycledProcessors.offer(processor); + release(socket, processor); return SocketState.CLOSED; }