Initial AJP-NIO implementation.
authormarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Sat, 14 May 2011 22:14:55 +0000 (22:14 +0000)
committermarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Sat, 14 May 2011 22:14:55 +0000 (22:14 +0000)
Docs to follow once more testing has been completed.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1103243 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/coyote/ajp/AjpNioProcessor.java [new file with mode: 0644]
java/org/apache/coyote/ajp/AjpNioProtocol.java [new file with mode: 0644]

diff --git a/java/org/apache/coyote/ajp/AjpNioProcessor.java b/java/org/apache/coyote/ajp/AjpNioProcessor.java
new file mode 100644 (file)
index 0000000..b0514e3
--- /dev/null
@@ -0,0 +1,715 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.coyote.ajp;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.util.concurrent.Executor;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Request;
+import org.apache.coyote.RequestInfo;
+import org.apache.coyote.Response;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.HexUtils;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.SocketStatus;
+
+
+/**
+ * Processes AJP requests using NIO.
+ */
+public class AjpNioProcessor extends AbstractAjpProcessor {
+
+
+    /**
+     * Logger.
+     */
+    private static final Log log = LogFactory.getLog(AjpNioProcessor.class);
+    @Override
+    protected Log getLog() {
+        return log;
+    }
+
+    // ----------------------------------------------------------- Constructors
+
+
+    public AjpNioProcessor(int packetSize, NioEndpoint endpoint) {
+
+        this.endpoint = endpoint;
+
+        request = new Request();
+        request.setInputBuffer(new SocketInputBuffer());
+
+        response = new Response();
+        response.setHook(this);
+        response.setOutputBuffer(new SocketOutputBuffer());
+        request.setResponse(response);
+
+        pool = endpoint.getSelectorPool();
+
+        this.packetSize = packetSize;
+        requestHeaderMessage = new AjpMessage(packetSize);
+        responseHeaderMessage = new AjpMessage(packetSize);
+        bodyMessage = new AjpMessage(packetSize);
+
+        // Set the get body message buffer
+        AjpMessage getBodyMessage = new AjpMessage(16);
+        getBodyMessage.reset();
+        getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
+        // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE)
+        getBodyMessage.appendInt(Constants.MAX_READ_SIZE + packetSize - Constants.MAX_PACKET_SIZE);
+        getBodyMessage.end();
+        getBodyMessageArray = new byte[getBodyMessage.getLen()];
+        System.arraycopy(getBodyMessage.getBuffer(), 0, getBodyMessageArray, 
+                         0, getBodyMessage.getLen());
+
+        // Cause loading of HexUtils
+        HexUtils.load();
+
+        // Cause loading of HttpMessages
+        HttpMessages.getMessage(200);
+
+    }
+
+
+    // ----------------------------------------------------- Instance Variables
+
+
+    /**
+     * Socket associated with the current connection.
+     */
+    protected NioChannel socket;
+
+    
+    protected NioSelectorPool pool;
+
+
+    /**
+     * Input buffer.
+     */
+    protected ByteBuffer readBuffer;
+    protected int readBufferEnd;
+    
+    /**
+     * Output buffer.
+     */
+    protected ByteBuffer writeBuffer;
+
+    
+    /**
+     * Direct buffer used for sending right away a get body message.
+     */
+    protected final byte[] getBodyMessageArray;
+
+
+    /**
+     * Direct buffer used for sending right away a pong message.
+     */
+    protected static final byte[] pongMessageArray;
+
+
+    /**
+     * End message array.
+     */
+    protected static final byte[] endMessageArray;
+
+
+    /**
+     * Flush message array.
+     */
+    protected static final byte[] flushMessageArray;
+    
+    // ----------------------------------------------------- Static Initializer
+
+
+    static {
+
+        // Set the read body message buffer
+        AjpMessage pongMessage = new AjpMessage(16);
+        pongMessage.reset();
+        pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
+        pongMessage.end();
+        pongMessageArray = new byte[pongMessage.getLen()];
+        System.arraycopy(pongMessage.getBuffer(), 0, pongMessageArray, 
+                0, pongMessage.getLen());
+
+        // Allocate the end message array
+        AjpMessage endMessage = new AjpMessage(16);
+        endMessage.reset();
+        endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
+        endMessage.appendByte(1);
+        endMessage.end();
+        endMessageArray = new byte[endMessage.getLen()];
+        System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0,
+                endMessage.getLen());
+
+        // Allocate the flush message array
+        AjpMessage flushMessage = new AjpMessage(16);
+        flushMessage.reset();
+        flushMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+        flushMessage.appendInt(0);
+        flushMessage.appendByte(0);
+        flushMessage.end();
+        flushMessageArray = new byte[flushMessage.getLen()];
+        System.arraycopy(flushMessage.getBuffer(), 0, flushMessageArray, 0,
+                flushMessage.getLen());
+
+    }
+
+
+    // ------------------------------------------------------------- Properties
+
+
+    // --------------------------------------------------------- Public Methods
+
+
+    /**
+     * Process pipelined HTTP requests using the specified input and output
+     * streams.
+     *
+     * @throws IOException error during an I/O operation
+     */
+    public SocketState process(NioChannel socket)
+        throws IOException {
+        RequestInfo rp = request.getRequestProcessor();
+        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
+
+        // Setting up the socket
+        this.socket = socket;
+        readBuffer = socket.getBufHandler().getReadBuffer();
+        readBufferEnd = 0;
+        readBuffer.clear();
+        writeBuffer = socket.getBufHandler().getWriteBuffer();
+        writeBuffer.clear();
+        
+        int soTimeout = -1;
+        final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+        if (keepAliveTimeout > 0) {
+            ka.setTimeout(soTimeout);
+        }
+
+        // Error flag
+        error = false;
+
+        while (!error && !endpoint.isPaused()) {
+
+            // Parsing the request header
+            try {
+                // Set keep alive timeout if enabled
+                if (keepAliveTimeout > 0) {
+                    ka.setTimeout(keepAliveTimeout);
+                }
+                // Get first message of the request
+                if (!readMessage(requestHeaderMessage)) {
+                    // This means a connection timeout
+                    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+                    break;
+                }
+                // Set back timeout if keep alive timeout is enabled
+                if (keepAliveTimeout > 0) {
+                    ka.setTimeout(soTimeout);
+                }
+                // Check message type, process right away and break if
+                // not regular request processing
+                int type = requestHeaderMessage.getByte();
+                if (type == Constants.JK_AJP13_CPING_REQUEST) {
+                    try {
+                        output(pongMessageArray, 0, pongMessageArray.length);
+                    } catch (IOException e) {
+                        error = true;
+                    }
+                    continue;
+                } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) {
+                    // Usually the servlet didn't read the previous request body
+                    if(log.isDebugEnabled()) {
+                        log.debug("Unexpected message: "+type);
+                    }
+                    continue;
+                }
+
+                request.setStartTime(System.currentTimeMillis());
+            } catch (IOException e) {
+                error = true;
+                break;
+            } catch (Throwable t) {
+                ExceptionUtils.handleThrowable(t);
+                log.debug(sm.getString("ajpprocessor.header.error"), t);
+                // 400 - Bad Request
+                response.setStatus(400);
+                adapter.log(request, response, 0);
+                error = true;
+            }
+
+            if (!error) {
+                // Setting up filters, and parse some request headers
+                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
+                try {
+                    prepareRequest();
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    log.debug(sm.getString("ajpprocessor.request.prepare"), t);
+                    // 400 - Internal Server Error
+                    response.setStatus(400);
+                    adapter.log(request, response, 0);
+                    error = true;
+                }
+            }
+
+            if (endpoint.isPaused()) {
+                // 503 - Service unavailable
+                response.setStatus(503);
+                adapter.log(request, response, 0);
+                error = true;
+            }
+
+            // Process the request in the adapter
+            if (!error) {
+                try {
+                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+                    adapter.service(request, response);
+                } catch (InterruptedIOException e) {
+                    error = true;
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    log.error(sm.getString("ajpprocessor.request.process"), t);
+                    // 500 - Internal Server Error
+                    response.setStatus(500);
+                    adapter.log(request, response, 0);
+                    error = true;
+                }
+            }
+            
+            if (isAsync() && !error) {
+                break;
+            }
+
+            // Finish the response if not done yet
+            if (!finished) {
+                try {
+                    finish();
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    error = true;
+                }
+            }
+
+            // If there was an error, make sure the request is counted as
+            // and error, and update the statistics counter
+            if (error) {
+                response.setStatus(500);
+            }
+            request.updateCounters();
+
+            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
+            recycle();
+        }
+        
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (isAsync() && !error && !endpoint.isPaused()) {
+            return SocketState.LONG;
+        } else {
+            readBuffer = null;
+            writeBuffer = null;
+            return SocketState.CLOSED;
+        }
+        
+    }
+
+    
+    @Override
+    public void recycle() {
+        if (readBuffer != null) {
+            readBuffer.clear();
+        }
+        readBufferEnd = 0;
+        super.recycle();
+    }
+
+    public SocketState asyncDispatch(SocketStatus status) {
+
+        RequestInfo rp = request.getRequestProcessor();
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            error = !adapter.asyncDispatch(request, response, status);
+        } catch (InterruptedIOException e) {
+            error = true;
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            log.error(sm.getString("http11processor.request.process"), t);
+            // 500 - Internal Server Error
+            response.setStatus(500);
+            adapter.log(request, response, 0);
+            error = true;
+        }
+
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (isAsync()) {
+            if (error) {
+                response.setStatus(500);
+                request.updateCounters();
+                readBuffer = null;
+                writeBuffer = null;
+                return SocketState.CLOSED;
+            } else {
+                return SocketState.LONG;
+            }
+        } else {
+            if (error) {
+                response.setStatus(500);
+            }
+            request.updateCounters();
+            readBuffer = null;
+            writeBuffer = null;
+            return SocketState.CLOSED;
+        }
+
+
+    }
+
+    
+    @Override
+    public Executor getExecutor() {
+        return endpoint.getExecutor();
+    }
+    
+    
+    // ----------------------------------------------------- ActionHook Methods
+
+
+    /**
+     * Send an action to the connector.
+     *
+     * @param actionCode Type of the action
+     * @param param Action parameter
+     */
+    @Override
+    protected void actionInternal(ActionCode actionCode, Object param) {
+
+        if (actionCode == ActionCode.ASYNC_COMPLETE) {
+            if (asyncStateMachine.asyncComplete()) {
+                ((NioEndpoint)endpoint).processSocket(this.socket,
+                        SocketStatus.OPEN, false);
+            }
+        } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
+            if (param == null) return;
+            long timeout = ((Long)param).longValue();
+            final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+            if (keepAliveTimeout > 0) {
+                ka.setTimeout(timeout);
+            }
+        } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
+            if (asyncStateMachine.asyncDispatch()) {
+                ((NioEndpoint)endpoint).processSocket(this.socket,
+                        SocketStatus.OPEN, true);
+            }
+        }
+    }
+
+
+    // ------------------------------------------------------ Protected Methods
+
+    @Override
+    protected void output(byte[] src, int offset, int length)
+            throws IOException {
+        ByteBuffer writeBuffer = socket.getBufHandler() .getWriteBuffer();
+
+        writeBuffer.put(src, offset, length);
+        
+        writeBuffer.flip();
+        
+        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+        if ( att == null ) throw new IOException("Key must be cancelled");
+        long writeTimeout = att.getTimeout();
+        Selector selector = null;
+        try {
+            selector = pool.get();
+        } catch ( IOException x ) {
+            //ignore
+        }
+        try {
+            pool.write(writeBuffer, socket, selector, writeTimeout, true,
+                    null);
+        }finally { 
+            if ( selector != null ) pool.put(selector);
+        }
+        writeBuffer.clear();
+    }
+
+    /**
+     * Finish AJP response.
+     */
+    @Override
+    protected void finish() throws IOException {
+
+        if (!response.isCommitted()) {
+            // Validate and write response headers
+            try {
+                prepareResponse();
+            } catch (IOException e) {
+                // Set error flag
+                error = true;
+            }
+        }
+
+        if (finished)
+            return;
+
+        finished = true;
+
+        // Add the end message
+        output(endMessageArray, 0, endMessageArray.length);
+    }
+
+
+    /**
+     * Read at least the specified amount of bytes, and place them
+     * in the input buffer.
+     */
+    protected void read(byte[] buf, int pos, int n)
+        throws IOException {
+
+        int read = readBufferEnd - pos;
+        int res = 0;
+        while (read < n) {
+            res = readSocket(buf, read + pos, true);
+            if (res > 0) {
+                read += res;
+            } else {
+                throw new IOException(sm.getString("ajpprotocol.failedread"));
+            }
+        }
+        readBufferEnd += read;
+    }
+
+    private int readSocket(byte[] buf, int pos, boolean block) throws IOException {
+        int nRead = 0;
+        socket.getBufHandler().getReadBuffer().clear();
+        if ( block ) {
+            Selector selector = null;
+            try {
+                selector = pool.get();
+            } catch ( IOException x ) {
+                // Ignore
+            }
+            try {
+                NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                if ( att == null ) throw new IOException("Key must be cancelled.");
+                nRead = pool.read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
+            } catch ( EOFException eof ) {
+                nRead = -1;
+            } finally { 
+                if ( selector != null ) pool.put(selector);
+            }
+        } else {
+            nRead = socket.read(socket.getBufHandler().getReadBuffer());
+        }
+        if (nRead > 0) {
+            socket.getBufHandler().getReadBuffer().flip();
+            socket.getBufHandler().getReadBuffer().limit(nRead);
+            socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+            return nRead;
+        } else if (nRead == -1) {
+            //return false;
+            throw new EOFException(sm.getString("iib.eof.error"));
+        } else {
+            return 0;
+        }
+    }
+
+
+    /** Receive a chunk of data. Called to implement the
+     *  'special' packet in ajp13 and to receive the data
+     *  after we send a GET_BODY packet
+     */
+    @Override
+    public boolean receive() throws IOException {
+
+        first = false;
+        bodyMessage.reset();
+        if (!readMessage(bodyMessage)) {
+            // Invalid message
+            return false;
+        }
+        // No data received.
+        if (bodyMessage.getLen() == 0) {
+            // just the header
+            // Don't mark 'end of stream' for the first chunk.
+            return false;
+        }
+        int blen = bodyMessage.peekInt();
+        if (blen == 0) {
+            return false;
+        }
+
+        bodyMessage.getBytes(bodyBytes);
+        empty = false;
+        return true;
+    }
+
+    /**
+     * Get more request body data from the web server and store it in the
+     * internal buffer.
+     *
+     * @return true if there is more data, false if not.
+     */
+    @Override
+    protected boolean refillReadBuffer() throws IOException {
+        // If the server returns an empty packet, assume that that end of
+        // the stream has been reached (yuck -- fix protocol??).
+        // FORM support
+        if (replay) {
+            endOfStream = true; // we've read everything there is
+        }
+        if (endOfStream) {
+            return false;
+        }
+
+        // Request more data immediately
+        output(getBodyMessageArray, 0, getBodyMessageArray.length);
+
+        boolean moreData = receive();
+        if( !moreData ) {
+            endOfStream = true;
+        }
+        return moreData;
+    }
+
+
+    /**
+     * Read an AJP message.
+     *
+     * @return true if the message has been read, false if the short read
+     *         didn't return anything
+     * @throws IOException any other failure, including incomplete reads
+     */
+    protected boolean readMessage(AjpMessage message)
+        throws IOException {
+
+        byte[] buf = message.getBuffer();
+        int headerLength = message.getHeaderLength();
+
+        read(buf, 0, headerLength);
+
+        int messageLength = message.processHeader();
+        if (messageLength < 0) {
+            // Invalid AJP header signature
+            // TODO: Throw some exception and close the connection to frontend.
+            return false;
+        }
+        else if (messageLength == 0) {
+            // Zero length message.
+            return true;
+        }
+        else {
+            if (messageLength > buf.length) {
+                // Message too long for the buffer
+                // Need to trigger a 400 response
+                throw new IllegalArgumentException(sm.getString(
+                        "ajpprocessor.header.tooLong",
+                        Integer.valueOf(messageLength),
+                        Integer.valueOf(buf.length)));
+            }
+            read(buf, headerLength, messageLength);
+            return true;
+        }
+    }
+
+
+    /**
+     * Callback to write data from the buffer.
+     */
+    @Override
+    protected void flush(boolean explicit) throws IOException {
+        if (explicit && !finished) {
+            // Send the flush message
+            output(flushMessageArray, 0, flushMessageArray.length);
+        }
+    }
+
+
+    // ----------------------------------- OutputStreamOutputBuffer Inner Class
+
+
+    /**
+     * This class is an output buffer which will write data to an output
+     * stream.
+     */
+    protected class SocketOutputBuffer implements OutputBuffer {
+
+        /**
+         * Write chunk.
+         */
+        @Override
+        public int doWrite(ByteChunk chunk, Response res)
+            throws IOException {
+
+            if (!response.isCommitted()) {
+                // Validate and write response headers
+                try {
+                    prepareResponse();
+                } catch (IOException e) {
+                    // Set error flag
+                    error = true;
+                }
+            }
+
+            int len = chunk.getLength();
+            // 4 - hardcoded, byte[] marshaling overhead
+            // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE)
+            int chunkSize = Constants.MAX_SEND_SIZE + packetSize - Constants.MAX_PACKET_SIZE;
+            int off = 0;
+            while (len > 0) {
+                int thisTime = len;
+                if (thisTime > chunkSize) {
+                    thisTime = chunkSize;
+                }
+                len -= thisTime;
+                responseHeaderMessage.reset();
+                responseHeaderMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+                responseHeaderMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
+                responseHeaderMessage.end();
+                output(responseHeaderMessage.getBuffer(), 0, responseHeaderMessage.getLen());
+
+                off += thisTime;
+            }
+
+            byteCount += chunk.getLength();
+            return chunk.getLength();
+        }
+
+        @Override
+        public long getBytesWritten() {
+            return byteCount;
+        }
+    }
+}
diff --git a/java/org/apache/coyote/ajp/AjpNioProtocol.java b/java/org/apache/coyote/ajp/AjpNioProtocol.java
new file mode 100644 (file)
index 0000000..0f13ec6
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.coyote.ajp;
+
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.ObjectName;
+
+import org.apache.coyote.RequestGroupInfo;
+import org.apache.coyote.RequestInfo;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.modeler.Registry;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioEndpoint.Handler;
+import org.apache.tomcat.util.net.SSLImplementation;
+import org.apache.tomcat.util.net.SocketStatus;
+
+
+/**
+ * Abstract the protocol implementation, including threading, etc.
+ * Processor is single threaded and specific to stream-based protocols,
+ * will not fit Jk protocols like JNI.
+ */
+public class AjpNioProtocol extends AbstractAjpProtocol {
+    
+    
+    private static final Log log = LogFactory.getLog(AjpNioProtocol.class);
+
+    @Override
+    protected Log getLog() { return log; }
+
+
+    @Override
+    protected AbstractEndpoint.Handler getHandler() {
+        return cHandler;
+    }
+
+
+    // ------------------------------------------------------------ Constructor
+
+
+    public AjpNioProtocol() {
+        endpoint = new NioEndpoint();
+        cHandler = new AjpConnectionHandler(this);
+        ((NioEndpoint) endpoint).setHandler(cHandler);
+        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
+        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
+        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
+        // AJP does not use Send File
+        ((NioEndpoint) endpoint).setUseSendfile(false);
+    }
+
+    
+    // ----------------------------------------------------- Instance Variables
+
+
+    /**
+     * Connection handler for AJP.
+     */
+    private AjpConnectionHandler cHandler;
+
+
+    // --------------------------------------------------------- Public Methods
+
+
+    // AJP does not use Send File.
+    public boolean getUseSendfile() { return false; }
+
+
+    // ----------------------------------------------------- JMX related methods
+
+    @Override
+    protected String getNamePrefix() {
+        return ("ajp-nio");
+    }
+
+
+    // --------------------------------------  AjpConnectionHandler Inner Class
+
+
+    protected static class AjpConnectionHandler implements Handler {
+
+        protected AjpNioProtocol proto;
+        protected AtomicLong registerCount = new AtomicLong(0);
+        protected RequestGroupInfo global = new RequestGroupInfo();
+
+        protected ConcurrentHashMap<NioChannel, AjpNioProcessor> connections =
+            new ConcurrentHashMap<NioChannel, AjpNioProcessor>();
+
+        protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors = 
+            new ConcurrentLinkedQueue<AjpNioProcessor>() {
+            private static final long serialVersionUID = 1L;
+            protected AtomicInteger size = new AtomicInteger(0);
+            @Override
+            public boolean offer(AjpNioProcessor processor) {
+                boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
+                //avoid over growing our cache or add after we have stopped
+                boolean result = false;
+                if ( offer ) {
+                    result = super.offer(processor);
+                    if ( result ) {
+                        size.incrementAndGet();
+                    }
+                }
+                if (!result) unregister(processor);
+                return result;
+            }
+            
+            @Override
+            public AjpNioProcessor poll() {
+                AjpNioProcessor result = super.poll();
+                if ( result != null ) {
+                    size.decrementAndGet();
+                }
+                return result;
+            }
+            
+            @Override
+            public void clear() {
+                AjpNioProcessor next = poll();
+                while ( next != null ) {
+                    unregister(next);
+                    next = poll();
+                }
+                super.clear();
+                size.set(0);
+            }
+        };
+
+        public AjpConnectionHandler(AjpNioProtocol proto) {
+            this.proto = proto;
+        }
+
+        @Override
+        public Object getGlobal() {
+            return global;
+        }
+
+        @Override
+        public void recycle() {
+            recycledProcessors.clear();
+        }
+        
+        @Override
+        public SSLImplementation getSslImplementation() {
+            // AJP does not support SSL
+            return null;
+        }
+
+        @Override
+        public void release(SocketChannel socket) {
+            if (log.isDebugEnabled()) 
+                log.debug("Iterating through our connections to release a socket channel:"+socket);
+            boolean released = false;
+            Iterator<java.util.Map.Entry<NioChannel, AjpNioProcessor>> it = connections.entrySet().iterator();
+            while (it.hasNext()) {
+                java.util.Map.Entry<NioChannel, AjpNioProcessor> entry = it.next();
+                if (entry.getKey().getIOChannel()==socket) {
+                    it.remove();
+                    AjpNioProcessor result = entry.getValue();
+                    result.recycle();
+                    unregister(result);
+                    released = true;
+                    break;
+                }
+            }
+            if (log.isDebugEnabled()) 
+                log.debug("Done iterating through our connections to release a socket channel:"+socket +" released:"+released);
+        }
+        
+        /**
+         * Use this only if the processor is not available, otherwise use
+         * {@link #release(NioChannel, Http11NioProcessor).
+         */
+        @Override
+        public void release(NioChannel socket) {
+            AjpNioProcessor processor = connections.remove(socket);
+            if (processor != null) {
+                processor.recycle();
+                recycledProcessors.offer(processor);
+            }
+        }
+
+
+        public void release(NioChannel socket, AjpNioProcessor processor) {
+            connections.remove(socket);
+            processor.recycle();
+            recycledProcessors.offer(processor);
+        }
+
+        // FIXME: Support for this could be added in AJP as well
+        @Override
+        public SocketState event(NioChannel socket, SocketStatus status) {
+            return SocketState.CLOSED;
+        }
+        
+        @Override
+        public SocketState process(NioChannel socket) {
+            AjpNioProcessor processor = recycledProcessors.poll();
+            try {
+                if (processor == null) {
+                    processor = createProcessor();
+                }
+
+                SocketState state = processor.process(socket);
+                if (state == SocketState.LONG) {
+                    // Check if the post processing is going to change the state
+                    state = processor.asyncPostProcess();
+                }
+                if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+                    // Need to make socket available for next processing cycle
+                    // but no need for the poller
+                    connections.put(socket, processor);
+                    NioEndpoint.KeyAttachment att =
+                            (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+                    att.setAsync(true);
+                } else {
+                    processor.recycle();
+                    recycledProcessors.offer(processor);
+                }
+                return state;
+
+            } catch(java.net.SocketException e) {
+                // SocketExceptions are normal
+                log.debug(sm.getString(
+                        "ajpprotocol.proto.socketexception.debug"), e);
+            } catch (java.io.IOException e) {
+                // IOExceptions are normal
+                log.debug(sm.getString(
+                        "ajpprotocol.proto.ioexception.debug"), e);
+            }
+            // Future developers: if you discover any other
+            // rare-but-nonfatal exceptions, catch them here, and log as
+            // above.
+            catch (Throwable e) {
+                ExceptionUtils.handleThrowable(e);
+                // any other exception or error is odd. Here we log it
+                // with "ERROR" level, so it will show up even on
+                // less-than-verbose logs.
+                log.error(sm.getString("ajpprotocol.proto.error"), e);
+            }
+            processor.recycle();
+            recycledProcessors.offer(processor);
+            return SocketState.CLOSED;
+        }
+
+        protected AjpNioProcessor createProcessor() {
+            AjpNioProcessor processor = new AjpNioProcessor(proto.packetSize, (NioEndpoint)proto.endpoint);
+            processor.setAdapter(proto.adapter);
+            processor.setTomcatAuthentication(proto.tomcatAuthentication);
+            processor.setRequiredSecret(proto.requiredSecret);
+            processor.setClientCertProvider(proto.getClientCertProvider());
+            register(processor);
+            return processor;
+        }
+        
+        protected void register(AjpNioProcessor processor) {
+            if (proto.getDomain() != null) {
+                synchronized (this) {
+                    try {
+                        long count = registerCount.incrementAndGet();
+                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        rp.setGlobalProcessor(global);
+                        ObjectName rpName = new ObjectName
+                            (proto.getDomain() + ":type=RequestProcessor,worker="
+                                + proto.getName() + ",name=AjpRequest" + count);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Register " + rpName);
+                        }
+                        Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+                        rp.setRpName(rpName);
+                    } catch (Exception e) {
+                        log.warn("Error registering request");
+                    }
+                }
+            }
+        }
+
+        protected void unregister(AjpNioProcessor processor) {
+            if (proto.getDomain() != null) {
+                synchronized (this) {
+                    try {
+                        RequestInfo rp = processor.getRequest().getRequestProcessor();
+                        rp.setGlobalProcessor(null);
+                        ObjectName rpName = rp.getRpName();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Unregister " + rpName);
+                        }
+                        Registry.getRegistry(null, null).unregisterComponent(rpName);
+                        rp.setRpName(null);
+                    } catch (Exception e) {
+                        log.warn("Error unregistering request", e);
+                    }
+                }
+            }
+        }
+
+    }
+
+}