*/
public class Poller extends Thread {
- protected long serverPollset = 0;
- protected long pool = 0;
- protected long[] desc;
+ // Need two pollsets since the socketTimeout and the keep-alive timeout
+ // can have different values.
+ private long connectionPollset = 0;
+ private long keepAlivePollset = 0;
+ private long pool = 0;
+ private long[] desc;
- protected long[] addS;
- protected volatile int addCount = 0;
+ private long[] addSocket;
+ private boolean[] addSocketKeepAlive;
+
+ private volatile int addCount = 0;
- protected boolean comet = true;
+ private boolean comet = true;
+ private boolean separateKeepAlive = false;
protected volatile int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
}
/**
- * Create the poller. With some versions of APR, the maximum poller size will
- * be 62 (recompiling APR is necessary to remove this limitation).
+ * Create the poller. With some versions of APR, the maximum poller size
+ * will be 62 (recompiling APR is necessary to remove this limitation).
*/
protected void init() {
pool = Pool.create(serverSockPool);
int size = getMaxConnections() / pollerThreadCount;
- int timeout = getKeepAliveTimeout();
- if (timeout <= 0) {
- timeout = socketProperties.getSoTimeout();
+ int keepAliveTimeout = getKeepAliveTimeout();
+ int socketTimeout = socketProperties.getSoTimeout();
+ if (keepAliveTimeout > 0 && !comet) {
+ separateKeepAlive = true;
}
- serverPollset = allocatePoller(size, pool, timeout);
- if (serverPollset == 0 && size > 1024) {
+ connectionPollset = allocatePoller(size, pool, socketTimeout);
+ if (separateKeepAlive) {
+ keepAlivePollset = allocatePoller(size, pool, keepAliveTimeout);
+ }
+ if (connectionPollset == 0 && size > 1024) {
size = 1024;
- serverPollset = allocatePoller(size, pool, timeout);
+ connectionPollset = allocatePoller(size, pool, socketTimeout);
+ if (separateKeepAlive) {
+ keepAlivePollset =
+ allocatePoller(size, pool, keepAliveTimeout);
+ }
}
- if (serverPollset == 0) {
+ if (connectionPollset == 0) {
size = 62;
- serverPollset = allocatePoller(size, pool, timeout);
+ connectionPollset = allocatePoller(size, pool, socketTimeout);
+ if (separateKeepAlive) {
+ keepAlivePollset =
+ allocatePoller(size, pool, keepAliveTimeout);
+ }
}
desc = new long[size * 2];
keepAliveCount = 0;
- addS = new long[size];
+ addSocket = new long[size];
+ addSocketKeepAlive = new boolean[size];
addCount = 0;
}
// Close all sockets in the add queue
for (int i = 0; i < addCount; i++) {
if (comet) {
- processSocket(addS[i], SocketStatus.STOP);
+ processSocket(addSocket[i], SocketStatus.STOP);
} else {
- destroySocket(addS[i]);
+ destroySocket(addSocket[i]);
}
}
- // Close all sockets still in the poller
- int rv = Poll.pollset(serverPollset, desc);
- if (rv > 0) {
- for (int n = 0; n < rv; n++) {
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.STOP);
- } else {
- destroySocket(desc[n*2+1]);
- }
- }
+ // Close all sockets still in the pollers
+ closePollset(connectionPollset);
+ if (separateKeepAlive) {
+ closePollset(keepAlivePollset);
}
Pool.destroy(pool);
keepAliveCount = 0;
}
}
+ private void closePollset(long pollset) {
+ int rv = Poll.pollset(pollset, desc);
+ if (rv > 0) {
+ for (int n = 0; n < rv; n++) {
+ if (comet) {
+ processSocket(desc[n*2+1], SocketStatus.STOP);
+ } else {
+ destroySocket(desc[n*2+1]);
+ }
+ }
+ }
+ }
+
/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
*
* @param socket to add to the poller
*/
- public void add(long socket) {
+ public void add(long socket, boolean keepAlive) {
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addCount >= addS.length) {
+ if (addCount >= addSocket.length) {
// Can't do anything: close the socket right away
if (comet) {
processSocket(socket, SocketStatus.ERROR);
}
return;
}
- addS[addCount] = socket;
+ addSocket[addCount] = socket;
+ addSocketKeepAlive[addCount] = keepAlive;
addCount++;
this.notify();
}
int successCount = 0;
try {
for (int i = (addCount - 1); i >= 0; i--) {
- int rv = Poll.add
- (serverPollset, addS[i], Poll.APR_POLLIN);
+ int rv;
+ if (separateKeepAlive && addSocketKeepAlive[i]) {
+ rv = Poll.add(keepAlivePollset,
+ addSocket[i], Poll.APR_POLLIN);
+ } else {
+ rv = Poll.add(connectionPollset,
+ addSocket[i], Poll.APR_POLLIN);
+ }
if (rv == Status.APR_SUCCESS) {
successCount++;
} else {
// Can't do anything: close the socket right away
if (comet) {
- processSocket(addS[i], SocketStatus.ERROR);
+ processSocket(addSocket[i], SocketStatus.ERROR);
} else {
- destroySocket(addS[i]);
+ destroySocket(addSocket[i]);
}
}
}
}
maintainTime += pollTime;
- // Pool for the specified interval
- int rv = Poll.poll(serverPollset, pollTime, desc, true);
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Check for failed sockets and hand this socket off to a worker
- if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
- || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
- || (!comet && (!processSocket(desc[n*2+1])))) {
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
- } else {
- destroySocket(desc[n*2+1]);
- }
- continue;
- }
- }
- } else if (rv < 0) {
- int errn = -rv;
- /* Any non timeup or interrupted error is critical */
- if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
- if (errn > Status.APR_OS_START_USERERR) {
- errn -= Status.APR_OS_START_USERERR;
- }
- log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
- // Handle poll critical failure
- synchronized (this) {
- destroy();
- init();
- }
- continue;
- }
+ // Poll for the specified interval
+ if (doPoll(connectionPollset)) {
+ continue;
}
- if (socketProperties.getSoTimeout() > 0 && maintainTime > 1000000L && running) {
- rv = Poll.maintain(serverPollset, desc, true);
+ if (separateKeepAlive && doPoll(keepAlivePollset)) {
+ continue;
+ }
+
+ // Check timeouts (much less frequently that polling)
+ if (maintainTime > 1000000L && running) {
maintainTime = 0;
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n], SocketStatus.TIMEOUT);
- } else {
- destroySocket(desc[n]);
- }
- }
+ if (socketProperties.getSoTimeout() > 0) {
+ doTimeout(connectionPollset);
+ }
+ if (separateKeepAlive) {
+ doTimeout(keepAlivePollset);
}
}
} catch (Throwable t) {
}
- }
-
+ private boolean doPoll(long pollset) {
+ int rv = Poll.poll(pollset, pollTime, desc, true);
+ if (rv > 0) {
+ keepAliveCount -= rv;
+ for (int n = 0; n < rv; n++) {
+ // Check for failed sockets and hand this socket off to a worker
+ if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
+ || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
+ || (!comet && (!processSocket(desc[n*2+1])))) {
+ // Close socket and clear pool
+ if (comet) {
+ processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
+ } else {
+ destroySocket(desc[n*2+1]);
+ }
+ return true;
+ }
+ }
+ } else if (rv < 0) {
+ int errn = -rv;
+ /* Any non timeup or interrupted error is critical */
+ if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
+ if (errn > Status.APR_OS_START_USERERR) {
+ errn -= Status.APR_OS_START_USERERR;
+ }
+ log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
+ // Handle poll critical failure
+ synchronized (this) {
+ destroy();
+ init();
+ }
+ return true;
+ }
+ }
+ return false;
+ }
- // ----------------------------------------------------- Worker Inner Class
+ private void doTimeout(long pollset) {
+ int rv = Poll.maintain(pollset, desc, true);
+ if (rv > 0) {
+ keepAliveCount -= rv;
+ for (int n = 0; n < rv; n++) {
+ // Close socket and clear pool
+ if (comet) {
+ processSocket(desc[n], SocketStatus.TIMEOUT);
+ } else {
+ destroySocket(desc[n]);
+ }
+ }
+ }
+ }
+ }
// ----------------------------------------------- SendfileData Inner Class
Socket.timeoutSet(state.socket, socketProperties.getSoTimeout() * 1000);
// If all done put the socket back in the poller for
// processing of further requests
- getPoller().add(state.socket);
+ getPoller().add(state.socket, true);
} else {
// Close the socket since this is
// the end of not keep-alive request.
synchronized (socket) {
if (!deferAccept) {
if (setSocketOptions(socket.getSocket().longValue())) {
- getPoller().add(socket.getSocket().longValue());
+ getPoller().add(socket.getSocket().longValue(), false);
} else {
// Close socket and pool
destroySocket(socket.getSocket().longValue());