/**
* Process given socket.
*/
+ protected boolean processSocketWithOptions(long socket) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assignWithOptions(socket);
+ } else {
+ executor.execute(new SocketWithOptionsProcessor(socket));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Process given socket.
+ */
protected boolean processSocket(long socket) {
try {
if (executor == null) {
// Accept the next incoming connection from the server socket
long socket = Socket.accept(serverSock);
// Hand this socket off to an appropriate processor
- if (!setSocketOptions(socket) || !processSocket(socket)) {
+ if (!processSocketWithOptions(socket)) {
// Close socket and pool right away
Socket.destroy(socket);
}
protected long socket = 0;
protected boolean event = false;
protected boolean error = false;
+ protected boolean options = false;
+
+
+ /**
+ * Process an incoming TCP/IP connection on the specified socket. Any
+ * exception that occurs during processing must be logged and swallowed.
+ * <b>NOTE</b>: This method is called from our Connector's thread. We
+ * must assign it to our own thread so that multiple simultaneous
+ * requests can be handled.
+ *
+ * @param socket TCP socket to process
+ */
+ protected synchronized void assignWithOptions(long socket) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.socket = socket;
+ event = false;
+ error = false;
+ options = true;
+ available = true;
+ notifyAll();
+
+ }
/**
this.socket = socket;
event = false;
error = false;
+ options = false;
available = true;
notifyAll();
this.socket = socket;
event = true;
this.error = error;
+ options = false;
available = true;
notifyAll();
// Close socket and pool
Socket.destroy(socket);
socket = 0;
- } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
+ } else if ((!event) && ((options && !setSocketOptions(socket))
+ || handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
/**
* This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool. This will also set the socket options
+ * and do the handshake.
+ */
+ protected class SocketWithOptionsProcessor implements Runnable {
+
+ protected long socket = 0;
+
+ public SocketWithOptionsProcessor(long socket) {
+ this.socket = socket;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (!setSocketOptions(socket)
+ || handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ Socket.destroy(socket);
+ socket = 0;
+ }
+
+ }
+
+ }
+
+
+ // ---------------------------------------------- SocketProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {
Socket socket = serverSocketFactory.acceptSocket(serverSocket);
serverSocketFactory.initSocket(socket);
// Hand this socket off to an appropriate processor
- if (!setSocketOptions(socket) || !processSocket(socket)) {
+ if (!processSocket(socket)) {
// Close socket right away
try {
socket.close();
public void run() {
// Process the request from this socket
- if (!handler.process(socket)) {
+ if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
try {
socket.close();
continue;
// Process the request from this socket
- if (!handler.process(socket)) {
+ if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
try {
socket.close();