From: markt Date: Sat, 14 May 2011 22:14:55 +0000 (+0000) Subject: Initial AJP-NIO implementation. X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=a8d056e4ba26952fa9e5bff8f1be96450abda927;p=tomcat7.0 Initial AJP-NIO implementation. Docs to follow once more testing has been completed. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1103243 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/coyote/ajp/AjpNioProcessor.java b/java/org/apache/coyote/ajp/AjpNioProcessor.java new file mode 100644 index 000000000..b0514e3c0 --- /dev/null +++ b/java/org/apache/coyote/ajp/AjpNioProcessor.java @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.coyote.ajp; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.util.concurrent.Executor; + +import org.apache.coyote.ActionCode; +import org.apache.coyote.OutputBuffer; +import org.apache.coyote.Request; +import org.apache.coyote.RequestInfo; +import org.apache.coyote.Response; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.ExceptionUtils; +import org.apache.tomcat.util.buf.ByteChunk; +import org.apache.tomcat.util.buf.HexUtils; +import org.apache.tomcat.util.http.HttpMessages; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.NioChannel; +import org.apache.tomcat.util.net.NioEndpoint; +import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import org.apache.tomcat.util.net.NioSelectorPool; +import org.apache.tomcat.util.net.SocketStatus; + + +/** + * Processes AJP requests using NIO. + */ +public class AjpNioProcessor extends AbstractAjpProcessor { + + + /** + * Logger. + */ + private static final Log log = LogFactory.getLog(AjpNioProcessor.class); + @Override + protected Log getLog() { + return log; + } + + // ----------------------------------------------------------- Constructors + + + public AjpNioProcessor(int packetSize, NioEndpoint endpoint) { + + this.endpoint = endpoint; + + request = new Request(); + request.setInputBuffer(new SocketInputBuffer()); + + response = new Response(); + response.setHook(this); + response.setOutputBuffer(new SocketOutputBuffer()); + request.setResponse(response); + + pool = endpoint.getSelectorPool(); + + this.packetSize = packetSize; + requestHeaderMessage = new AjpMessage(packetSize); + responseHeaderMessage = new AjpMessage(packetSize); + bodyMessage = new AjpMessage(packetSize); + + // Set the get body message buffer + AjpMessage getBodyMessage = new AjpMessage(16); + getBodyMessage.reset(); + getBodyMessage.appendByte(Constants.JK_AJP13_GET_BODY_CHUNK); + // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE) + getBodyMessage.appendInt(Constants.MAX_READ_SIZE + packetSize - Constants.MAX_PACKET_SIZE); + getBodyMessage.end(); + getBodyMessageArray = new byte[getBodyMessage.getLen()]; + System.arraycopy(getBodyMessage.getBuffer(), 0, getBodyMessageArray, + 0, getBodyMessage.getLen()); + + // Cause loading of HexUtils + HexUtils.load(); + + // Cause loading of HttpMessages + HttpMessages.getMessage(200); + + } + + + // ----------------------------------------------------- Instance Variables + + + /** + * Socket associated with the current connection. + */ + protected NioChannel socket; + + + protected NioSelectorPool pool; + + + /** + * Input buffer. + */ + protected ByteBuffer readBuffer; + protected int readBufferEnd; + + /** + * Output buffer. + */ + protected ByteBuffer writeBuffer; + + + /** + * Direct buffer used for sending right away a get body message. + */ + protected final byte[] getBodyMessageArray; + + + /** + * Direct buffer used for sending right away a pong message. + */ + protected static final byte[] pongMessageArray; + + + /** + * End message array. + */ + protected static final byte[] endMessageArray; + + + /** + * Flush message array. + */ + protected static final byte[] flushMessageArray; + + // ----------------------------------------------------- Static Initializer + + + static { + + // Set the read body message buffer + AjpMessage pongMessage = new AjpMessage(16); + pongMessage.reset(); + pongMessage.appendByte(Constants.JK_AJP13_CPONG_REPLY); + pongMessage.end(); + pongMessageArray = new byte[pongMessage.getLen()]; + System.arraycopy(pongMessage.getBuffer(), 0, pongMessageArray, + 0, pongMessage.getLen()); + + // Allocate the end message array + AjpMessage endMessage = new AjpMessage(16); + endMessage.reset(); + endMessage.appendByte(Constants.JK_AJP13_END_RESPONSE); + endMessage.appendByte(1); + endMessage.end(); + endMessageArray = new byte[endMessage.getLen()]; + System.arraycopy(endMessage.getBuffer(), 0, endMessageArray, 0, + endMessage.getLen()); + + // Allocate the flush message array + AjpMessage flushMessage = new AjpMessage(16); + flushMessage.reset(); + flushMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); + flushMessage.appendInt(0); + flushMessage.appendByte(0); + flushMessage.end(); + flushMessageArray = new byte[flushMessage.getLen()]; + System.arraycopy(flushMessage.getBuffer(), 0, flushMessageArray, 0, + flushMessage.getLen()); + + } + + + // ------------------------------------------------------------- Properties + + + // --------------------------------------------------------- Public Methods + + + /** + * Process pipelined HTTP requests using the specified input and output + * streams. + * + * @throws IOException error during an I/O operation + */ + public SocketState process(NioChannel socket) + throws IOException { + RequestInfo rp = request.getRequestProcessor(); + rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); + + // Setting up the socket + this.socket = socket; + readBuffer = socket.getBufHandler().getReadBuffer(); + readBufferEnd = 0; + readBuffer.clear(); + writeBuffer = socket.getBufHandler().getWriteBuffer(); + writeBuffer.clear(); + + int soTimeout = -1; + final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); + if (keepAliveTimeout > 0) { + ka.setTimeout(soTimeout); + } + + // Error flag + error = false; + + while (!error && !endpoint.isPaused()) { + + // Parsing the request header + try { + // Set keep alive timeout if enabled + if (keepAliveTimeout > 0) { + ka.setTimeout(keepAliveTimeout); + } + // Get first message of the request + if (!readMessage(requestHeaderMessage)) { + // This means a connection timeout + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + break; + } + // Set back timeout if keep alive timeout is enabled + if (keepAliveTimeout > 0) { + ka.setTimeout(soTimeout); + } + // Check message type, process right away and break if + // not regular request processing + int type = requestHeaderMessage.getByte(); + if (type == Constants.JK_AJP13_CPING_REQUEST) { + try { + output(pongMessageArray, 0, pongMessageArray.length); + } catch (IOException e) { + error = true; + } + continue; + } else if(type != Constants.JK_AJP13_FORWARD_REQUEST) { + // Usually the servlet didn't read the previous request body + if(log.isDebugEnabled()) { + log.debug("Unexpected message: "+type); + } + continue; + } + + request.setStartTime(System.currentTimeMillis()); + } catch (IOException e) { + error = true; + break; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + log.debug(sm.getString("ajpprocessor.header.error"), t); + // 400 - Bad Request + response.setStatus(400); + adapter.log(request, response, 0); + error = true; + } + + if (!error) { + // Setting up filters, and parse some request headers + rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); + try { + prepareRequest(); + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + log.debug(sm.getString("ajpprocessor.request.prepare"), t); + // 400 - Internal Server Error + response.setStatus(400); + adapter.log(request, response, 0); + error = true; + } + } + + if (endpoint.isPaused()) { + // 503 - Service unavailable + response.setStatus(503); + adapter.log(request, response, 0); + error = true; + } + + // Process the request in the adapter + if (!error) { + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + adapter.service(request, response); + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + log.error(sm.getString("ajpprocessor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + adapter.log(request, response, 0); + error = true; + } + } + + if (isAsync() && !error) { + break; + } + + // Finish the response if not done yet + if (!finished) { + try { + finish(); + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + error = true; + } + } + + // If there was an error, make sure the request is counted as + // and error, and update the statistics counter + if (error) { + response.setStatus(500); + } + request.updateCounters(); + + rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); + recycle(); + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (isAsync() && !error && !endpoint.isPaused()) { + return SocketState.LONG; + } else { + readBuffer = null; + writeBuffer = null; + return SocketState.CLOSED; + } + + } + + + @Override + public void recycle() { + if (readBuffer != null) { + readBuffer.clear(); + } + readBufferEnd = 0; + super.recycle(); + } + + public SocketState asyncDispatch(SocketStatus status) { + + RequestInfo rp = request.getRequestProcessor(); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.asyncDispatch(request, response, status); + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + adapter.log(request, response, 0); + error = true; + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (isAsync()) { + if (error) { + response.setStatus(500); + request.updateCounters(); + readBuffer = null; + writeBuffer = null; + return SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } else { + if (error) { + response.setStatus(500); + } + request.updateCounters(); + readBuffer = null; + writeBuffer = null; + return SocketState.CLOSED; + } + + + } + + + @Override + public Executor getExecutor() { + return endpoint.getExecutor(); + } + + + // ----------------------------------------------------- ActionHook Methods + + + /** + * Send an action to the connector. + * + * @param actionCode Type of the action + * @param param Action parameter + */ + @Override + protected void actionInternal(ActionCode actionCode, Object param) { + + if (actionCode == ActionCode.ASYNC_COMPLETE) { + if (asyncStateMachine.asyncComplete()) { + ((NioEndpoint)endpoint).processSocket(this.socket, + SocketStatus.OPEN, false); + } + } else if (actionCode == ActionCode.ASYNC_SETTIMEOUT) { + if (param == null) return; + long timeout = ((Long)param).longValue(); + final KeyAttachment ka = (KeyAttachment)socket.getAttachment(false); + if (keepAliveTimeout > 0) { + ka.setTimeout(timeout); + } + } else if (actionCode == ActionCode.ASYNC_DISPATCH) { + if (asyncStateMachine.asyncDispatch()) { + ((NioEndpoint)endpoint).processSocket(this.socket, + SocketStatus.OPEN, true); + } + } + } + + + // ------------------------------------------------------ Protected Methods + + @Override + protected void output(byte[] src, int offset, int length) + throws IOException { + ByteBuffer writeBuffer = socket.getBufHandler() .getWriteBuffer(); + + writeBuffer.put(src, offset, length); + + writeBuffer.flip(); + + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled"); + long writeTimeout = att.getTimeout(); + Selector selector = null; + try { + selector = pool.get(); + } catch ( IOException x ) { + //ignore + } + try { + pool.write(writeBuffer, socket, selector, writeTimeout, true, + null); + }finally { + if ( selector != null ) pool.put(selector); + } + writeBuffer.clear(); + } + + /** + * Finish AJP response. + */ + @Override + protected void finish() throws IOException { + + if (!response.isCommitted()) { + // Validate and write response headers + try { + prepareResponse(); + } catch (IOException e) { + // Set error flag + error = true; + } + } + + if (finished) + return; + + finished = true; + + // Add the end message + output(endMessageArray, 0, endMessageArray.length); + } + + + /** + * Read at least the specified amount of bytes, and place them + * in the input buffer. + */ + protected void read(byte[] buf, int pos, int n) + throws IOException { + + int read = readBufferEnd - pos; + int res = 0; + while (read < n) { + res = readSocket(buf, read + pos, true); + if (res > 0) { + read += res; + } else { + throw new IOException(sm.getString("ajpprotocol.failedread")); + } + } + readBufferEnd += read; + } + + private int readSocket(byte[] buf, int pos, boolean block) throws IOException { + int nRead = 0; + socket.getBufHandler().getReadBuffer().clear(); + if ( block ) { + Selector selector = null; + try { + selector = pool.get(); + } catch ( IOException x ) { + // Ignore + } + try { + NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false); + if ( att == null ) throw new IOException("Key must be cancelled."); + nRead = pool.read(socket.getBufHandler().getReadBuffer(),socket,selector,att.getTimeout()); + } catch ( EOFException eof ) { + nRead = -1; + } finally { + if ( selector != null ) pool.put(selector); + } + } else { + nRead = socket.read(socket.getBufHandler().getReadBuffer()); + } + if (nRead > 0) { + socket.getBufHandler().getReadBuffer().flip(); + socket.getBufHandler().getReadBuffer().limit(nRead); + socket.getBufHandler().getReadBuffer().get(buf, pos, nRead); + return nRead; + } else if (nRead == -1) { + //return false; + throw new EOFException(sm.getString("iib.eof.error")); + } else { + return 0; + } + } + + + /** Receive a chunk of data. Called to implement the + * 'special' packet in ajp13 and to receive the data + * after we send a GET_BODY packet + */ + @Override + public boolean receive() throws IOException { + + first = false; + bodyMessage.reset(); + if (!readMessage(bodyMessage)) { + // Invalid message + return false; + } + // No data received. + if (bodyMessage.getLen() == 0) { + // just the header + // Don't mark 'end of stream' for the first chunk. + return false; + } + int blen = bodyMessage.peekInt(); + if (blen == 0) { + return false; + } + + bodyMessage.getBytes(bodyBytes); + empty = false; + return true; + } + + /** + * Get more request body data from the web server and store it in the + * internal buffer. + * + * @return true if there is more data, false if not. + */ + @Override + protected boolean refillReadBuffer() throws IOException { + // If the server returns an empty packet, assume that that end of + // the stream has been reached (yuck -- fix protocol??). + // FORM support + if (replay) { + endOfStream = true; // we've read everything there is + } + if (endOfStream) { + return false; + } + + // Request more data immediately + output(getBodyMessageArray, 0, getBodyMessageArray.length); + + boolean moreData = receive(); + if( !moreData ) { + endOfStream = true; + } + return moreData; + } + + + /** + * Read an AJP message. + * + * @return true if the message has been read, false if the short read + * didn't return anything + * @throws IOException any other failure, including incomplete reads + */ + protected boolean readMessage(AjpMessage message) + throws IOException { + + byte[] buf = message.getBuffer(); + int headerLength = message.getHeaderLength(); + + read(buf, 0, headerLength); + + int messageLength = message.processHeader(); + if (messageLength < 0) { + // Invalid AJP header signature + // TODO: Throw some exception and close the connection to frontend. + return false; + } + else if (messageLength == 0) { + // Zero length message. + return true; + } + else { + if (messageLength > buf.length) { + // Message too long for the buffer + // Need to trigger a 400 response + throw new IllegalArgumentException(sm.getString( + "ajpprocessor.header.tooLong", + Integer.valueOf(messageLength), + Integer.valueOf(buf.length))); + } + read(buf, headerLength, messageLength); + return true; + } + } + + + /** + * Callback to write data from the buffer. + */ + @Override + protected void flush(boolean explicit) throws IOException { + if (explicit && !finished) { + // Send the flush message + output(flushMessageArray, 0, flushMessageArray.length); + } + } + + + // ----------------------------------- OutputStreamOutputBuffer Inner Class + + + /** + * This class is an output buffer which will write data to an output + * stream. + */ + protected class SocketOutputBuffer implements OutputBuffer { + + /** + * Write chunk. + */ + @Override + public int doWrite(ByteChunk chunk, Response res) + throws IOException { + + if (!response.isCommitted()) { + // Validate and write response headers + try { + prepareResponse(); + } catch (IOException e) { + // Set error flag + error = true; + } + } + + int len = chunk.getLength(); + // 4 - hardcoded, byte[] marshaling overhead + // Adjust allowed size if packetSize != default (Constants.MAX_PACKET_SIZE) + int chunkSize = Constants.MAX_SEND_SIZE + packetSize - Constants.MAX_PACKET_SIZE; + int off = 0; + while (len > 0) { + int thisTime = len; + if (thisTime > chunkSize) { + thisTime = chunkSize; + } + len -= thisTime; + responseHeaderMessage.reset(); + responseHeaderMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); + responseHeaderMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime); + responseHeaderMessage.end(); + output(responseHeaderMessage.getBuffer(), 0, responseHeaderMessage.getLen()); + + off += thisTime; + } + + byteCount += chunk.getLength(); + return chunk.getLength(); + } + + @Override + public long getBytesWritten() { + return byteCount; + } + } +} diff --git a/java/org/apache/coyote/ajp/AjpNioProtocol.java b/java/org/apache/coyote/ajp/AjpNioProtocol.java new file mode 100644 index 000000000..0f13ec63f --- /dev/null +++ b/java/org/apache/coyote/ajp/AjpNioProtocol.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.coyote.ajp; + +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.ObjectName; + +import org.apache.coyote.RequestGroupInfo; +import org.apache.coyote.RequestInfo; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.ExceptionUtils; +import org.apache.tomcat.util.modeler.Registry; +import org.apache.tomcat.util.net.AbstractEndpoint; +import org.apache.tomcat.util.net.NioChannel; +import org.apache.tomcat.util.net.NioEndpoint; +import org.apache.tomcat.util.net.NioEndpoint.Handler; +import org.apache.tomcat.util.net.SSLImplementation; +import org.apache.tomcat.util.net.SocketStatus; + + +/** + * Abstract the protocol implementation, including threading, etc. + * Processor is single threaded and specific to stream-based protocols, + * will not fit Jk protocols like JNI. + */ +public class AjpNioProtocol extends AbstractAjpProtocol { + + + private static final Log log = LogFactory.getLog(AjpNioProtocol.class); + + @Override + protected Log getLog() { return log; } + + + @Override + protected AbstractEndpoint.Handler getHandler() { + return cHandler; + } + + + // ------------------------------------------------------------ Constructor + + + public AjpNioProtocol() { + endpoint = new NioEndpoint(); + cHandler = new AjpConnectionHandler(this); + ((NioEndpoint) endpoint).setHandler(cHandler); + setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); + setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); + setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); + // AJP does not use Send File + ((NioEndpoint) endpoint).setUseSendfile(false); + } + + + // ----------------------------------------------------- Instance Variables + + + /** + * Connection handler for AJP. + */ + private AjpConnectionHandler cHandler; + + + // --------------------------------------------------------- Public Methods + + + // AJP does not use Send File. + public boolean getUseSendfile() { return false; } + + + // ----------------------------------------------------- JMX related methods + + @Override + protected String getNamePrefix() { + return ("ajp-nio"); + } + + + // -------------------------------------- AjpConnectionHandler Inner Class + + + protected static class AjpConnectionHandler implements Handler { + + protected AjpNioProtocol proto; + protected AtomicLong registerCount = new AtomicLong(0); + protected RequestGroupInfo global = new RequestGroupInfo(); + + protected ConcurrentHashMap connections = + new ConcurrentHashMap(); + + protected ConcurrentLinkedQueue recycledProcessors = + new ConcurrentLinkedQueue() { + private static final long serialVersionUID = 1L; + protected AtomicInteger size = new AtomicInteger(0); + @Override + public boolean offer(AjpNioProcessor processor) { + boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache); + //avoid over growing our cache or add after we have stopped + boolean result = false; + if ( offer ) { + result = super.offer(processor); + if ( result ) { + size.incrementAndGet(); + } + } + if (!result) unregister(processor); + return result; + } + + @Override + public AjpNioProcessor poll() { + AjpNioProcessor result = super.poll(); + if ( result != null ) { + size.decrementAndGet(); + } + return result; + } + + @Override + public void clear() { + AjpNioProcessor next = poll(); + while ( next != null ) { + unregister(next); + next = poll(); + } + super.clear(); + size.set(0); + } + }; + + public AjpConnectionHandler(AjpNioProtocol proto) { + this.proto = proto; + } + + @Override + public Object getGlobal() { + return global; + } + + @Override + public void recycle() { + recycledProcessors.clear(); + } + + @Override + public SSLImplementation getSslImplementation() { + // AJP does not support SSL + return null; + } + + @Override + public void release(SocketChannel socket) { + if (log.isDebugEnabled()) + log.debug("Iterating through our connections to release a socket channel:"+socket); + boolean released = false; + Iterator> it = connections.entrySet().iterator(); + while (it.hasNext()) { + java.util.Map.Entry entry = it.next(); + if (entry.getKey().getIOChannel()==socket) { + it.remove(); + AjpNioProcessor result = entry.getValue(); + result.recycle(); + unregister(result); + released = true; + break; + } + } + if (log.isDebugEnabled()) + log.debug("Done iterating through our connections to release a socket channel:"+socket +" released:"+released); + } + + /** + * Use this only if the processor is not available, otherwise use + * {@link #release(NioChannel, Http11NioProcessor). + */ + @Override + public void release(NioChannel socket) { + AjpNioProcessor processor = connections.remove(socket); + if (processor != null) { + processor.recycle(); + recycledProcessors.offer(processor); + } + } + + + public void release(NioChannel socket, AjpNioProcessor processor) { + connections.remove(socket); + processor.recycle(); + recycledProcessors.offer(processor); + } + + // FIXME: Support for this could be added in AJP as well + @Override + public SocketState event(NioChannel socket, SocketStatus status) { + return SocketState.CLOSED; + } + + @Override + public SocketState process(NioChannel socket) { + AjpNioProcessor processor = recycledProcessors.poll(); + try { + if (processor == null) { + processor = createProcessor(); + } + + SocketState state = processor.process(socket); + if (state == SocketState.LONG) { + // Check if the post processing is going to change the state + state = processor.asyncPostProcess(); + } + if (state == SocketState.LONG || state == SocketState.ASYNC_END) { + // Need to make socket available for next processing cycle + // but no need for the poller + connections.put(socket, processor); + NioEndpoint.KeyAttachment att = + (NioEndpoint.KeyAttachment)socket.getAttachment(false); + att.setAsync(true); + } else { + processor.recycle(); + recycledProcessors.offer(processor); + } + return state; + + } catch(java.net.SocketException e) { + // SocketExceptions are normal + log.debug(sm.getString( + "ajpprotocol.proto.socketexception.debug"), e); + } catch (java.io.IOException e) { + // IOExceptions are normal + log.debug(sm.getString( + "ajpprotocol.proto.ioexception.debug"), e); + } + // Future developers: if you discover any other + // rare-but-nonfatal exceptions, catch them here, and log as + // above. + catch (Throwable e) { + ExceptionUtils.handleThrowable(e); + // any other exception or error is odd. Here we log it + // with "ERROR" level, so it will show up even on + // less-than-verbose logs. + log.error(sm.getString("ajpprotocol.proto.error"), e); + } + processor.recycle(); + recycledProcessors.offer(processor); + return SocketState.CLOSED; + } + + protected AjpNioProcessor createProcessor() { + AjpNioProcessor processor = new AjpNioProcessor(proto.packetSize, (NioEndpoint)proto.endpoint); + processor.setAdapter(proto.adapter); + processor.setTomcatAuthentication(proto.tomcatAuthentication); + processor.setRequiredSecret(proto.requiredSecret); + processor.setClientCertProvider(proto.getClientCertProvider()); + register(processor); + return processor; + } + + protected void register(AjpNioProcessor processor) { + if (proto.getDomain() != null) { + synchronized (this) { + try { + long count = registerCount.incrementAndGet(); + RequestInfo rp = processor.getRequest().getRequestProcessor(); + rp.setGlobalProcessor(global); + ObjectName rpName = new ObjectName + (proto.getDomain() + ":type=RequestProcessor,worker=" + + proto.getName() + ",name=AjpRequest" + count); + if (log.isDebugEnabled()) { + log.debug("Register " + rpName); + } + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + rp.setRpName(rpName); + } catch (Exception e) { + log.warn("Error registering request"); + } + } + } + } + + protected void unregister(AjpNioProcessor processor) { + if (proto.getDomain() != null) { + synchronized (this) { + try { + RequestInfo rp = processor.getRequest().getRequestProcessor(); + rp.setGlobalProcessor(null); + ObjectName rpName = rp.getRpName(); + if (log.isDebugEnabled()) { + log.debug("Unregister " + rpName); + } + Registry.getRegistry(null, null).unregisterComponent(rpName); + rp.setRpName(null); + } catch (Exception e) { + log.warn("Error unregistering request", e); + } + } + } + } + + } + +}