// Setting up the socket
this.socket = socket;
- int soTimeout = -1;
- final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
- if (keepAliveTimeout > 0) {
- ka.setTimeout(soTimeout);
- }
+ long soTimeout = endpoint.getSoTimeout();
+ int keepAliveTimeout = endpoint.getKeepAliveTimeout();
// Error flag
error = false;
- boolean keptAlive = false;
-
+ final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+
while (!error && !endpoint.isPaused()) {
-
// Parsing the request header
try {
- // Set keep alive timeout if enabled
- if (keepAliveTimeout > 0) {
- ka.setTimeout(keepAliveTimeout);
- }
// Get first message of the request
- int bytesRead = readMessage(requestHeaderMessage, !keptAlive);
- if (!keptAlive && bytesRead == 0) {
- // No bytes on a blocking read - connection timeout
+ int bytesRead = readMessage(requestHeaderMessage, false);
+ if (bytesRead == 0) {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
break;
}
} catch (IOException e) {
error = true;
}
- // Should be unnecessary but just in case...
- keptAlive = true;
recycle();
continue;
} else if(type != Constants.JK_AJP13_FORWARD_REQUEST) {
if(log.isDebugEnabled()) {
log.debug("Unexpected message: "+type);
}
- // Should be unnecessary but just in case...
- keptAlive = true;
recycle();
continue;
}
-
request.setStartTime(System.currentTimeMillis());
} catch (IOException e) {
error = true;
request.updateCounters();
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
- keptAlive = true;
+ // Set keep alive timeout if enabled
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(keepAliveTimeout);
+ }
+
recycle();
}
/**
* Read the specified amount of bytes, and place them in the input buffer.
*/
- protected int read(byte[] buf, int pos, int n, boolean block)
+ protected int read(byte[] buf, int pos, int n, boolean blockFirstRead)
throws IOException {
int read = 0;
int res = 0;
+ boolean block = blockFirstRead;
+
while (read < n) {
res = readSocket(buf, read + pos, n, block);
if (res > 0) {
read += res;
+ } else if (res == 0 && !block) {
+ break;
} else {
throw new IOException(sm.getString("ajpprotocol.failedread"));
}
+ block = true;
}
return read;
}
* @return The number of bytes read
* @throws IOException any other failure, including incomplete reads
*/
- protected int readMessage(AjpMessage message, boolean block)
+ protected int readMessage(AjpMessage message, boolean blockFirstRead)
throws IOException {
byte[] buf = message.getBuffer();
int headerLength = message.getHeaderLength();
- int bytesRead = read(buf, 0, headerLength, block);
+ int bytesRead = read(buf, 0, headerLength, blockFirstRead);
+ if (bytesRead == 0) {
+ return 0;
+ }
+
int messageLength = message.processHeader();
if (messageLength < 0) {
// Invalid AJP header signature
package org.apache.coyote.ajp;
-import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
new ConcurrentHashMap<NioChannel, AjpNioProcessor>();
protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors =
- new ConcurrentLinkedQueue<AjpNioProcessor>() {
+ new ConcurrentLinkedQueue<AjpNioProcessor>() {
private static final long serialVersionUID = 1L;
protected AtomicInteger size = new AtomicInteger(0);
@Override
recycledProcessors.offer(processor);
}
- // FIXME: Support for this could be added in AJP as well
+ // FIXME: Support for Comet could be added in AJP as well
@Override
public SocketState event(NioChannel socket, SocketStatus status) {
AjpNioProcessor processor = connections.get(socket);
socket.getPoller().add(socket);
}
} else if (state == SocketState.LONG) {
- if (processor.isAsync()) {
- att.setAsync(true); // Re-enable timeouts
- } else {
- // Comet
- if (log.isDebugEnabled()) log.debug("Keeping processor["+processor);
- // May receive more data from client
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- key.interestOps(SelectionKey.OP_READ);
- att.interestOps(SelectionKey.OP_READ);
- }
+ att.setAsync(true); // Re-enable timeouts
} else {
// state == SocketState.ASYNC_END
// No further work required
@Override
public SocketState process(NioChannel socket) {
- AjpNioProcessor processor = recycledProcessors.poll();
+ AjpNioProcessor processor = connections.remove(socket);
try {
if (processor == null) {
+ processor = recycledProcessors.poll();
+ }
+ if (processor == null) {
processor = createProcessor();
}
SocketState state = processor.process(socket);
if (state == SocketState.LONG) {
- // Check if the post processing is going to change the state
+ // In the middle of processing a request/response. Keep the
+ // socket associated with the processor.
+ connections.put(socket, processor);
+
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ att.setAsync(true);
+ // longPoll may change socket state (e.g. to trigger a
+ // complete or dispatch)
state = processor.asyncPostProcess();
}
if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
- // Need to make socket available for next processing cycle
- // but no need for the poller
- connections.put(socket, processor);
- NioEndpoint.KeyAttachment att =
- (NioEndpoint.KeyAttachment)socket.getAttachment(false);
- att.setAsync(true);
+ // Already done all we need to do.
+ } else if (state == SocketState.OPEN){
+ // In keep-alive but between requests. OK to recycle
+ // processor. Continue to poll for the next request.
+ release(socket, processor);
+ socket.getPoller().add(socket);
} else {
- processor.recycle();
- recycledProcessors.offer(processor);
+ // Connection closed. OK to recycle the processor.
+ release(socket, processor);
}
return state;
// less-than-verbose logs.
log.error(sm.getString("ajpprotocol.proto.error"), e);
}
- processor.recycle();
- recycledProcessors.offer(processor);
+ release(socket, processor);
return SocketState.CLOSED;
}