import java.net.Socket;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ActionHook;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.JIoEndpoint;
import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapper;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
/**
*/
protected SSLSupport sslSupport;
+ /**
+ * Async used
+ */
+ protected boolean async = false;
+
/**
* Socket associated with the current connection.
*/
- protected Socket socket;
+ protected SocketWrapper<Socket> socket;
*
* @throws IOException error during an I/O operation
*/
- public boolean process(SocketWrapper<Socket> socketWrapper)
+ public SocketState process(SocketWrapper<Socket> socketWrapper)
throws IOException {
- Socket theSocket = socketWrapper.getSocket();
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
localPort = -1;
// Setting up the I/O
- this.socket = theSocket;
- inputBuffer.setInputStream(socket.getInputStream());
- outputBuffer.setOutputStream(socket.getOutputStream());
+ this.socket = socketWrapper;
+ inputBuffer.setInputStream(socket.getSocket().getInputStream());
+ outputBuffer.setOutputStream(socket.getSocket().getOutputStream());
// Error flag
error = false;
int soTimeout = endpoint.getSoTimeout();
try {
- socket.setSoTimeout(soTimeout);
+ socket.getSocket().setSoTimeout(soTimeout);
} catch (Throwable t) {
log.debug(sm.getString("http11processor.socket.timeout"), t);
error = true;
//TODO - calculate timeout based on length in queue (System.currentTimeMills() - wrapper.getLastAccess() is the time in queue)
if (keptAlive) {
if (keepAliveTimeout > 0) {
- socket.setSoTimeout(keepAliveTimeout);
+ socket.getSocket().setSoTimeout(keepAliveTimeout);
}
else if (soTimeout > 0) {
- socket.setSoTimeout(soTimeout);
+ socket.getSocket().setSoTimeout(soTimeout);
}
}
inputBuffer.parseRequestLine(false);
request.setStartTime(System.currentTimeMillis());
keptAlive = true;
if (disableUploadTimeout) {
- socket.setSoTimeout(soTimeout);
+ socket.getSocket().setSoTimeout(soTimeout);
} else {
- socket.setSoTimeout(timeout);
+ socket.getSocket().setSoTimeout(timeout);
}
inputBuffer.parseHeaders();
} catch (IOException e) {
// If we know we are closing the connection, don't drain input.
// This way uploading a 100GB file doesn't tie up the thread
// if the servlet has rejected it.
- if(error)
+
+ if(error && !async)
inputBuffer.setSwallowInput(false);
- inputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
+ if (!async)
+ endRequest();
} catch (Throwable t) {
log.error(sm.getString("http11processor.request.finish"), t);
// 500 - Internal Server Error
}
try {
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
- outputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
} catch (Throwable t) {
log.error(sm.getString("http11processor.response.finish"), t);
error = true;
// will reset it
// thrA.setParam(null);
// Next request
- inputBuffer.nextRequest();
- outputBuffer.nextRequest();
+ if (!async) {
+ inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+ }
//hack keep alive behavior
break;
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ if (async) {
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ socket.setAsync(true);
+ return SocketState.LONG;
+ }
+ } else {
+ socket.setAsync(false);
+ if ( error || (!keepAlive)) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.OPEN;
+ }
+ }
+ }
+
+
+ public SocketState asyncDispatch(SocketStatus status) throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.asyncDispatch(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ if (async) {
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.LONG;
+ }
+ } else {
+ if ( error || (!keepAlive)) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.OPEN;
+ }
+ }
+ }
+
+
+ public void endRequest() {
+
+ // Finish the handling of the request
+ try {
+ inputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.finish"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+ try {
+ outputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.response.finish"), t);
+ error = true;
+ }
+
+ }
+
+
+ public void recycle() {
// Recycle
inputBuffer.recycle();
outputBuffer.recycle();
this.socket = null;
+ async = false;
// Recycle ssl info
sslSupport = null;
- if (log.isTraceEnabled()) {
- boolean returnvalue = (!error && keepAlive);
- log.trace("Returning "+returnvalue+" to adjust for keep alive.");
- }
- return !error && keepAlive;
}
} else if (actionCode == ActionCode.ACTION_CLOSE) {
// Close
-
+ async = false;
// End the processing of the current request, and stop any further
// transactions with the client
} else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) {
if ((remoteAddr == null) && (socket != null)) {
- InetAddress inetAddr = socket.getInetAddress();
+ InetAddress inetAddr = socket.getSocket().getInetAddress();
if (inetAddr != null) {
remoteAddr = inetAddr.getHostAddress();
}
} else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) {
if ((localName == null) && (socket != null)) {
- InetAddress inetAddr = socket.getLocalAddress();
+ InetAddress inetAddr = socket.getSocket().getLocalAddress();
if (inetAddr != null) {
localName = inetAddr.getHostName();
}
} else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) {
if ((remoteHost == null) && (socket != null)) {
- InetAddress inetAddr = socket.getInetAddress();
+ InetAddress inetAddr = socket.getSocket().getInetAddress();
if (inetAddr != null) {
remoteHost = inetAddr.getHostName();
}
} else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) {
if (localAddr == null)
- localAddr = socket.getLocalAddress().getHostAddress();
+ localAddr = socket.getSocket().getLocalAddress().getHostAddress();
request.localAddr().setString(localAddr);
} else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) {
if ((remotePort == -1 ) && (socket !=null)) {
- remotePort = socket.getPort();
+ remotePort = socket.getSocket().getPort();
}
request.setRemotePort(remotePort);
} else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) {
if ((localPort == -1 ) && (socket !=null)) {
- localPort = socket.getLocalPort();
+ localPort = socket.getSocket().getLocalPort();
}
request.setLocalPort(localPort);
internalBuffer.addActiveFilter(savedBody);
} else if (actionCode == ActionCode.ACTION_ASYNC_START) {
//TODO SERVLET3 - async
+ async = true;
} else if (actionCode == ActionCode.ACTION_ASYNC_COMPLETE) {
//TODO SERVLET3 - async
+ AtomicBoolean dispatch = (AtomicBoolean)param;
+ RequestInfo rp = request.getRequestProcessor();
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) { //async handling
+ dispatch.set(true);
+ endpoint.processSocket(this.socket, SocketStatus.STOP);
+ } else {
+ //TODO SERVLET3 async=false
+ }
} else if (actionCode == ActionCode.ACTION_ASYNC_SETTIMEOUT) {
//TODO SERVLET3 - async
+ if (param==null) return;
+ long timeout = ((Long)param).longValue();
+ //if we are not piggy backing on a worker thread, set the timeout
+ socket.setAsyncTimeout(timeout);
+ } else if (actionCode == ActionCode.ACTION_ASYNC_DISPATCH) {
+ RequestInfo rp = request.getRequestProcessor();
+ AtomicBoolean dispatch = (AtomicBoolean)param;
+ if ( rp.getStage() != org.apache.coyote.Constants.STAGE_SERVICE ) {//async handling
+ endpoint.processSocket(this.socket, SocketStatus.OPEN);
+ dispatch.set(true);
+ } else {
+ //TODO SERVLET3 - do nothing?
+ }
}
+
}
// HTTP/1.0
// Default is what the socket tells us. Overridden if a host is
// found/parsed
- request.setServerPort(socket.getLocalPort());
- InetAddress localAddress = socket.getLocalAddress();
+ request.setServerPort(socket.getSocket().getLocalPort());
+ InetAddress localAddress = socket.getSocket().getLocalAddress();
// Setting the socket-related fields. The adapter doesn't know
// about socket.
request.serverName().setString(localAddress.getHostName());
import java.net.Socket;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.SSLImplementation;
import org.apache.tomcat.util.net.ServerSocketFactory;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapper;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.JIoEndpoint.Handler;
protected Http11Protocol proto;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
+ protected ConcurrentHashMap<SocketWrapper, Http11Processor> connections =
+ new ConcurrentHashMap<SocketWrapper, Http11Processor>();
protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors =
new ConcurrentLinkedQueue<Http11Processor>() {
this.proto = proto;
}
- public boolean process(SocketWrapper<Socket> socket) {
- Http11Processor processor = recycledProcessors.poll();
+
+ public SocketState process(SocketWrapper<Socket> socket) {
+ Http11Processor processor = connections.remove(socket);
try {
-
+ if (processor == null) {
+ processor = recycledProcessors.poll();
+ }
if (processor == null) {
processor = createProcessor();
}
-
processor.action(ActionCode.ACTION_START, null);
if (proto.isSSLEnabled() && (proto.sslImplementation != null)) {
processor.setSSLSupport(null);
}
- return processor.process(socket);
- //return false;
-
+ SocketState state = socket.isAsync()?processor.asyncDispatch(SocketStatus.OPEN):processor.process(socket);
+ if (state == SocketState.LONG) {
+ connections.put(socket, processor);
+ } else {
+ connections.remove(socket);
+ }
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
Http11Protocol.log.debug
processor.action(ActionCode.ACTION_STOP, null);
recycledProcessors.offer(processor);
}
- return false;
+ return SocketState.CLOSED;
}
protected Http11Processor createProcessor() {
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.net.NioEndpoint.SocketProcessor;
/**
* Handle incoming TCP connections.
public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; }
+
+
// ------------------------------------------------ Handler Inner Interface
* thread local fields.
*/
public interface Handler {
- public boolean process(SocketWrapper<Socket> socket);
+ public SocketState process(SocketWrapper<Socket> socket);
}
}
public void run() {
- boolean close = false;
+ SocketState state = SocketState.OPEN;
// Process the request from this socket
- if ( (!socket.isKeptAlive()) && (!setSocketOptions(socket.getSocket())) ) { //this does a handshake and resets socket value
- close = true;
+ if ( (!socket.isInitialized()) && (!setSocketOptions(socket.getSocket())) ) {
+ state = SocketState.CLOSED;
}
+ socket.setInitialized(true);
- if ( (!close) ) {
- close = !handler.process(socket);
+ if ( (state != SocketState.CLOSED) ) {
+ state = handler.process(socket);
}
- if (close) {
+ if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
} catch (IOException e) {
// Ignore
}
- } else {
+ } else if (state == SocketState.OPEN){
socket.setKeptAlive(true);
socket.access();
//keepalive connection
//TODO - servlet3 check async status, we may just be in a hold pattern
getExecutor().execute(new SocketProcessor(socket));
+ } else if (state == SocketState.LONG) {
+ socket.access();
+ waitingRequests.add(socket);
}
// Finish up this request
socket = null;
return true;
}
+ public boolean processSocket(SocketWrapper<Socket> socket, SocketStatus status) {
+ try {
+ if (status == SocketStatus.OPEN || status == SocketStatus.STOP) {
+ if (waitingRequests.remove(socket)) {
+ SocketProcessor proc = new SocketProcessor(socket);
+ getExecutor().execute(proc);
+ }
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+ protected ConcurrentLinkedQueue<SocketWrapper> waitingRequests = new ConcurrentLinkedQueue<SocketWrapper>();
+
+ protected class RequestProcessor implements Runnable {
+ @Override
+ public void run() {
+
+ }
+
+ }
}