protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
- protected ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor> connections =
- new ConcurrentHashMap<SocketWrapper<Long>, Http11AprProcessor>();
+ protected ConcurrentHashMap<Long, Http11AprProcessor> connections =
+ new ConcurrentHashMap<Long, Http11AprProcessor>();
protected ConcurrentLinkedQueue<Http11AprProcessor> recycledProcessors =
new ConcurrentLinkedQueue<Http11AprProcessor>() {
@Override
public SocketState event(SocketWrapper<Long> socket, SocketStatus status) {
- Http11AprProcessor processor = connections.get(socket);
+ Http11AprProcessor processor = connections.get(socket.getSocket());
SocketState state = SocketState.CLOSED;
if (processor != null) {
"http11protocol.proto.error"), e);
} finally {
if (state != SocketState.LONG) {
- connections.remove(socket);
+ connections.remove(socket.getSocket());
socket.setAsync(false);
recycledProcessors.offer(processor);
if (state == SocketState.OPEN) {
if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
// Need to make socket available for next processing cycle
// but no need for the poller
- connections.put(socket, processor);
- socket.setAsync(true);
+ connections.put(socket.getSocket(), processor);
+ if (processor.isAsync()) {
+ socket.setAsync(true);
+ } else if (processor.comet) {
+ ((AprEndpoint) proto.endpoint).getCometPoller().add(
+ socket.getSocket().longValue());
+ }
} else {
recycledProcessors.offer(processor);
}
@Override
public SocketState asyncDispatch(SocketWrapper<Long> socket, SocketStatus status) {
- Http11AprProcessor result = connections.get(socket);
+ Http11AprProcessor result = connections.get(socket.getSocket());
SocketState state = SocketState.CLOSED;
if (result != null) {