--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.coyote.ajp;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.util.concurrent.Executor;
+
+import org.apache.coyote.ActionCode;
+import org.apache.coyote.OutputBuffer;
+import org.apache.coyote.Request;
+import org.apache.coyote.RequestInfo;
+import org.apache.coyote.Response;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.HexUtils;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.SocketStatus;
+
+
+/**
+ * Processes AJP requests using NIO.
+ */
+public class AjpNioProcessor extends AbstractAjpProcessor {
+
+
+ /**
+ * Logger.
+ */
+ private static final Log log = LogFactory.getLog(AjpNioProcessor.class);
+ @Override
+ protected Log getLog() {
+ return log;
+ }
+
+ // ----------------------------------------------------------- Constructors
+
+
+ public AjpNioProcessor(int packetSize, NioEndpoint endpoint) {
+
+ this.endpoint = endpoint;
+
+ request = new Request();
+ request.setInputBuffer(new SocketInputBuffer());
+
+ response = new Response();
+ response.setHook(this);
+ response.setOutputBuffer(new SocketOutputBuffer());
+ request.setResponse(response);
+
+ pool = endpoint.getSelectorPool();
+
+ this.packetSize = packetSize;
+ requestHeaderMessage = new AjpMessage(packetSize);
+ responseHeaderMessage = new AjpMessage(packetSize);
+ bodyMessage = new AjpMessage(packetSize);
+
+ // Set the get body message buffer
+ AjpMessage getBodyMessage = new AjpMessage(16);
+ getBodyMessage.reset();
+ getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK);
+ // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE)
+ getBodyMessage.appendInt(Constants.MAX_READ_SIZE + packetSize - Constants.MAX_PACKET_SIZE);
+ getBodyMessage.end();
+ getBodyMessageArray = new byte[getBodyMessage.getLen()];
+ System.arraycopy(getBodyMessage.getBuffer(), 0, getBodyMessageArray,
+ 0, getBodyMessage.getLen());
+
+ // Cause loading of HexUtils
+ HexUtils.load();
+
+ // Cause loading of HttpMessages
+ HttpMessages.getMessage(200);
+
+ }
+
+
+ // ----------------------------------------------------- Instance Variables
+
+
+ /**
+ * Socket associated with the current connection.
+ */
+ protected NioChannel socket;
+
+
+ protected NioSelectorPool pool;
+
+
+ /**
+ * Input buffer.
+ */
+ protected ByteBuffer readBuffer;
+ protected int readBufferEnd;
+
+ /**
+ * Output buffer.
+ */
+ protected ByteBuffer writeBuffer;
+
+
+ /**
+ * Direct buffer used for sending right away a get body message.
+ */
+ protected final byte[] getBodyMessageArray;
+
+
+ /**
+ * Direct buffer used for sending right away a pong message.
+ */
+ protected static final byte[] pongMessageArray;
+
+
+ /**
+ * End message array.
+ */
+ protected static final byte[] endMessageArray;
+
+
+ /**
+ * Flush message array.
+ */
+ protected static final byte[] flushMessageArray;
+
+ // ----------------------------------------------------- Static Initializer
+
+
+ static {
+
+ // Set the read body message buffer
+ AjpMessage pongMessage = new AjpMessage(16);
+ pongMessage.reset();
+ pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY);
+ pongMessage.end();
+ pongMessageArray = new byte[pongMessage.getLen()];
+ System.arraycopy(pongMessage.getBuffer(), 0, pongMessageArray,
+ 0, pongMessage.getLen());
+
+ // Allocate the end message array
+ AjpMessage endMessage = new AjpMessage(16);
+ endMessage.reset();
+ endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE);
+ endMessage.appendByte(1);
+ endMessage.end();
+ endMessageArray = new byte[endMessage.getLen()];
+ System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0,
+ endMessage.getLen());
+
+ // Allocate the flush message array
+ AjpMessage flushMessage = new AjpMessage(16);
+ flushMessage.reset();
+ flushMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+ flushMessage.appendInt(0);
+ flushMessage.appendByte(0);
+ flushMessage.end();
+ flushMessageArray = new byte[flushMessage.getLen()];
+ System.arraycopy(flushMessage.getBuffer(), 0, flushMessageArray, 0,
+ flushMessage.getLen());
+
+ }
+
+
+ // ------------------------------------------------------------- Properties
+
+
+ // --------------------------------------------------------- Public Methods
+
+
+ /**
+ * Process pipelined HTTP requests using the specified input and output
+ * streams.
+ *
+ * @throws IOException error during an I/O operation
+ */
+ public SocketState process(NioChannel socket)
+ throws IOException {
+ RequestInfo rp = request.getRequestProcessor();
+ rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
+
+ // Setting up the socket
+ this.socket = socket;
+ readBuffer = socket.getBufHandler().getReadBuffer();
+ readBufferEnd = 0;
+ readBuffer.clear();
+ writeBuffer = socket.getBufHandler().getWriteBuffer();
+ writeBuffer.clear();
+
+ int soTimeout = -1;
+ final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(soTimeout);
+ }
+
+ // Error flag
+ error = false;
+
+ while (!error && !endpoint.isPaused()) {
+
+ // Parsing the request header
+ try {
+ // Set keep alive timeout if enabled
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(keepAliveTimeout);
+ }
+ // Get first message of the request
+ if (!readMessage(requestHeaderMessage)) {
+ // This means a connection timeout
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ break;
+ }
+ // Set back timeout if keep alive timeout is enabled
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(soTimeout);
+ }
+ // Check message type, process right away and break if
+ // not regular request processing
+ int type = requestHeaderMessage.getByte();
+ if (type == Constants.JK_AJP13_CPING_REQUEST) {
+ try {
+ output(pongMessageArray, 0, pongMessageArray.length);
+ } catch (IOException e) {
+ error = true;
+ }
+ continue;
+ } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) {
+ // Usually the servlet didn't read the previous request body
+ if(log.isDebugEnabled()) {
+ log.debug("Unexpected message: "+type);
+ }
+ continue;
+ }
+
+ request.setStartTime(System.currentTimeMillis());
+ } catch (IOException e) {
+ error = true;
+ break;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ log.debug(sm.getString("ajpprocessor.header.error"), t);
+ // 400 - Bad Request
+ response.setStatus(400);
+ adapter.log(request, response, 0);
+ error = true;
+ }
+
+ if (!error) {
+ // Setting up filters, and parse some request headers
+ rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
+ try {
+ prepareRequest();
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ log.debug(sm.getString("ajpprocessor.request.prepare"), t);
+ // 400 - Internal Server Error
+ response.setStatus(400);
+ adapter.log(request, response, 0);
+ error = true;
+ }
+ }
+
+ if (endpoint.isPaused()) {
+ // 503 - Service unavailable
+ response.setStatus(503);
+ adapter.log(request, response, 0);
+ error = true;
+ }
+
+ // Process the request in the adapter
+ if (!error) {
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ adapter.service(request, response);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ log.error(sm.getString("ajpprocessor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ adapter.log(request, response, 0);
+ error = true;
+ }
+ }
+
+ if (isAsync() && !error) {
+ break;
+ }
+
+ // Finish the response if not done yet
+ if (!finished) {
+ try {
+ finish();
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ error = true;
+ }
+ }
+
+ // If there was an error, make sure the request is counted as
+ // and error, and update the statistics counter
+ if (error) {
+ response.setStatus(500);
+ }
+ request.updateCounters();
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
+ recycle();
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (isAsync() && !error && !endpoint.isPaused()) {
+ return SocketState.LONG;
+ } else {
+ readBuffer = null;
+ writeBuffer = null;
+ return SocketState.CLOSED;
+ }
+
+ }
+
+
+ @Override
+ public void recycle() {
+ if (readBuffer != null) {
+ readBuffer.clear();
+ }
+ readBufferEnd = 0;
+ super.recycle();
+ }
+
+ public SocketState asyncDispatch(SocketStatus status) {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.asyncDispatch(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ adapter.log(request, response, 0);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (isAsync()) {
+ if (error) {
+ response.setStatus(500);
+ request.updateCounters();
+ readBuffer = null;
+ writeBuffer = null;
+ return SocketState.CLOSED;
+ } else {
+ return SocketState.LONG;
+ }
+ } else {
+ if (error) {
+ response.setStatus(500);
+ }
+ request.updateCounters();
+ readBuffer = null;
+ writeBuffer = null;
+ return SocketState.CLOSED;
+ }
+
+
+ }
+
+
+ @Override
+ public Executor getExecutor() {
+ return endpoint.getExecutor();
+ }
+
+
+ // ----------------------------------------------------- ActionHook Methods
+
+
+ /**
+ * Send an action to the connector.
+ *
+ * @param actionCode Type of the action
+ * @param param Action parameter
+ */
+ @Override
+ protected void actionInternal(ActionCode actionCode, Object param) {
+
+ if (actionCode == ActionCode.ASYNC_COMPLETE) {
+ if (asyncStateMachine.asyncComplete()) {
+ ((NioEndpoint)endpoint).processSocket(this.socket,
+ SocketStatus.OPEN, false);
+ }
+ } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) {
+ if (param == null) return;
+ long timeout = ((Long)param).longValue();
+ final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false);
+ if (keepAliveTimeout > 0) {
+ ka.setTimeout(timeout);
+ }
+ } else if (actionCode == ActionCode.ASYNC_DISPATCH) {
+ if (asyncStateMachine.asyncDispatch()) {
+ ((NioEndpoint)endpoint).processSocket(this.socket,
+ SocketStatus.OPEN, true);
+ }
+ }
+ }
+
+
+ // ------------------------------------------------------ Protected Methods
+
+ @Override
+ protected void output(byte[] src, int offset, int length)
+ throws IOException {
+ ByteBuffer writeBuffer = socket.getBufHandler() .getWriteBuffer();
+
+ writeBuffer.put(src, offset, length);
+
+ writeBuffer.flip();
+
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ if ( att == null ) throw new IOException("Key must be cancelled");
+ long writeTimeout = att.getTimeout();
+ Selector selector = null;
+ try {
+ selector = pool.get();
+ } catch ( IOException x ) {
+ //ignore
+ }
+ try {
+ pool.write(writeBuffer, socket, selector, writeTimeout, true,
+ null);
+ }finally {
+ if ( selector != null ) pool.put(selector);
+ }
+ writeBuffer.clear();
+ }
+
+ /**
+ * Finish AJP response.
+ */
+ @Override
+ protected void finish() throws IOException {
+
+ if (!response.isCommitted()) {
+ // Validate and write response headers
+ try {
+ prepareResponse();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ }
+ }
+
+ if (finished)
+ return;
+
+ finished = true;
+
+ // Add the end message
+ output(endMessageArray, 0, endMessageArray.length);
+ }
+
+
+ /**
+ * Read at least the specified amount of bytes, and place them
+ * in the input buffer.
+ */
+ protected void read(byte[] buf, int pos, int n)
+ throws IOException {
+
+ int read = readBufferEnd - pos;
+ int res = 0;
+ while (read < n) {
+ res = readSocket(buf, read + pos, true);
+ if (res > 0) {
+ read += res;
+ } else {
+ throw new IOException(sm.getString("ajpprotocol.failedread"));
+ }
+ }
+ readBufferEnd += read;
+ }
+
+ private int readSocket(byte[] buf, int pos, boolean block) throws IOException {
+ int nRead = 0;
+ socket.getBufHandler().getReadBuffer().clear();
+ if ( block ) {
+ Selector selector = null;
+ try {
+ selector = pool.get();
+ } catch ( IOException x ) {
+ // Ignore
+ }
+ try {
+ NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ if ( att == null ) throw new IOException("Key must be cancelled.");
+ nRead = pool.read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout());
+ } catch ( EOFException eof ) {
+ nRead = -1;
+ } finally {
+ if ( selector != null ) pool.put(selector);
+ }
+ } else {
+ nRead = socket.read(socket.getBufHandler().getReadBuffer());
+ }
+ if (nRead > 0) {
+ socket.getBufHandler().getReadBuffer().flip();
+ socket.getBufHandler().getReadBuffer().limit(nRead);
+ socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+ return nRead;
+ } else if (nRead == -1) {
+ //return false;
+ throw new EOFException(sm.getString("iib.eof.error"));
+ } else {
+ return 0;
+ }
+ }
+
+
+ /** Receive a chunk of data. Called to implement the
+ * 'special' packet in ajp13 and to receive the data
+ * after we send a GET_BODY packet
+ */
+ @Override
+ public boolean receive() throws IOException {
+
+ first = false;
+ bodyMessage.reset();
+ if (!readMessage(bodyMessage)) {
+ // Invalid message
+ return false;
+ }
+ // No data received.
+ if (bodyMessage.getLen() == 0) {
+ // just the header
+ // Don't mark 'end of stream' for the first chunk.
+ return false;
+ }
+ int blen = bodyMessage.peekInt();
+ if (blen == 0) {
+ return false;
+ }
+
+ bodyMessage.getBytes(bodyBytes);
+ empty = false;
+ return true;
+ }
+
+ /**
+ * Get more request body data from the web server and store it in the
+ * internal buffer.
+ *
+ * @return true if there is more data, false if not.
+ */
+ @Override
+ protected boolean refillReadBuffer() throws IOException {
+ // If the server returns an empty packet, assume that that end of
+ // the stream has been reached (yuck -- fix protocol??).
+ // FORM support
+ if (replay) {
+ endOfStream = true; // we've read everything there is
+ }
+ if (endOfStream) {
+ return false;
+ }
+
+ // Request more data immediately
+ output(getBodyMessageArray, 0, getBodyMessageArray.length);
+
+ boolean moreData = receive();
+ if( !moreData ) {
+ endOfStream = true;
+ }
+ return moreData;
+ }
+
+
+ /**
+ * Read an AJP message.
+ *
+ * @return true if the message has been read, false if the short read
+ * didn't return anything
+ * @throws IOException any other failure, including incomplete reads
+ */
+ protected boolean readMessage(AjpMessage message)
+ throws IOException {
+
+ byte[] buf = message.getBuffer();
+ int headerLength = message.getHeaderLength();
+
+ read(buf, 0, headerLength);
+
+ int messageLength = message.processHeader();
+ if (messageLength < 0) {
+ // Invalid AJP header signature
+ // TODO: Throw some exception and close the connection to frontend.
+ return false;
+ }
+ else if (messageLength == 0) {
+ // Zero length message.
+ return true;
+ }
+ else {
+ if (messageLength > buf.length) {
+ // Message too long for the buffer
+ // Need to trigger a 400 response
+ throw new IllegalArgumentException(sm.getString(
+ "ajpprocessor.header.tooLong",
+ Integer.valueOf(messageLength),
+ Integer.valueOf(buf.length)));
+ }
+ read(buf, headerLength, messageLength);
+ return true;
+ }
+ }
+
+
+ /**
+ * Callback to write data from the buffer.
+ */
+ @Override
+ protected void flush(boolean explicit) throws IOException {
+ if (explicit && !finished) {
+ // Send the flush message
+ output(flushMessageArray, 0, flushMessageArray.length);
+ }
+ }
+
+
+ // ----------------------------------- OutputStreamOutputBuffer Inner Class
+
+
+ /**
+ * This class is an output buffer which will write data to an output
+ * stream.
+ */
+ protected class SocketOutputBuffer implements OutputBuffer {
+
+ /**
+ * Write chunk.
+ */
+ @Override
+ public int doWrite(ByteChunk chunk, Response res)
+ throws IOException {
+
+ if (!response.isCommitted()) {
+ // Validate and write response headers
+ try {
+ prepareResponse();
+ } catch (IOException e) {
+ // Set error flag
+ error = true;
+ }
+ }
+
+ int len = chunk.getLength();
+ // 4 - hardcoded, byte[] marshaling overhead
+ // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE)
+ int chunkSize = Constants.MAX_SEND_SIZE + packetSize - Constants.MAX_PACKET_SIZE;
+ int off = 0;
+ while (len > 0) {
+ int thisTime = len;
+ if (thisTime > chunkSize) {
+ thisTime = chunkSize;
+ }
+ len -= thisTime;
+ responseHeaderMessage.reset();
+ responseHeaderMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
+ responseHeaderMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
+ responseHeaderMessage.end();
+ output(responseHeaderMessage.getBuffer(), 0, responseHeaderMessage.getLen());
+
+ off += thisTime;
+ }
+
+ byteCount += chunk.getLength();
+ return chunk.getLength();
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return byteCount;
+ }
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.coyote.ajp;
+
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.ObjectName;
+
+import org.apache.coyote.RequestGroupInfo;
+import org.apache.coyote.RequestInfo;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.modeler.Registry;
+import org.apache.tomcat.util.net.AbstractEndpoint;
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioEndpoint.Handler;
+import org.apache.tomcat.util.net.SSLImplementation;
+import org.apache.tomcat.util.net.SocketStatus;
+
+
+/**
+ * Abstract the protocol implementation, including threading, etc.
+ * Processor is single threaded and specific to stream-based protocols,
+ * will not fit Jk protocols like JNI.
+ */
+public class AjpNioProtocol extends AbstractAjpProtocol {
+
+
+ private static final Log log = LogFactory.getLog(AjpNioProtocol.class);
+
+ @Override
+ protected Log getLog() { return log; }
+
+
+ @Override
+ protected AbstractEndpoint.Handler getHandler() {
+ return cHandler;
+ }
+
+
+ // ------------------------------------------------------------ Constructor
+
+
+ public AjpNioProtocol() {
+ endpoint = new NioEndpoint();
+ cHandler = new AjpConnectionHandler(this);
+ ((NioEndpoint) endpoint).setHandler(cHandler);
+ setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
+ setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
+ setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
+ // AJP does not use Send File
+ ((NioEndpoint) endpoint).setUseSendfile(false);
+ }
+
+
+ // ----------------------------------------------------- Instance Variables
+
+
+ /**
+ * Connection handler for AJP.
+ */
+ private AjpConnectionHandler cHandler;
+
+
+ // --------------------------------------------------------- Public Methods
+
+
+ // AJP does not use Send File.
+ public boolean getUseSendfile() { return false; }
+
+
+ // ----------------------------------------------------- JMX related methods
+
+ @Override
+ protected String getNamePrefix() {
+ return ("ajp-nio");
+ }
+
+
+ // -------------------------------------- AjpConnectionHandler Inner Class
+
+
+ protected static class AjpConnectionHandler implements Handler {
+
+ protected AjpNioProtocol proto;
+ protected AtomicLong registerCount = new AtomicLong(0);
+ protected RequestGroupInfo global = new RequestGroupInfo();
+
+ protected ConcurrentHashMap<NioChannel, AjpNioProcessor> connections =
+ new ConcurrentHashMap<NioChannel, AjpNioProcessor>();
+
+ protected ConcurrentLinkedQueue<AjpNioProcessor> recycledProcessors =
+ new ConcurrentLinkedQueue<AjpNioProcessor>() {
+ private static final long serialVersionUID = 1L;
+ protected AtomicInteger size = new AtomicInteger(0);
+ @Override
+ public boolean offer(AjpNioProcessor processor) {
+ boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
+ //avoid over growing our cache or add after we have stopped
+ boolean result = false;
+ if ( offer ) {
+ result = super.offer(processor);
+ if ( result ) {
+ size.incrementAndGet();
+ }
+ }
+ if (!result) unregister(processor);
+ return result;
+ }
+
+ @Override
+ public AjpNioProcessor poll() {
+ AjpNioProcessor result = super.poll();
+ if ( result != null ) {
+ size.decrementAndGet();
+ }
+ return result;
+ }
+
+ @Override
+ public void clear() {
+ AjpNioProcessor next = poll();
+ while ( next != null ) {
+ unregister(next);
+ next = poll();
+ }
+ super.clear();
+ size.set(0);
+ }
+ };
+
+ public AjpConnectionHandler(AjpNioProtocol proto) {
+ this.proto = proto;
+ }
+
+ @Override
+ public Object getGlobal() {
+ return global;
+ }
+
+ @Override
+ public void recycle() {
+ recycledProcessors.clear();
+ }
+
+ @Override
+ public SSLImplementation getSslImplementation() {
+ // AJP does not support SSL
+ return null;
+ }
+
+ @Override
+ public void release(SocketChannel socket) {
+ if (log.isDebugEnabled())
+ log.debug("Iterating through our connections to release a socket channel:"+socket);
+ boolean released = false;
+ Iterator<java.util.Map.Entry<NioChannel, AjpNioProcessor>> it = connections.entrySet().iterator();
+ while (it.hasNext()) {
+ java.util.Map.Entry<NioChannel, AjpNioProcessor> entry = it.next();
+ if (entry.getKey().getIOChannel()==socket) {
+ it.remove();
+ AjpNioProcessor result = entry.getValue();
+ result.recycle();
+ unregister(result);
+ released = true;
+ break;
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Done iterating through our connections to release a socket channel:"+socket +" released:"+released);
+ }
+
+ /**
+ * Use this only if the processor is not available, otherwise use
+ * {@link #release(NioChannel, Http11NioProcessor).
+ */
+ @Override
+ public void release(NioChannel socket) {
+ AjpNioProcessor processor = connections.remove(socket);
+ if (processor != null) {
+ processor.recycle();
+ recycledProcessors.offer(processor);
+ }
+ }
+
+
+ public void release(NioChannel socket, AjpNioProcessor processor) {
+ connections.remove(socket);
+ processor.recycle();
+ recycledProcessors.offer(processor);
+ }
+
+ // FIXME: Support for this could be added in AJP as well
+ @Override
+ public SocketState event(NioChannel socket, SocketStatus status) {
+ return SocketState.CLOSED;
+ }
+
+ @Override
+ public SocketState process(NioChannel socket) {
+ AjpNioProcessor processor = recycledProcessors.poll();
+ try {
+ if (processor == null) {
+ processor = createProcessor();
+ }
+
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Check if the post processing is going to change the state
+ state = processor.asyncPostProcess();
+ }
+ if (state == SocketState.LONG || state == SocketState.ASYNC_END) {
+ // Need to make socket available for next processing cycle
+ // but no need for the poller
+ connections.put(socket, processor);
+ NioEndpoint.KeyAttachment att =
+ (NioEndpoint.KeyAttachment)socket.getAttachment(false);
+ att.setAsync(true);
+ } else {
+ processor.recycle();
+ recycledProcessors.offer(processor);
+ }
+ return state;
+
+ } catch(java.net.SocketException e) {
+ // SocketExceptions are normal
+ log.debug(sm.getString(
+ "ajpprotocol.proto.socketexception.debug"), e);
+ } catch (java.io.IOException e) {
+ // IOExceptions are normal
+ log.debug(sm.getString(
+ "ajpprotocol.proto.ioexception.debug"), e);
+ }
+ // Future developers: if you discover any other
+ // rare-but-nonfatal exceptions, catch them here, and log as
+ // above.
+ catch (Throwable e) {
+ ExceptionUtils.handleThrowable(e);
+ // any other exception or error is odd. Here we log it
+ // with "ERROR" level, so it will show up even on
+ // less-than-verbose logs.
+ log.error(sm.getString("ajpprotocol.proto.error"), e);
+ }
+ processor.recycle();
+ recycledProcessors.offer(processor);
+ return SocketState.CLOSED;
+ }
+
+ protected AjpNioProcessor createProcessor() {
+ AjpNioProcessor processor = new AjpNioProcessor(proto.packetSize, (NioEndpoint)proto.endpoint);
+ processor.setAdapter(proto.adapter);
+ processor.setTomcatAuthentication(proto.tomcatAuthentication);
+ processor.setRequiredSecret(proto.requiredSecret);
+ processor.setClientCertProvider(proto.getClientCertProvider());
+ register(processor);
+ return processor;
+ }
+
+ protected void register(AjpNioProcessor processor) {
+ if (proto.getDomain() != null) {
+ synchronized (this) {
+ try {
+ long count = registerCount.incrementAndGet();
+ RequestInfo rp = processor.getRequest().getRequestProcessor();
+ rp.setGlobalProcessor(global);
+ ObjectName rpName = new ObjectName
+ (proto.getDomain() + ":type=RequestProcessor,worker="
+ + proto.getName() + ",name=AjpRequest" + count);
+ if (log.isDebugEnabled()) {
+ log.debug("Register " + rpName);
+ }
+ Registry.getRegistry(null, null).registerComponent(rp, rpName, null);
+ rp.setRpName(rpName);
+ } catch (Exception e) {
+ log.warn("Error registering request");
+ }
+ }
+ }
+ }
+
+ protected void unregister(AjpNioProcessor processor) {
+ if (proto.getDomain() != null) {
+ synchronized (this) {
+ try {
+ RequestInfo rp = processor.getRequest().getRequestProcessor();
+ rp.setGlobalProcessor(null);
+ ObjectName rpName = rp.getRpName();
+ if (log.isDebugEnabled()) {
+ log.debug("Unregister " + rpName);
+ }
+ Registry.getRegistry(null, null).unregisterComponent(rpName);
+ rp.setRpName(null);
+ } catch (Exception e) {
+ log.warn("Error unregistering request", e);
+ }
+ }
+ }
+ }
+
+ }
+
+}