import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.Cookies;
import org.apache.tomcat.util.http.ServerCookie;
+import org.apache.tomcat.util.net.SocketStatus;
/**
* @return false to indicate an error, expected or not
*/
public boolean event(org.apache.coyote.Request req,
- org.apache.coyote.Response res, boolean error) {
+ org.apache.coyote.Response res, SocketStatus status) {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request.getWrapper() != null) {
+ boolean error = false;
try {
- if (error) {
- request.getEvent().setEventType(CometEvent.EventType.ERROR);
- } else {
+ if (status == SocketStatus.OPEN) {
request.getEvent().setEventType(CometEvent.EventType.READ);
+ request.getEvent().setEventSubType(null);
+ } else if (status == SocketStatus.DISCONNECT) {
+ request.getEvent().setEventType(CometEvent.EventType.ERROR);
+ request.getEvent().setEventSubType(CometEvent.EventSubType.CLIENT_DISCONNECT);
+ error = true;
+ } else if (status == SocketStatus.ERROR) {
+ request.getEvent().setEventType(CometEvent.EventType.ERROR);
+ request.getEvent().setEventSubType(CometEvent.EventSubType.IOEXCEPTION);
+ error = true;
+ } else if (status == SocketStatus.STOP) {
+ request.getEvent().setEventType(CometEvent.EventType.END);
+ request.getEvent().setEventSubType(CometEvent.EventSubType.SERVER_SHUTDOWN);
+ } else if (status == SocketStatus.TIMEOUT) {
+ request.getEvent().setEventType(CometEvent.EventType.ERROR);
+ request.getEvent().setEventSubType(CometEvent.EventSubType.TIMEOUT);
}
+
// Calling the container
connector.getContainer().getPipeline().getFirst().event(request, response, request.getEvent());
response.recycle();
}
}
+
+ } else {
+ return false;
}
- return true;
}
package org.apache.coyote;
+import org.apache.tomcat.util.net.SocketStatus;
+
/**
* Adapter. This represents the entry point in a coyote-based servlet container.
*/
public interface Adapter {
-
/**
* Call the service method, and notify all listeners
*
public void service(Request req, Response res)
throws Exception;
- public boolean event(Request req, Response res, boolean error)
+ public boolean event(Request req, Response res, SocketStatus status)
throws Exception;
}
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.AprEndpoint.Handler;
-import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
}
// FIXME: Support for this could be added in AJP as well
- public SocketState event(long socket, boolean error) {
+ public SocketState event(long socket, SocketStatus status) {
return SocketState.CLOSED;
}
import org.apache.tomcat.util.http.FastHttpDateFormat;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
*
* @throws IOException error during an I/O operation
*/
- public SocketState event(boolean error)
+ public SocketState event(SocketStatus status)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
- error = !adapter.event(request, response, error);
+ error = !adapter.event(request, response, status);
} catch (InterruptedIOException e) {
error = true;
} catch (Throwable t) {
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.AprEndpoint.Handler;
import org.apache.tomcat.util.res.StringManager;
this.proto = proto;
}
- public SocketState event(long socket, boolean error) {
+ public SocketState event(long socket, SocketStatus status) {
Http11AprProcessor result = connections.get(socket);
SocketState state = SocketState.CLOSED;
if (result != null) {
- boolean recycle = error;
// Call the appropriate event
try {
- state = result.event(error);
+ state = result.event(status);
} catch (java.net.SocketException e) {
// SocketExceptions are normal
Http11AprProtocol.log.debug
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
*
* @throws IOException error during an I/O operation
*/
- public SocketState event(boolean error)
+ public SocketState event(SocketStatus status)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
- error = !adapter.event(request, response, error);
+ error = !adapter.event(request, response, status);
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.SSLImplementation;
import org.apache.tomcat.util.net.SecureNioChannel;
+import org.apache.tomcat.util.net.SocketStatus;
/**
this.proto = proto;
}
- public SocketState event(NioChannel socket, boolean error) {
+ public SocketState event(NioChannel socket, SocketStatus status) {
Http11NioProcessor result = connections.get(socket);
SocketState state = SocketState.CLOSED;
if (result != null) {
- boolean recycle = error;
// Call the appropriate event
try {
- state = result.event(error);
+ state = result.event(status);
} catch (java.net.SocketException e) {
// SocketExceptions are normal
Http11NioProtocol.log.debug
/**
* Process given socket for an event.
*/
- protected boolean processSocket(long socket, boolean error) {
+ protected boolean processSocket(long socket, SocketStatus status) {
try {
if (executor == null) {
- getWorkerThread().assign(socket, error);
+ getWorkerThread().assign(socket, status);
} else {
- executor.execute(new SocketEventProcessor(socket, error));
+ executor.execute(new SocketEventProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// Close all sockets in the add queue
for (int i = 0; i < addCount; i++) {
if (comet) {
- processSocket(addS[i], true);
+ processSocket(addS[i], SocketStatus.STOP);
} else {
Socket.destroy(addS[i]);
}
if (rv > 0) {
for (int n = 0; n < rv; n++) {
if (comet) {
- processSocket(desc[n*2+1], true);
+ processSocket(desc[n*2+1], SocketStatus.STOP);
} else {
Socket.destroy(desc[n*2+1]);
}
if (addCount >= addS.length) {
// Can't do anything: close the socket right away
if (comet) {
- processSocket(socket, true);
+ processSocket(socket, SocketStatus.ERROR);
} else {
Socket.destroy(socket);
}
} else {
// Can't do anything: close the socket right away
if (comet) {
- processSocket(addS[i], true);
+ processSocket(addS[i], SocketStatus.ERROR);
} else {
Socket.destroy(addS[i]);
}
// Check for failed sockets and hand this socket off to a worker
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1], false)))
+ || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
|| (!comet && (!processSocket(desc[n*2+1])))) {
// Close socket and clear pool
if (comet) {
- processSocket(desc[n*2+1], true);
+ processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
} else {
Socket.destroy(desc[n*2+1]);
}
for (int n = 0; n < rv; n++) {
// Close socket and clear pool
if (comet) {
- processSocket(desc[n], true);
+ processSocket(desc[n], SocketStatus.TIMEOUT);
} else {
Socket.destroy(desc[n]);
}
protected Thread thread = null;
protected boolean available = false;
protected long socket = 0;
- protected boolean event = false;
- protected boolean error = false;
+ protected SocketStatus status = null;
protected boolean options = false;
// Store the newly available Socket and notify our thread
this.socket = socket;
- event = false;
- error = false;
+ status = null;
options = true;
available = true;
notifyAll();
// Store the newly available Socket and notify our thread
this.socket = socket;
- event = false;
- error = false;
+ status = null;
options = false;
available = true;
notifyAll();
}
- protected synchronized void assign(long socket, boolean error) {
+ protected synchronized void assign(long socket, SocketStatus status) {
// Wait for the Processor to get the previous Socket
while (available) {
// Store the newly available Socket and notify our thread
this.socket = socket;
- event = true;
- this.error = error;
+ this.status = status;
options = false;
available = true;
notifyAll();
continue;
// Process the request from this socket
- if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {
+ if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
- } else if ((!event) && ((options && !setSocketOptions(socket))
+ } else if ((status == null) && ((options && !setSocketOptions(socket))
|| handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool
Socket.destroy(socket);
OPEN, CLOSED, LONG
}
public SocketState process(long socket);
- public SocketState event(long socket, boolean error);
+ public SocketState event(long socket, SocketStatus status);
}
protected class SocketEventProcessor implements Runnable {
protected long socket = 0;
- protected boolean error = false;
+ protected SocketStatus status = null;
- public SocketEventProcessor(long socket, boolean error) {
+ public SocketEventProcessor(long socket, SocketStatus status) {
this.socket = socket;
- this.error = error;
+ this.status = status;
}
public void run() {
// Process the request from this socket
- if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
+ if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.KeyStore;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.commons.logging.LogFactory;
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.res.StringManager;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.net.Socket;
-import java.util.StringTokenizer;
/**
* NIO tailored thread pool, providing the following services:
/**
* Process given socket for an event.
*/
- protected boolean processSocket(NioChannel socket, boolean error) {
+ protected boolean processSocket(NioChannel socket, SocketStatus status) {
try {
if (executor == null) {
- getWorkerThread().assign(socket, error);
+ getWorkerThread().assign(socket, status);
} else {
- executor.execute(new SocketEventProcessor(socket, error));
+ executor.execute(new SocketEventProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
addEvent(r);
}
- public void cancelledKey(SelectionKey key) {
+ public void cancelledKey(SelectionKey key, SocketStatus status) {
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if ( key.isValid() ) key.cancel();
- if (ka != null && ka.getComet()) processSocket( ka.getChannel(), true);
+ if (ka != null && ka.getComet()) processSocket( ka.getChannel(), status);
+ // FIXME: closing in all these cases is a bit mean. IMO, it should leave it
+ // to the worker (or executor) depending on what the request processor
+ // returns
if ( key.channel().isOpen() ) key.channel().close();
key.attach(null);
} catch (Throwable e) {
attachment.setWakeUp(false);
synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();}
} else if ( attachment.getComet() ) {
- if (!processSocket(channel,false)) processSocket(channel,true);
+ if (!processSocket(channel, SocketStatus.OPEN))
+ processSocket(channel, SocketStatus.DISCONNECT);
} else {
boolean close = (!processSocket(channel));
if ( close ) {
}
} else {
//invalid key
- cancelledKey(sk);
+ cancelledKey(sk, SocketStatus.ERROR);
}
} catch ( CancelledKeyException ckx ) {
- cancelledKey(sk);
+ cancelledKey(sk, SocketStatus.ERROR);
} catch (Throwable t) {
log.error("",t);
}
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if ( ka == null ) {
- cancelledKey(key); //we don't support any keys without attachments
+ cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
} else if ( ka.getError() ) {
- cancelledKey(key);
+ cancelledKey(key, SocketStatus.DISCONNECT);
}else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//only timeout sockets that we are waiting for a read from
long delta = now - ka.getLastAccess();
long timeout = (ka.getTimeout()==-1)?((long) soTimeout):(ka.getTimeout());
boolean isTimedout = delta > timeout;
if (isTimedout) {
- cancelledKey(key);
+ cancelledKey(key, SocketStatus.TIMEOUT);
} else {
long nextTime = now+(timeout-delta);
nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;
}
}//end if
}catch ( CancelledKeyException ckx ) {
- cancelledKey(key);
+ cancelledKey(key, SocketStatus.ERROR);
}
}//for
}
protected Thread thread = null;
protected boolean available = false;
protected Object socket = null;
- protected boolean event = false;
- protected boolean error = false;
+ protected SocketStatus status = null;
/**
}
// Store the newly available Socket and notify our thread
this.socket = socket;
- event = false;
- error = false;
+ status = null;
available = true;
notifyAll();
}
- protected synchronized void assign(Object socket, boolean error) {
+ protected synchronized void assign(Object socket, SocketStatus status) {
// Wait for the Processor to get the previous Socket
while (available) {
// Store the newly available Socket and notify our thread
this.socket = socket;
- event = true;
- this.error = error;
+ this.status = status;
available = true;
notifyAll();
}
}
if ( handshake == 0 ) {
// Process the request from this socket
- if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {
+ if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
// Close socket and pool
try {
}catch ( Exception x ) {
log.error("",x);
}
- } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
+ } else if ((status == null) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool
try {
OPEN, CLOSED, LONG
}
public SocketState process(NioChannel socket);
- public SocketState event(NioChannel socket, boolean error);
+ public SocketState event(NioChannel socket, SocketStatus status);
}
protected class SocketEventProcessor implements Runnable {
protected NioChannel socket = null;
- protected boolean error = false;
+ protected SocketStatus status = null;
- public SocketEventProcessor(NioChannel socket, boolean error) {
+ public SocketEventProcessor(NioChannel socket, SocketStatus status) {
this.socket = socket;
- this.error = error;
+ this.status = status;
}
public void run() {
// Process the request from this socket
- if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
+ if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
// Close socket and pool
try {
try {socket.close();}catch (Exception ignore){}
--- /dev/null
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+/**
+ * Someone, please change the enum name.
+ *
+ * @author remm
+ */
+public enum SocketStatus {
+ OPEN, STOP, TIMEOUT, DISCONNECT, ERROR
+}