From 8b4b444f315b72e8ea46a1509d80d1fe91b5137f Mon Sep 17 00:00:00 2001 From: fhanik Date: Thu, 22 Jun 2006 00:48:53 +0000 Subject: [PATCH] Non blocking polling information. This implementation in pure Java NIO is almost a mimic of the APR implementation. It blocks on read and write, but has non blocking polling capabilities. Currently the read/write blocking is "busy" blocking, but I will see if I can simply configure blocking for the socket and if that would still allow the poller to work as expected. This makes it a suitable connector for comet style protocols and where APR is not desired or available. git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@416187 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/catalina/connector/Connector.java | 8 +- .../apache/coyote/http11/Http11NioProcessor.java | 1797 +++++++++++++++++++ .../apache/coyote/http11/Http11NioProtocol.java | 775 ++++++++ .../coyote/http11/InternalNioInputBuffer.java | 827 +++++++++ .../coyote/http11/InternalNioOutputBuffer.java | 783 +++++++++ java/org/apache/tomcat/util/net/NioEndpoint.java | 1845 ++++++++++++++++++++ 6 files changed, 6033 insertions(+), 2 deletions(-) create mode 100644 java/org/apache/coyote/http11/Http11NioProcessor.java create mode 100644 java/org/apache/coyote/http11/Http11NioProtocol.java create mode 100644 java/org/apache/coyote/http11/InternalNioInputBuffer.java create mode 100644 java/org/apache/coyote/http11/InternalNioOutputBuffer.java create mode 100644 java/org/apache/tomcat/util/net/NioEndpoint.java diff --git a/java/org/apache/catalina/connector/Connector.java b/java/org/apache/catalina/connector/Connector.java index 2e32c8263..2e355259d 100644 --- a/java/org/apache/catalina/connector/Connector.java +++ b/java/org/apache/catalina/connector/Connector.java @@ -623,7 +623,9 @@ public class Connector } if (apr) { - if ("HTTP/1.1".equals(protocol)) { + if ("HTTP/1.1/NIO".equals(protocol)) { + setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol"); + } else if ("HTTP/1.1".equals(protocol)) { setProtocolHandlerClassName ("org.apache.coyote.http11.Http11AprProtocol"); } else if ("AJP/1.3".equals(protocol)) { @@ -636,7 +638,9 @@ public class Connector ("org.apache.coyote.http11.Http11AprProtocol"); } } else { - if ("HTTP/1.1".equals(protocol)) { + if ("HTTP/1.1/NIO".equals(protocol)) { + setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol"); + }else if ("HTTP/1.1".equals(protocol)) { setProtocolHandlerClassName ("org.apache.coyote.http11.Http11Protocol"); } else if ("AJP/1.3".equals(protocol)) { diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java new file mode 100644 index 000000000..55e4bdbe5 --- /dev/null +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -0,0 +1,1797 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.coyote.http11; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.nio.channels.SocketChannel; +import java.util.StringTokenizer; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import org.apache.coyote.ActionCode; +import org.apache.coyote.ActionHook; +import org.apache.coyote.Adapter; +import org.apache.coyote.Request; +import org.apache.coyote.RequestInfo; +import org.apache.coyote.Response; +import org.apache.coyote.http11.filters.BufferedInputFilter; +import org.apache.coyote.http11.filters.ChunkedInputFilter; +import org.apache.coyote.http11.filters.ChunkedOutputFilter; +import org.apache.coyote.http11.filters.GzipOutputFilter; +import org.apache.coyote.http11.filters.IdentityInputFilter; +import org.apache.coyote.http11.filters.IdentityOutputFilter; +import org.apache.coyote.http11.filters.SavedRequestInputFilter; +import org.apache.coyote.http11.filters.VoidInputFilter; +import org.apache.coyote.http11.filters.VoidOutputFilter; +import org.apache.tomcat.util.buf.Ascii; +import org.apache.tomcat.util.buf.ByteChunk; +import org.apache.tomcat.util.buf.HexUtils; +import org.apache.tomcat.util.buf.MessageBytes; +import org.apache.tomcat.util.http.FastHttpDateFormat; +import org.apache.tomcat.util.http.MimeHeaders; +import org.apache.tomcat.util.net.NioEndpoint; +import org.apache.tomcat.util.net.NioEndpoint.Handler; +import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.NioEndpoint.SendfileData; +import org.apache.tomcat.util.res.StringManager; +import java.nio.channels.SelectionKey; + + +/** + * Processes HTTP requests. + * + * @author Remy Maucherat + * @author Filip Hanik + */ +public class Http11NioProcessor implements ActionHook { + + + /** + * Logger. + */ + protected static org.apache.commons.logging.Log log + = org.apache.commons.logging.LogFactory.getLog(Http11NioProcessor.class); + + /** + * The string manager for this package. + */ + protected static StringManager sm = + StringManager.getManager(Constants.Package); + + + // ----------------------------------------------------------- Constructors + + + public Http11NioProcessor(int headerBufferSize, NioEndpoint endpoint) { + + this.endpoint = endpoint; + + request = new Request(); + int readTimeout = endpoint.getFirstReadTimeout(); + if (readTimeout == 0) { + readTimeout = 100; + } else if (readTimeout < 0) { + readTimeout = timeout; + //readTimeout = -1; + } + inputBuffer = new InternalNioInputBuffer(request, headerBufferSize, + readTimeout); + request.setInputBuffer(inputBuffer); + + response = new Response(); + response.setHook(this); + outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize); + response.setOutputBuffer(outputBuffer); + request.setResponse(response); + + ssl = !"off".equalsIgnoreCase(endpoint.getSSLEngine()); + + initializeFilters(); + + // Cause loading of HexUtils + int foo = HexUtils.DEC[0]; + + // Cause loading of FastHttpDateFormat + FastHttpDateFormat.getCurrentDate(); + + } + + + // ----------------------------------------------------- Instance Variables + + + /** + * Associated adapter. + */ + protected Adapter adapter = null; + + + /** + * Request object. + */ + protected Request request = null; + + + /** + * Response object. + */ + protected Response response = null; + + + /** + * Input. + */ + protected InternalNioInputBuffer inputBuffer = null; + + + /** + * Output. + */ + protected InternalNioOutputBuffer outputBuffer = null; + + + /** + * Error flag. + */ + protected boolean error = false; + + + /** + * Keep-alive. + */ + protected boolean keepAlive = true; + + + /** + * HTTP/1.1 flag. + */ + protected boolean http11 = true; + + + /** + * HTTP/0.9 flag. + */ + protected boolean http09 = false; + + + /** + * Sendfile data. + */ + protected NioEndpoint.SendfileData sendfileData = null; + + + /** + * Comet used. + */ + protected boolean comet = false; + + + /** + * Content delimitator for the request (if false, the connection will + * be closed at the end of the request). + */ + protected boolean contentDelimitation = true; + + + /** + * Is there an expectation ? + */ + protected boolean expectation = false; + + + /** + * List of restricted user agents. + */ + protected Pattern[] restrictedUserAgents = null; + + + /** + * Maximum number of Keep-Alive requests to honor. + */ + protected int maxKeepAliveRequests = -1; + + + /** + * SSL enabled ? + */ + protected boolean ssl = false; + + + /** + * Socket associated with the current connection. + */ + protected SocketChannel socket = null; + + + /** + * Remote Address associated with the current connection. + */ + protected String remoteAddr = null; + + + /** + * Remote Host associated with the current connection. + */ + protected String remoteHost = null; + + + /** + * Local Host associated with the current connection. + */ + protected String localName = null; + + + + /** + * Local port to which the socket is connected + */ + protected int localPort = -1; + + + /** + * Remote port to which the socket is connected + */ + protected int remotePort = -1; + + + /** + * The local Host address. + */ + protected String localAddr = null; + + + /** + * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server. + */ + protected int timeout = 300000; + + + /** + * Flag to disable setting a different time-out on uploads. + */ + protected boolean disableUploadTimeout = false; + + + /** + * Allowed compression level. + */ + protected int compressionLevel = 0; + + + /** + * Minimum contentsize to make compression. + */ + protected int compressionMinSize = 2048; + + + /** + * Socket buffering. + */ + protected int socketBuffer = -1; + + + /** + * Max save post size. + */ + protected int maxSavePostSize = 4 * 1024; + + + /** + * List of user agents to not use gzip with + */ + protected Pattern noCompressionUserAgents[] = null; + + /** + * List of MIMES which could be gzipped + */ + protected String[] compressableMimeTypes = + { "text/html", "text/xml", "text/plain" }; + + + /** + * Host name (used to avoid useless B2C conversion on the host name). + */ + protected char[] hostNameC = new char[0]; + + + /** + * Associated endpoint. + */ + protected NioEndpoint endpoint; + + + /** + * Allow a customized the server header for the tin-foil hat folks. + */ + protected String server = null; + + + // ------------------------------------------------------------- Properties + + + /** + * Return compression level. + */ + public String getCompression() { + switch (compressionLevel) { + case 0: + return "off"; + case 1: + return "on"; + case 2: + return "force"; + } + return "off"; + } + + + /** + * Set compression level. + */ + public void setCompression(String compression) { + if (compression.equals("on")) { + this.compressionLevel = 1; + } else if (compression.equals("force")) { + this.compressionLevel = 2; + } else if (compression.equals("off")) { + this.compressionLevel = 0; + } else { + try { + // Try to parse compression as an int, which would give the + // minimum compression size + compressionMinSize = Integer.parseInt(compression); + this.compressionLevel = 1; + } catch (Exception e) { + this.compressionLevel = 0; + } + } + } + + /** + * Set Minimum size to trigger compression. + */ + public void setCompressionMinSize(int compressionMinSize) { + this.compressionMinSize = compressionMinSize; + } + + + /** + * Add user-agent for which gzip compression didn't works + * The user agent String given will be exactly matched + * to the user-agent header submitted by the client. + * + * @param userAgent user-agent string + */ + public void addNoCompressionUserAgent(String userAgent) { + try { + Pattern nRule = Pattern.compile(userAgent); + noCompressionUserAgents = + addREArray(noCompressionUserAgents, nRule); + } catch (PatternSyntaxException pse) { + log.error(sm.getString("http11processor.regexp.error", userAgent), pse); + } + } + + + /** + * Set no compression user agent list (this method is best when used with + * a large number of connectors, where it would be better to have all of + * them referenced a single array). + */ + public void setNoCompressionUserAgents(Pattern[] noCompressionUserAgents) { + this.noCompressionUserAgents = noCompressionUserAgents; + } + + + /** + * Set no compression user agent list. + * List contains users agents separated by ',' : + * + * ie: "gorilla,desesplorer,tigrus" + */ + public void setNoCompressionUserAgents(String noCompressionUserAgents) { + if (noCompressionUserAgents != null) { + StringTokenizer st = new StringTokenizer(noCompressionUserAgents, ","); + + while (st.hasMoreTokens()) { + addNoCompressionUserAgent(st.nextToken().trim()); + } + } + } + + /** + * Add a mime-type which will be compressable + * The mime-type String will be exactly matched + * in the response mime-type header . + * + * @param mimeType mime-type string + */ + public void addCompressableMimeType(String mimeType) { + compressableMimeTypes = + addStringArray(compressableMimeTypes, mimeType); + } + + + /** + * Set compressable mime-type list (this method is best when used with + * a large number of connectors, where it would be better to have all of + * them referenced a single array). + */ + public void setCompressableMimeTypes(String[] compressableMimeTypes) { + this.compressableMimeTypes = compressableMimeTypes; + } + + + /** + * Set compressable mime-type list + * List contains users agents separated by ',' : + * + * ie: "text/html,text/xml,text/plain" + */ + public void setCompressableMimeTypes(String compressableMimeTypes) { + if (compressableMimeTypes != null) { + StringTokenizer st = new StringTokenizer(compressableMimeTypes, ","); + + while (st.hasMoreTokens()) { + addCompressableMimeType(st.nextToken().trim()); + } + } + } + + + /** + * Return the list of restricted user agents. + */ + public String[] findCompressableMimeTypes() { + return (compressableMimeTypes); + } + + + + // --------------------------------------------------------- Public Methods + + + /** + * Add input or output filter. + * + * @param className class name of the filter + */ + protected void addFilter(String className) { + try { + Class clazz = Class.forName(className); + Object obj = clazz.newInstance(); + if (obj instanceof InputFilter) { + inputBuffer.addFilter((InputFilter) obj); + } else if (obj instanceof OutputFilter) { + outputBuffer.addFilter((OutputFilter) obj); + } else { + log.warn(sm.getString("http11processor.filter.unknown", className)); + } + } catch (Exception e) { + log.error(sm.getString("http11processor.filter.error", className), e); + } + } + + + /** + * General use method + * + * @param sArray the StringArray + * @param value string + */ + private String[] addStringArray(String sArray[], String value) { + String[] result = null; + if (sArray == null) { + result = new String[1]; + result[0] = value; + } + else { + result = new String[sArray.length + 1]; + for (int i = 0; i < sArray.length; i++) + result[i] = sArray[i]; + result[sArray.length] = value; + } + return result; + } + + + /** + * General use method + * + * @param rArray the REArray + * @param value Obj + */ + private Pattern[] addREArray(Pattern rArray[], Pattern value) { + Pattern[] result = null; + if (rArray == null) { + result = new Pattern[1]; + result[0] = value; + } + else { + result = new Pattern[rArray.length + 1]; + for (int i = 0; i < rArray.length; i++) + result[i] = rArray[i]; + result[rArray.length] = value; + } + return result; + } + + + /** + * General use method + * + * @param sArray the StringArray + * @param value string + */ + private boolean inStringArray(String sArray[], String value) { + for (int i = 0; i < sArray.length; i++) { + if (sArray[i].equals(value)) { + return true; + } + } + return false; + } + + + /** + * Checks if any entry in the string array starts with the specified value + * + * @param sArray the StringArray + * @param value string + */ + private boolean startsWithStringArray(String sArray[], String value) { + if (value == null) + return false; + for (int i = 0; i < sArray.length; i++) { + if (value.startsWith(sArray[i])) { + return true; + } + } + return false; + } + + + /** + * Add restricted user-agent (which will downgrade the connector + * to HTTP/1.0 mode). The user agent String given will be matched + * via regexp to the user-agent header submitted by the client. + * + * @param userAgent user-agent string + */ + public void addRestrictedUserAgent(String userAgent) { + try { + Pattern nRule = Pattern.compile(userAgent); + restrictedUserAgents = addREArray(restrictedUserAgents, nRule); + } catch (PatternSyntaxException pse) { + log.error(sm.getString("http11processor.regexp.error", userAgent), pse); + } + } + + + /** + * Set restricted user agent list (this method is best when used with + * a large number of connectors, where it would be better to have all of + * them referenced a single array). + */ + public void setRestrictedUserAgents(Pattern[] restrictedUserAgents) { + this.restrictedUserAgents = restrictedUserAgents; + } + + + /** + * Set restricted user agent list (which will downgrade the connector + * to HTTP/1.0 mode). List contains users agents separated by ',' : + * + * ie: "gorilla,desesplorer,tigrus" + */ + public void setRestrictedUserAgents(String restrictedUserAgents) { + if (restrictedUserAgents != null) { + StringTokenizer st = + new StringTokenizer(restrictedUserAgents, ","); + while (st.hasMoreTokens()) { + addRestrictedUserAgent(st.nextToken().trim()); + } + } + } + + + /** + * Return the list of restricted user agents. + */ + public String[] findRestrictedUserAgents() { + String[] sarr = new String [restrictedUserAgents.length]; + + for (int i = 0; i < restrictedUserAgents.length; i++) + sarr[i] = restrictedUserAgents[i].toString(); + + return (sarr); + } + + + /** + * Set the maximum number of Keep-Alive requests to honor. + * This is to safeguard from DoS attacks. Setting to a negative + * value disables the check. + */ + public void setMaxKeepAliveRequests(int mkar) { + maxKeepAliveRequests = mkar; + } + + + /** + * Return the number of Keep-Alive requests that we will honor. + */ + public int getMaxKeepAliveRequests() { + return maxKeepAliveRequests; + } + + + /** + * Set the maximum size of a POST which will be buffered in SSL mode. + */ + public void setMaxSavePostSize(int msps) { + maxSavePostSize = msps; + } + + + /** + * Return the maximum size of a POST which will be buffered in SSL mode. + */ + public int getMaxSavePostSize() { + return maxSavePostSize; + } + + + /** + * Set the flag to control upload time-outs. + */ + public void setDisableUploadTimeout(boolean isDisabled) { + disableUploadTimeout = isDisabled; + } + + /** + * Get the flag that controls upload time-outs. + */ + public boolean getDisableUploadTimeout() { + return disableUploadTimeout; + } + + /** + * Set the socket buffer flag. + */ + public void setSocketBuffer(int socketBuffer) { + this.socketBuffer = socketBuffer; + outputBuffer.setSocketBuffer(socketBuffer); + } + + /** + * Get the socket buffer flag. + */ + public int getSocketBuffer() { + return socketBuffer; + } + + /** + * Set the upload timeout. + */ + public void setTimeout( int timeouts ) { + timeout = timeouts ; + } + + /** + * Get the upload timeout. + */ + public int getTimeout() { + return timeout; + } + + + /** + * Set the server header name. + */ + public void setServer( String server ) { + if (server==null || server.equals("")) { + this.server = null; + } else { + this.server = server; + } + } + + /** + * Get the server header name. + */ + public String getServer() { + return server; + } + + + /** Get the request associated with this processor. + * + * @return The request + */ + public Request getRequest() { + return request; + } + + /** + * Process pipelined HTTP requests using the specified input and output + * streams. + * + * @throws IOException error during an I/O operation + */ + public SocketState event(boolean error) + throws IOException { + + RequestInfo rp = request.getRequestProcessor(); + + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + error = !adapter.event(request, response, error); + if (request.getAttribute("org.apache.tomcat.comet") == null) { + comet = false; + } + SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector()); + if ( key != null ) { + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); + if ( attach!=null ) attach.setComet(comet); + } + + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (error) { + recycle(); + return SocketState.CLOSED; + } else if (!comet) { + recycle(); + endpoint.getPoller().add(socket); + return SocketState.OPEN; + } else { + endpoint.getCometPoller().add(socket); + return SocketState.LONG; + } + } + + /** + * Process pipelined HTTP requests using the specified input and output + * streams. + * + * @throws IOException error during an I/O operation + */ + public SocketState process(SocketChannel socket) + throws IOException { + RequestInfo rp = request.getRequestProcessor(); + rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); + + // Set the remote address + remoteAddr = null; + remoteHost = null; + localAddr = null; + localName = null; + remotePort = -1; + localPort = -1; + + // Setting up the socket + this.socket = socket; + inputBuffer.setSocket(socket); + outputBuffer.setSocket(socket); + outputBuffer.setSelector(endpoint.getPoller().getSelector()); + + // Error flag + error = false; + keepAlive = true; + + int keepAliveLeft = maxKeepAliveRequests; + long soTimeout = endpoint.getSoTimeout(); + + int limit = 0; + if (endpoint.getFirstReadTimeout() > 0 || endpoint.getFirstReadTimeout() < -1) { + limit = endpoint.getMaxThreads() / 2; + } + + boolean keptAlive = false; + boolean openSocket = false; + + while (!error && keepAlive && !comet) { + + // Parsing the request header + try { + if( !disableUploadTimeout && keptAlive && soTimeout > 0 ) { + socket.socket().setSoTimeout((int)soTimeout); + inputBuffer.readTimeout = soTimeout; + } + if (!inputBuffer.parseRequestLine + (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) { + // This means that no data is available right now + // (long keepalive), so that the processor should be recycled + // and the method should return true + openSocket = true; + // Add the socket to the poller + endpoint.getPoller().add(socket); + break; + } + request.setStartTime(System.currentTimeMillis()); + keptAlive = true; + if (!disableUploadTimeout) { + socket.socket().setSoTimeout((int)timeout); + inputBuffer.readTimeout = soTimeout; + } + inputBuffer.parseHeaders(); + } catch (IOException e) { + error = true; + break; + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("http11processor.header.parse"), t); + } + // 400 - Bad Request + response.setStatus(400); + error = true; + } + + // Setting up filters, and parse some request headers + rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); + try { + prepareRequest(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("http11processor.request.prepare"), t); + } + // 400 - Internal Server Error + response.setStatus(400); + error = true; + } + + if (maxKeepAliveRequests > 0 && --keepAliveLeft == 0) + keepAlive = false; + + // Process the request in the adapter + if (!error) { + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + adapter.service(request, response); + // Handle when the response was committed before a serious + // error occurred. Throwing a ServletException should both + // set the status to 500 and set the errorException. + // If we fail here, then the response is likely already + // committed, so we can't try and set headers. + if(keepAlive && !error) { // Avoid checking twice. + error = response.getErrorException() != null || + statusDropsConnection(response.getStatus()); + } + // Comet support + if (request.getAttribute("org.apache.tomcat.comet") != null) { + comet = true; + } + SelectionKey key = socket.keyFor(endpoint.getPoller().getSelector()); + if (key != null) { + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); + if (attach != null) attach.setComet(comet); + } + } catch (InterruptedIOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.process"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + } + + // Finish the handling of the request + if (!comet) { + endRequest(); + } + + // If there was an error, make sure the request is counted as + // and error, and update the statistics counter + if (error) { + response.setStatus(500); + } + request.updateCounters(); + + // Do sendfile as needed: add socket to sendfile and end + if (sendfileData != null && !error) { + sendfileData.socket = socket; + sendfileData.keepAlive = keepAlive; + if (!endpoint.getSendfile().add(sendfileData)) { + openSocket = true; + break; + } + } + + rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); + + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (comet) { + if (error) { + recycle(); + return SocketState.CLOSED; + } else { + return SocketState.LONG; + } + } else { + recycle(); + return (openSocket) ? SocketState.OPEN : SocketState.CLOSED; + } + + } + + + public void endRequest() { + + // Finish the handling of the request + try { + inputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.request.finish"), t); + // 500 - Internal Server Error + response.setStatus(500); + error = true; + } + try { + outputBuffer.endRequest(); + } catch (IOException e) { + error = true; + } catch (Throwable t) { + log.error(sm.getString("http11processor.response.finish"), t); + error = true; + } + + // Next request + inputBuffer.nextRequest(); + outputBuffer.nextRequest(); + + } + + + public void recycle() { + inputBuffer.recycle(); + outputBuffer.recycle(); + this.socket = null; + } + + + // ----------------------------------------------------- ActionHook Methods + + + /** + * Send an action to the connector. + * + * @param actionCode Type of the action + * @param param Action parameter + */ + public void action(ActionCode actionCode, Object param) { + + if (actionCode == ActionCode.ACTION_COMMIT) { + // Commit current response + + if (response.isCommitted()) + return; + + // Validate and write response headers + prepareResponse(); + try { + outputBuffer.commit(); + } catch (IOException e) { + // Set error flag + error = true; + } + + } else if (actionCode == ActionCode.ACTION_ACK) { + + // Acknowlege request + + // Send a 100 status back if it makes sense (response not committed + // yet, and client specified an expectation for 100-continue) + + if ((response.isCommitted()) || !expectation) + return; + + inputBuffer.setSwallowInput(true); + try { + outputBuffer.sendAck(); + } catch (IOException e) { + // Set error flag + error = true; + } + + } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH) { + + try { + outputBuffer.flush(); + } catch (IOException e) { + // Set error flag + error = true; + response.setErrorException(e); + } + + } else if (actionCode == ActionCode.ACTION_CLOSE) { + // Close + + // End the processing of the current request, and stop any further + // transactions with the client + + comet = false; + try { + outputBuffer.endRequest(); + } catch (IOException e) { + // Set error flag + error = true; + } + + } else if (actionCode == ActionCode.ACTION_RESET) { + + // Reset response + + // Note: This must be called before the response is committed + + outputBuffer.reset(); + + } else if (actionCode == ActionCode.ACTION_CUSTOM) { + + // Do nothing + + } else if (actionCode == ActionCode.ACTION_REQ_HOST_ADDR_ATTRIBUTE) { + + // Get remote host address + if ((remoteAddr == null) && (socket != null)) { + InetAddress inetAddr = socket.socket().getInetAddress(); + if (inetAddr != null) { + remoteAddr = inetAddr.getHostAddress(); + } + } + request.remoteAddr().setString(remoteAddr); + + } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_NAME_ATTRIBUTE) { + + // Get local host name + if ((localName == null) && (socket != null)) { + InetAddress inetAddr = socket.socket().getLocalAddress(); + if (inetAddr != null) { + localName = inetAddr.getHostName(); + } + } + request.localName().setString(localName); + + } else if (actionCode == ActionCode.ACTION_REQ_HOST_ATTRIBUTE) { + + // Get remote host name + if ((remoteHost == null) && (socket != null)) { + InetAddress inetAddr = socket.socket().getInetAddress(); + if (inetAddr != null) { + remoteHost = inetAddr.getHostName(); + } + if(remoteHost == null) { + if(remoteAddr != null) { + remoteHost = remoteAddr; + } else { // all we can do is punt + request.remoteHost().recycle(); + } + } + } + request.remoteHost().setString(remoteHost); + + } else if (actionCode == ActionCode.ACTION_REQ_LOCAL_ADDR_ATTRIBUTE) { + + if (localAddr == null) + localAddr = socket.socket().getLocalAddress().getHostAddress(); + + request.localAddr().setString(localAddr); + + } else if (actionCode == ActionCode.ACTION_REQ_REMOTEPORT_ATTRIBUTE) { + + if ((remotePort == -1 ) && (socket !=null)) { + remotePort = socket.socket().getPort(); + } + request.setRemotePort(remotePort); + + } else if (actionCode == ActionCode.ACTION_REQ_LOCALPORT_ATTRIBUTE) { + + if ((localPort == -1 ) && (socket !=null)) { + localPort = socket.socket().getLocalPort(); + } + request.setLocalPort(localPort); + + } else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) { + +// if (ssl && (socket != 0)) { +// try { +// // Cipher suite +// Object sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_CIPHER); +// if (sslO != null) { +// request.setAttribute +// (NioEndpoint.CIPHER_SUITE_KEY, sslO); +// } +// // Client certificate chain if present +// int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN); +// X509Certificate[] certs = null; +// if (certLength > 0) { +// certs = new X509Certificate[certLength]; +// for (int i = 0; i < certLength; i++) { +// byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); +// CertificateFactory cf = +// CertificateFactory.getInstance("X.509"); +// ByteArrayInputStream stream = new ByteArrayInputStream(data); +// certs[i] = (X509Certificate) cf.generateCertificate(stream); +// } +// } +// if (certs != null) { +// request.setAttribute +// (NioEndpoint.CERTIFICATE_KEY, certs); +// } +// // User key size +// sslO = new Integer(SSLSocket.getInfoI(socket, SSL.SSL_INFO_CIPHER_USEKEYSIZE)); +// if (sslO != null) { +// request.setAttribute +// (NioEndpoint.KEY_SIZE_KEY, sslO); +// } +// // SSL session ID +// sslO = SSLSocket.getInfoS(socket, SSL.SSL_INFO_SESSION_ID); +// if (sslO != null) { +// request.setAttribute +// (NioEndpoint.SESSION_ID_KEY, sslO); +// } +// } catch (Exception e) { +// log.warn(sm.getString("http11processor.socket.ssl"), e); +// } +// } + + } else if (actionCode == ActionCode.ACTION_REQ_SSL_CERTIFICATE) { + +// if (ssl && (socket != 0)) { +// // Consume and buffer the request body, so that it does not +// // interfere with the client's handshake messages +// InputFilter[] inputFilters = inputBuffer.getFilters(); +// ((BufferedInputFilter) inputFilters[Constants.BUFFERED_FILTER]) +// .setLimit(maxSavePostSize); +// inputBuffer.addActiveFilter +// (inputFilters[Constants.BUFFERED_FILTER]); +// try { +// // Renegociate certificates +// SSLSocket.renegotiate(socket); +// // Client certificate chain if present +// int certLength = SSLSocket.getInfoI(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN); +// X509Certificate[] certs = null; +// if (certLength > 0) { +// certs = new X509Certificate[certLength]; +// for (int i = 0; i < certLength; i++) { +// byte[] data = SSLSocket.getInfoB(socket, SSL.SSL_INFO_CLIENT_CERT_CHAIN + i); +// CertificateFactory cf = +// CertificateFactory.getInstance("X.509"); +// ByteArrayInputStream stream = new ByteArrayInputStream(data); +// certs[i] = (X509Certificate) cf.generateCertificate(stream); +// } +// } +// if (certs != null) { +// request.setAttribute +// (NioEndpoint.CERTIFICATE_KEY, certs); +// } +// } catch (Exception e) { +// log.warn(sm.getString("http11processor.socket.ssl"), e); +// } +// } + + } else if (actionCode == ActionCode.ACTION_REQ_SET_BODY_REPLAY) { + ByteChunk body = (ByteChunk) param; + + InputFilter savedBody = new SavedRequestInputFilter(body); + savedBody.setRequest(request); + + InternalNioInputBuffer internalBuffer = (InternalNioInputBuffer) + request.getInputBuffer(); + internalBuffer.addActiveFilter(savedBody); + } + + } + + + // ------------------------------------------------------ Connector Methods + + + /** + * Set the associated adapter. + * + * @param adapter the new adapter + */ + public void setAdapter(Adapter adapter) { + this.adapter = adapter; + } + + + /** + * Get the associated adapter. + * + * @return the associated adapter + */ + public Adapter getAdapter() { + return adapter; + } + + + // ------------------------------------------------------ Protected Methods + + + /** + * After reading the request headers, we have to setup the request filters. + */ + protected void prepareRequest() { + + http11 = true; + http09 = false; + contentDelimitation = false; + expectation = false; + sendfileData = null; + if (ssl) { + request.scheme().setString("https"); + } + MessageBytes protocolMB = request.protocol(); + if (protocolMB.equals(Constants.HTTP_11)) { + http11 = true; + protocolMB.setString(Constants.HTTP_11); + } else if (protocolMB.equals(Constants.HTTP_10)) { + http11 = false; + keepAlive = false; + protocolMB.setString(Constants.HTTP_10); + } else if (protocolMB.equals("")) { + // HTTP/0.9 + http09 = true; + http11 = false; + keepAlive = false; + } else { + // Unsupported protocol + http11 = false; + error = true; + // Send 505; Unsupported HTTP version + response.setStatus(505); + } + + MessageBytes methodMB = request.method(); + if (methodMB.equals(Constants.GET)) { + methodMB.setString(Constants.GET); + } else if (methodMB.equals(Constants.POST)) { + methodMB.setString(Constants.POST); + } + + MimeHeaders headers = request.getMimeHeaders(); + + // Check connection header + MessageBytes connectionValueMB = headers.getValue("connection"); + if (connectionValueMB != null) { + ByteChunk connectionValueBC = connectionValueMB.getByteChunk(); + if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) { + keepAlive = false; + } else if (findBytes(connectionValueBC, + Constants.KEEPALIVE_BYTES) != -1) { + keepAlive = true; + } + } + + MessageBytes expectMB = null; + if (http11) + expectMB = headers.getValue("expect"); + if ((expectMB != null) + && (expectMB.indexOfIgnoreCase("100-continue", 0) != -1)) { + inputBuffer.setSwallowInput(false); + expectation = true; + } + + // Check user-agent header + if ((restrictedUserAgents != null) && ((http11) || (keepAlive))) { + MessageBytes userAgentValueMB = headers.getValue("user-agent"); + // Check in the restricted list, and adjust the http11 + // and keepAlive flags accordingly + if(userAgentValueMB != null) { + String userAgentValue = userAgentValueMB.toString(); + for (int i = 0; i < restrictedUserAgents.length; i++) { + if (restrictedUserAgents[i].matcher(userAgentValue).matches()) { + http11 = false; + keepAlive = false; + break; + } + } + } + } + + // Check for a full URI (including protocol://host:port/) + ByteChunk uriBC = request.requestURI().getByteChunk(); + if (uriBC.startsWithIgnoreCase("http", 0)) { + + int pos = uriBC.indexOf("://", 0, 3, 4); + int uriBCStart = uriBC.getStart(); + int slashPos = -1; + if (pos != -1) { + byte[] uriB = uriBC.getBytes(); + slashPos = uriBC.indexOf('/', pos + 3); + if (slashPos == -1) { + slashPos = uriBC.getLength(); + // Set URI as "/" + request.requestURI().setBytes + (uriB, uriBCStart + pos + 1, 1); + } else { + request.requestURI().setBytes + (uriB, uriBCStart + slashPos, + uriBC.getLength() - slashPos); + } + MessageBytes hostMB = headers.setValue("host"); + hostMB.setBytes(uriB, uriBCStart + pos + 3, + slashPos - pos - 3); + } + + } + + // Input filter setup + InputFilter[] inputFilters = inputBuffer.getFilters(); + + // Parse transfer-encoding header + MessageBytes transferEncodingValueMB = null; + if (http11) + transferEncodingValueMB = headers.getValue("transfer-encoding"); + if (transferEncodingValueMB != null) { + String transferEncodingValue = transferEncodingValueMB.toString(); + // Parse the comma separated list. "identity" codings are ignored + int startPos = 0; + int commaPos = transferEncodingValue.indexOf(','); + String encodingName = null; + while (commaPos != -1) { + encodingName = transferEncodingValue.substring + (startPos, commaPos).toLowerCase().trim(); + if (!addInputFilter(inputFilters, encodingName)) { + // Unsupported transfer encoding + error = true; + // 501 - Unimplemented + response.setStatus(501); + } + startPos = commaPos + 1; + commaPos = transferEncodingValue.indexOf(',', startPos); + } + encodingName = transferEncodingValue.substring(startPos) + .toLowerCase().trim(); + if (!addInputFilter(inputFilters, encodingName)) { + // Unsupported transfer encoding + error = true; + // 501 - Unimplemented + response.setStatus(501); + } + } + + // Parse content-length header + long contentLength = request.getContentLengthLong(); + if (contentLength >= 0 && !contentDelimitation) { + inputBuffer.addActiveFilter + (inputFilters[Constants.IDENTITY_FILTER]); + contentDelimitation = true; + } + + MessageBytes valueMB = headers.getValue("host"); + + // Check host header + if (http11 && (valueMB == null)) { + error = true; + // 400 - Bad request + response.setStatus(400); + } + + parseHost(valueMB); + + if (!contentDelimitation) { + // If there's no content length + // (broken HTTP/1.0 or HTTP/1.1), assume + // the client is not broken and didn't send a body + inputBuffer.addActiveFilter + (inputFilters[Constants.VOID_FILTER]); + contentDelimitation = true; + } + + // Advertise sendfile support through a request attribute + if (endpoint.getUseSendfile()) { + request.setAttribute("org.apache.tomcat.sendfile.support", Boolean.FALSE); + } + // Advertise comet support through a request attribute + request.setAttribute("org.apache.tomcat.comet.support", Boolean.TRUE); + + } + + + /** + * Parse host. + */ + public void parseHost(MessageBytes valueMB) { + + if (valueMB == null || valueMB.isNull()) { + // HTTP/1.0 + // Default is what the socket tells us. Overriden if a host is + // found/parsed + request.setServerPort(endpoint.getPort()); + return; + } + + ByteChunk valueBC = valueMB.getByteChunk(); + byte[] valueB = valueBC.getBytes(); + int valueL = valueBC.getLength(); + int valueS = valueBC.getStart(); + int colonPos = -1; + if (hostNameC.length < valueL) { + hostNameC = new char[valueL]; + } + + boolean ipv6 = (valueB[valueS] == '['); + boolean bracketClosed = false; + for (int i = 0; i < valueL; i++) { + char b = (char) valueB[i + valueS]; + hostNameC[i] = b; + if (b == ']') { + bracketClosed = true; + } else if (b == ':') { + if (!ipv6 || bracketClosed) { + colonPos = i; + break; + } + } + } + + if (colonPos < 0) { + if (!ssl) { + // 80 - Default HTTP port + request.setServerPort(80); + } else { + // 443 - Default HTTPS port + request.setServerPort(443); + } + request.serverName().setChars(hostNameC, 0, valueL); + } else { + + request.serverName().setChars(hostNameC, 0, colonPos); + + int port = 0; + int mult = 1; + for (int i = valueL - 1; i > colonPos; i--) { + int charValue = HexUtils.DEC[(int) valueB[i + valueS]]; + if (charValue == -1) { + // Invalid character + error = true; + // 400 - Bad request + response.setStatus(400); + break; + } + port = port + (charValue * mult); + mult = 10 * mult; + } + request.setServerPort(port); + + } + + } + + + /** + * Check for compression + */ + private boolean isCompressable() { + + // Nope Compression could works in HTTP 1.0 also + // cf: mod_deflate + + // Compression only since HTTP 1.1 + // if (! http11) + // return false; + + // Check if browser support gzip encoding + MessageBytes acceptEncodingMB = + request.getMimeHeaders().getValue("accept-encoding"); + + if ((acceptEncodingMB == null) + || (acceptEncodingMB.indexOf("gzip") == -1)) + return false; + + // Check if content is not allready gzipped + MessageBytes contentEncodingMB = + response.getMimeHeaders().getValue("Content-Encoding"); + + if ((contentEncodingMB != null) + && (contentEncodingMB.indexOf("gzip") != -1)) + return false; + + // If force mode, allways compress (test purposes only) + if (compressionLevel == 2) + return true; + + // Check for incompatible Browser + if (noCompressionUserAgents != null) { + MessageBytes userAgentValueMB = + request.getMimeHeaders().getValue("user-agent"); + if(userAgentValueMB != null) { + String userAgentValue = userAgentValueMB.toString(); + + // If one Regexp rule match, disable compression + for (int i = 0; i < noCompressionUserAgents.length; i++) + if (noCompressionUserAgents[i].matcher(userAgentValue).matches()) + return false; + } + } + + // Check if suffisant len to trig the compression + long contentLength = response.getContentLengthLong(); + if ((contentLength == -1) + || (contentLength > compressionMinSize)) { + // Check for compatible MIME-TYPE + if (compressableMimeTypes != null) { + return (startsWithStringArray(compressableMimeTypes, + response.getContentType())); + } + } + + return false; + } + + + /** + * When committing the response, we have to validate the set of headers, as + * well as setup the response filters. + */ + protected void prepareResponse() { + + boolean entityBody = true; + contentDelimitation = false; + + OutputFilter[] outputFilters = outputBuffer.getFilters(); + + if (http09 == true) { + // HTTP/0.9 + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + return; + } + + int statusCode = response.getStatus(); + if ((statusCode == 204) || (statusCode == 205) + || (statusCode == 304)) { + // No entity body + outputBuffer.addActiveFilter + (outputFilters[Constants.VOID_FILTER]); + entityBody = false; + contentDelimitation = true; + } + + MessageBytes methodMB = request.method(); + if (methodMB.equals("HEAD")) { + // No entity body + outputBuffer.addActiveFilter + (outputFilters[Constants.VOID_FILTER]); + contentDelimitation = true; + } + + // Sendfile support + if (endpoint.getUseSendfile()) { + String fileName = (String) request.getAttribute("org.apache.tomcat.sendfile.filename"); + if (fileName != null) { + // No entity body sent here + outputBuffer.addActiveFilter + (outputFilters[Constants.VOID_FILTER]); + contentDelimitation = true; + sendfileData = new NioEndpoint.SendfileData(); + sendfileData.fileName = fileName; + sendfileData.start = + ((Long) request.getAttribute("org.apache.tomcat.sendfile.start")).longValue(); + sendfileData.end = + ((Long) request.getAttribute("org.apache.tomcat.sendfile.end")).longValue(); + } + } + + // Check for compression + boolean useCompression = false; + if (entityBody && (compressionLevel > 0) && (sendfileData == null)) { + useCompression = isCompressable(); + // Change content-length to -1 to force chunking + if (useCompression) { + response.setContentLength(-1); + } + } + + MimeHeaders headers = response.getMimeHeaders(); + if (!entityBody) { + response.setContentLength(-1); + } else { + String contentType = response.getContentType(); + if (contentType != null) { + headers.setValue("Content-Type").setString(contentType); + } + String contentLanguage = response.getContentLanguage(); + if (contentLanguage != null) { + headers.setValue("Content-Language") + .setString(contentLanguage); + } + } + + long contentLength = response.getContentLengthLong(); + if (contentLength != -1) { + headers.setValue("Content-Length").setLong(contentLength); + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + contentDelimitation = true; + } else { + if (entityBody && http11 && keepAlive) { + outputBuffer.addActiveFilter + (outputFilters[Constants.CHUNKED_FILTER]); + contentDelimitation = true; + headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED); + } else { + outputBuffer.addActiveFilter + (outputFilters[Constants.IDENTITY_FILTER]); + } + } + + if (useCompression) { + outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]); + headers.setValue("Content-Encoding").setString("gzip"); + // Make Proxies happy via Vary (from mod_deflate) + headers.setValue("Vary").setString("Accept-Encoding"); + } + + // Add date header + headers.setValue("Date").setString(FastHttpDateFormat.getCurrentDate()); + + // FIXME: Add transfer encoding header + + if ((entityBody) && (!contentDelimitation)) { + // Mark as close the connection after the request, and add the + // connection: close header + keepAlive = false; + } + + // If we know that the request is bad this early, add the + // Connection: close header. + keepAlive = keepAlive && !statusDropsConnection(statusCode); + if (!keepAlive) { + headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE); + } else if (!http11 && !error) { + headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE); + } + + // Build the response header + outputBuffer.sendStatus(); + + // Add server header + if (server != null) { + headers.setValue("Server").setString(server); + } else { + outputBuffer.write(Constants.SERVER_BYTES); + } + + int size = headers.size(); + for (int i = 0; i < size; i++) { + outputBuffer.sendHeader(headers.getName(i), headers.getValue(i)); + } + outputBuffer.endHeaders(); + + } + + + /** + * Initialize standard input and output filters. + */ + protected void initializeFilters() { + + // Create and add the identity filters. + inputBuffer.addFilter(new IdentityInputFilter()); + outputBuffer.addFilter(new IdentityOutputFilter()); + + // Create and add the chunked filters. + inputBuffer.addFilter(new ChunkedInputFilter()); + outputBuffer.addFilter(new ChunkedOutputFilter()); + + // Create and add the void filters. + inputBuffer.addFilter(new VoidInputFilter()); + outputBuffer.addFilter(new VoidOutputFilter()); + + // Create and add buffered input filter + inputBuffer.addFilter(new BufferedInputFilter()); + + // Create and add the chunked filters. + //inputBuffer.addFilter(new GzipInputFilter()); + outputBuffer.addFilter(new GzipOutputFilter()); + + } + + + /** + * Add an input filter to the current request. + * + * @return false if the encoding was not found (which would mean it is + * unsupported) + */ + protected boolean addInputFilter(InputFilter[] inputFilters, + String encodingName) { + if (encodingName.equals("identity")) { + // Skip + } else if (encodingName.equals("chunked")) { + inputBuffer.addActiveFilter + (inputFilters[Constants.CHUNKED_FILTER]); + contentDelimitation = true; + } else { + for (int i = 2; i < inputFilters.length; i++) { + if (inputFilters[i].getEncodingName() + .toString().equals(encodingName)) { + inputBuffer.addActiveFilter(inputFilters[i]); + return true; + } + } + return false; + } + return true; + } + + + /** + * Specialized utility method: find a sequence of lower case bytes inside + * a ByteChunk. + */ + protected int findBytes(ByteChunk bc, byte[] b) { + + byte first = b[0]; + byte[] buff = bc.getBuffer(); + int start = bc.getStart(); + int end = bc.getEnd(); + + // Look for first char + int srcEnd = b.length; + + for (int i = start; i <= (end - srcEnd); i++) { + if (Ascii.toLower(buff[i]) != first) continue; + // found first char, now look for a match + int myPos = i+1; + for (int srcPos = 1; srcPos < srcEnd; ) { + if (Ascii.toLower(buff[myPos++]) != b[srcPos++]) + break; + if (srcPos == srcEnd) return i - start; // found it + } + } + return -1; + + } + + /** + * Determine if we must drop the connection because of the HTTP status + * code. Use the same list of codes as Apache/httpd. + */ + protected boolean statusDropsConnection(int status) { + return status == 400 /* SC_BAD_REQUEST */ || + status == 408 /* SC_REQUEST_TIMEOUT */ || + status == 411 /* SC_LENGTH_REQUIRED */ || + status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ || + status == 414 /* SC_REQUEST_URI_TOO_LARGE */ || + status == 500 /* SC_INTERNAL_SERVER_ERROR */ || + status == 503 /* SC_SERVICE_UNAVAILABLE */ || + status == 501 /* SC_NOT_IMPLEMENTED */; + } + +} diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java b/java/org/apache/coyote/http11/Http11NioProtocol.java new file mode 100644 index 000000000..b0d105b47 --- /dev/null +++ b/java/org/apache/coyote/http11/Http11NioProtocol.java @@ -0,0 +1,775 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.coyote.http11; + +import java.net.InetAddress; +import java.net.URLEncoder; +import java.nio.channels.SocketChannel; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import javax.management.MBeanRegistration; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.coyote.ActionCode; +import org.apache.coyote.ActionHook; +import org.apache.coyote.Adapter; +import org.apache.coyote.ProtocolHandler; +import org.apache.coyote.RequestGroupInfo; +import org.apache.coyote.RequestInfo; +import org.apache.tomcat.util.modeler.Registry; +import org.apache.tomcat.util.net.NioEndpoint; +import org.apache.tomcat.util.net.NioEndpoint.Handler; +import org.apache.tomcat.util.res.StringManager; + + +/** + * Abstract the protocol implementation, including threading, etc. + * Processor is single threaded and specific to stream-based protocols, + * will not fit Jk protocols like JNI. + * + * @author Remy Maucherat + * @author Costin Manolache + * @author Filip Hanik + */ +public class Http11NioProtocol implements ProtocolHandler, MBeanRegistration +{ + public Http11NioProtocol() { + cHandler = new Http11ConnectionHandler( this ); + setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); + setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); + //setServerSoTimeout(Constants.DEFAULT_SERVER_SOCKET_TIMEOUT); + setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); + } + + /** + * The string manager for this package. + */ + protected static StringManager sm = + StringManager.getManager(Constants.Package); + + /** Pass config info + */ + public void setAttribute( String name, Object value ) { + if( log.isTraceEnabled()) + log.trace(sm.getString("http11protocol.setattribute", name, value)); + + attributes.put(name, value); + } + + public Object getAttribute( String key ) { + if( log.isTraceEnabled()) + log.trace(sm.getString("http11protocol.getattribute", key)); + return attributes.get(key); + } + + public Iterator getAttributeNames() { + return attributes.keySet().iterator(); + } + + /** + * Set a property. + */ + public void setProperty(String name, String value) { + setAttribute(name, value); + } + + /** + * Get a property + */ + public String getProperty(String name) { + return (String)getAttribute(name); + } + + /** The adapter, used to call the connector + */ + public void setAdapter(Adapter adapter) { + this.adapter=adapter; + } + + public Adapter getAdapter() { + return adapter; + } + + + /** Start the protocol + */ + public void init() throws Exception { + ep.setName(getName()); + ep.setHandler(cHandler); + + try { + ep.init(); + } catch (Exception ex) { + log.error(sm.getString("http11protocol.endpoint.initerror"), ex); + throw ex; + } + if(log.isInfoEnabled()) + log.info(sm.getString("http11protocol.init", getName())); + + } + + ObjectName tpOname; + ObjectName rgOname; + + public void start() throws Exception { + if( this.domain != null ) { + try { + tpOname=new ObjectName + (domain + ":" + "type=ThreadPool,name=" + getName()); + Registry.getRegistry(null, null) + .registerComponent(ep, tpOname, null ); + } catch (Exception e) { + log.error("Can't register threadpool" ); + } + rgOname=new ObjectName + (domain + ":type=GlobalRequestProcessor,name=" + getName()); + Registry.getRegistry(null, null).registerComponent + ( cHandler.global, rgOname, null ); + } + + try { + ep.start(); + } catch (Exception ex) { + log.error(sm.getString("http11protocol.endpoint.starterror"), ex); + throw ex; + } + if(log.isInfoEnabled()) + log.info(sm.getString("http11protocol.start", getName())); + } + + public void pause() throws Exception { + try { + ep.pause(); + } catch (Exception ex) { + log.error(sm.getString("http11protocol.endpoint.pauseerror"), ex); + throw ex; + } + if(log.isInfoEnabled()) + log.info(sm.getString("http11protocol.pause", getName())); + } + + public void resume() throws Exception { + try { + ep.resume(); + } catch (Exception ex) { + log.error(sm.getString("http11protocol.endpoint.resumeerror"), ex); + throw ex; + } + if(log.isInfoEnabled()) + log.info(sm.getString("http11protocol.resume", getName())); + } + + public void destroy() throws Exception { + if(log.isInfoEnabled()) + log.info(sm.getString("http11protocol.stop", getName())); + ep.destroy(); + if( tpOname!=null ) + Registry.getRegistry(null, null).unregisterComponent(tpOname); + if( rgOname != null ) + Registry.getRegistry(null, null).unregisterComponent(rgOname); + } + + // -------------------- Properties-------------------- + protected NioEndpoint ep=new NioEndpoint(); + protected boolean secure; + + protected Hashtable attributes = new Hashtable(); + + private int maxKeepAliveRequests=100; // as in Apache HTTPD server + private int timeout = 300000; // 5 minutes as in Apache HTTPD server + private int maxSavePostSize = 4 * 1024; + private int maxHttpHeaderSize = 8 * 1024; + private int socketCloseDelay=-1; + private boolean disableUploadTimeout = true; + private int socketBuffer = 9000; + private Adapter adapter; + private Http11ConnectionHandler cHandler; + + /** + * Compression value. + */ + private String compression = "off"; + private String noCompressionUserAgents = null; + private String restrictedUserAgents = null; + private String compressableMimeTypes = "text/html,text/xml,text/plain"; + private int compressionMinSize = 2048; + + private String server; + + // -------------------- Pool setup -------------------- + + // * + public Executor getExecutor() { + return ep.getExecutor(); + } + + // * + public void setExecutor(Executor executor) { + ep.setExecutor(executor); + } + + public int getMaxThreads() { + return ep.getMaxThreads(); + } + + public void setMaxThreads( int maxThreads ) { + ep.setMaxThreads(maxThreads); + setAttribute("maxThreads", "" + maxThreads); + } + + public void setThreadPriority(int threadPriority) { + ep.setThreadPriority(threadPriority); + setAttribute("threadPriority", "" + threadPriority); + } + + public int getThreadPriority() { + return ep.getThreadPriority(); + } + + // -------------------- Tcp setup -------------------- + + public int getBacklog() { + return ep.getBacklog(); + } + + public void setBacklog( int i ) { + ep.setBacklog(i); + setAttribute("backlog", "" + i); + } + + public int getPort() { + return ep.getPort(); + } + + public void setPort( int port ) { + ep.setPort(port); + setAttribute("port", "" + port); + } + + public int getFirstReadTimeout() { + return ep.getFirstReadTimeout(); + } + + public void setFirstReadTimeout( int i ) { + ep.setFirstReadTimeout(i); + setAttribute("firstReadTimeout", "" + i); + } + + public int getPollTime() { + return ep.getPollTime(); + } + + public void setPollTime( int i ) { + ep.setPollTime(i); + setAttribute("pollTime", "" + i); + } + + public void setPollerSize(int i) { + ep.setPollerSize(i); + setAttribute("pollerSize", "" + i); + } + + public int getPollerSize() { + return ep.getPollerSize(); + } + + public void setSendfileSize(int i) { + ep.setSendfileSize(i); + setAttribute("sendfileSize", "" + i); + } + + public int getSendfileSize() { + return ep.getSendfileSize(); + } + + public boolean getUseSendfile() { + return ep.getUseSendfile(); + } + + public void setUseSendfile(boolean useSendfile) { + ep.setUseSendfile(useSendfile); + } + + public InetAddress getAddress() { + return ep.getAddress(); + } + + public void setAddress(InetAddress ia) { + ep.setAddress( ia ); + setAttribute("address", "" + ia); + } + + public String getName() { + String encodedAddr = ""; + if (getAddress() != null) { + encodedAddr = "" + getAddress(); + if (encodedAddr.startsWith("/")) + encodedAddr = encodedAddr.substring(1); + encodedAddr = URLEncoder.encode(encodedAddr) + "-"; + } + return ("http-" + encodedAddr + ep.getPort()); + } + + public boolean getTcpNoDelay() { + return ep.getTcpNoDelay(); + } + + public void setTcpNoDelay( boolean b ) { + ep.setTcpNoDelay( b ); + setAttribute("tcpNoDelay", "" + b); + } + + public boolean getDisableUploadTimeout() { + return disableUploadTimeout; + } + + public void setDisableUploadTimeout(boolean isDisabled) { + disableUploadTimeout = isDisabled; + } + + public int getSocketBuffer() { + return socketBuffer; + } + + public void setSocketBuffer(int valueI) { + socketBuffer = valueI; + } + + public String getCompression() { + return compression; + } + + public void setCompression(String valueS) { + compression = valueS; + setAttribute("compression", valueS); + } + + public int getMaxSavePostSize() { + return maxSavePostSize; + } + + public void setMaxSavePostSize(int valueI) { + maxSavePostSize = valueI; + setAttribute("maxSavePostSize", "" + valueI); + } + + public int getMaxHttpHeaderSize() { + return maxHttpHeaderSize; + } + + public void setMaxHttpHeaderSize(int valueI) { + maxHttpHeaderSize = valueI; + setAttribute("maxHttpHeaderSize", "" + valueI); + } + + public String getRestrictedUserAgents() { + return restrictedUserAgents; + } + + public void setRestrictedUserAgents(String valueS) { + restrictedUserAgents = valueS; + setAttribute("restrictedUserAgents", valueS); + } + + public String getNoCompressionUserAgents() { + return noCompressionUserAgents; + } + + public void setNoCompressionUserAgents(String valueS) { + noCompressionUserAgents = valueS; + setAttribute("noCompressionUserAgents", valueS); + } + + public String getCompressableMimeType() { + return compressableMimeTypes; + } + + public void setCompressableMimeType(String valueS) { + compressableMimeTypes = valueS; + setAttribute("compressableMimeTypes", valueS); + } + + public int getCompressionMinSize() { + return compressionMinSize; + } + + public void setCompressionMinSize(int valueI) { + compressionMinSize = valueI; + setAttribute("compressionMinSize", "" + valueI); + } + + public int getSoLinger() { + return ep.getSoLinger(); + } + + public void setSoLinger( int i ) { + ep.setSoLinger( i ); + setAttribute("soLinger", "" + i); + } + + public int getSoTimeout() { + return ep.getSoTimeout(); + } + + public void setSoTimeout( int i ) { + ep.setSoTimeout(i); + setAttribute("soTimeout", "" + i); + } + + public String getProtocol() { + return getProperty("protocol"); + } + + public void setProtocol( String k ) { + setSecure(true); + setAttribute("protocol", k); + } + + public boolean getSecure() { + return secure; + } + + public void setSecure( boolean b ) { + secure=b; + setAttribute("secure", "" + b); + } + + public int getMaxKeepAliveRequests() { + return maxKeepAliveRequests; + } + + /** Set the maximum number of Keep-Alive requests that we will honor. + */ + public void setMaxKeepAliveRequests(int mkar) { + maxKeepAliveRequests = mkar; + setAttribute("maxKeepAliveRequests", "" + mkar); + } + + /** + * Return the Keep-Alive policy for the connection. + */ + public boolean getKeepAlive() { + return ((maxKeepAliveRequests != 0) && (maxKeepAliveRequests != 1)); + } + + /** + * Set the keep-alive policy for this connection. + */ + public void setKeepAlive(boolean keepAlive) { + if (!keepAlive) { + setMaxKeepAliveRequests(1); + } + } + + public int getSocketCloseDelay() { + return socketCloseDelay; + } + + public void setSocketCloseDelay( int d ) { + socketCloseDelay=d; + setAttribute("socketCloseDelay", "" + d); + } + + public void setServer( String server ) { + this.server = server; + } + + public String getServer() { + return server; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout( int timeouts ) { + timeout = timeouts; + setAttribute("timeout", "" + timeouts); + } + + // -------------------- SSL related properties -------------------- + + /** + * SSL engine. + */ + public String getSSLEngine() { return ep.getSSLEngine(); } + public void setSSLEngine(String SSLEngine) { ep.setSSLEngine(SSLEngine); } + + + /** + * SSL protocol. + */ + public String getSSLProtocol() { return ep.getSSLProtocol(); } + public void setSSLProtocol(String SSLProtocol) { ep.setSSLProtocol(SSLProtocol); } + + + /** + * SSL password (if a cert is encrypted, and no password has been provided, a callback + * will ask for a password). + */ + public String getSSLPassword() { return ep.getSSLPassword(); } + public void setSSLPassword(String SSLPassword) { ep.setSSLPassword(SSLPassword); } + + + /** + * SSL cipher suite. + */ + public String getSSLCipherSuite() { return ep.getSSLCipherSuite(); } + public void setSSLCipherSuite(String SSLCipherSuite) { ep.setSSLCipherSuite(SSLCipherSuite); } + + + /** + * SSL certificate file. + */ + public String getSSLCertificateFile() { return ep.getSSLCertificateFile(); } + public void setSSLCertificateFile(String SSLCertificateFile) { ep.setSSLCertificateFile(SSLCertificateFile); } + + + /** + * SSL certificate key file. + */ + public String getSSLCertificateKeyFile() { return ep.getSSLCertificateKeyFile(); } + public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { ep.setSSLCertificateKeyFile(SSLCertificateKeyFile); } + + + /** + * SSL certificate chain file. + */ + public String getSSLCertificateChainFile() { return ep.getSSLCertificateChainFile(); } + public void setSSLCertificateChainFile(String SSLCertificateChainFile) { ep.setSSLCertificateChainFile(SSLCertificateChainFile); } + + + /** + * SSL CA certificate path. + */ + public String getSSLCACertificatePath() { return ep.getSSLCACertificatePath(); } + public void setSSLCACertificatePath(String SSLCACertificatePath) { ep.setSSLCACertificatePath(SSLCACertificatePath); } + + + /** + * SSL CA certificate file. + */ + public String getSSLCACertificateFile() { return ep.getSSLCACertificateFile(); } + public void setSSLCACertificateFile(String SSLCACertificateFile) { ep.setSSLCACertificateFile(SSLCACertificateFile); } + + + /** + * SSL CA revocation path. + */ + public String getSSLCARevocationPath() { return ep.getSSLCARevocationPath(); } + public void setSSLCARevocationPath(String SSLCARevocationPath) { ep.setSSLCARevocationPath(SSLCARevocationPath); } + + + /** + * SSL CA revocation file. + */ + public String getSSLCARevocationFile() { return ep.getSSLCARevocationFile(); } + public void setSSLCARevocationFile(String SSLCARevocationFile) { ep.setSSLCARevocationFile(SSLCARevocationFile); } + + + /** + * SSL verify client. + */ + public String getSSLVerifyClient() { return ep.getSSLVerifyClient(); } + public void setSSLVerifyClient(String SSLVerifyClient) { ep.setSSLVerifyClient(SSLVerifyClient); } + + + /** + * SSL verify depth. + */ + public int getSSLVerifyDepth() { return ep.getSSLVerifyDepth(); } + public void setSSLVerifyDepth(int SSLVerifyDepth) { ep.setSSLVerifyDepth(SSLVerifyDepth); } + + // -------------------- Connection handler -------------------- + + static class Http11ConnectionHandler implements Handler { + + protected Http11NioProtocol proto; + protected static int count = 0; + protected RequestGroupInfo global = new RequestGroupInfo(); + + protected ThreadLocal localProcessor = + new ThreadLocal(); + protected ConcurrentHashMap connections = + new ConcurrentHashMap(); + protected java.util.Stack recycledProcessors = + new java.util.Stack(); + + Http11ConnectionHandler(Http11NioProtocol proto) { + this.proto = proto; + } + + public SocketState event(SocketChannel socket, boolean error) { + Http11NioProcessor result = connections.get(socket); + + SocketState state = SocketState.CLOSED; + if (result != null) { + boolean recycle = error; + // Call the appropriate event + try { + state = result.event(error); + } catch (java.net.SocketException e) { + // SocketExceptions are normal + Http11NioProtocol.log.debug + (sm.getString + ("http11protocol.proto.socketexception.debug"), e); + } catch (java.io.IOException e) { + // IOExceptions are normal + Http11NioProtocol.log.debug + (sm.getString + ("http11protocol.proto.ioexception.debug"), e); + } + // Future developers: if you discover any other + // rare-but-nonfatal exceptions, catch them here, and log as + // above. + catch (Throwable e) { + // any other exception or error is odd. Here we log it + // with "ERROR" level, so it will show up even on + // less-than-verbose logs. + Http11NioProtocol.log.error + (sm.getString("http11protocol.proto.error"), e); + } finally { + if (state != SocketState.LONG) { + connections.remove(socket); + recycledProcessors.push(result); + } + } + } + return state; + } + + public SocketState process(SocketChannel socket) { + Http11NioProcessor processor = null; + try { + processor = (Http11NioProcessor) localProcessor.get(); + if (processor == null) { + synchronized (recycledProcessors) { + if (!recycledProcessors.isEmpty()) { + processor = recycledProcessors.pop(); + localProcessor.set(processor); + } + } + } + if (processor == null) { + processor = + new Http11NioProcessor(proto.maxHttpHeaderSize, proto.ep); + processor.setAdapter(proto.adapter); + processor.setMaxKeepAliveRequests(proto.maxKeepAliveRequests); + processor.setTimeout(proto.timeout); + processor.setDisableUploadTimeout(proto.disableUploadTimeout); + processor.setCompression(proto.compression); + processor.setCompressionMinSize(proto.compressionMinSize); + processor.setNoCompressionUserAgents(proto.noCompressionUserAgents); + processor.setCompressableMimeTypes(proto.compressableMimeTypes); + processor.setRestrictedUserAgents(proto.restrictedUserAgents); + processor.setSocketBuffer(proto.socketBuffer); + processor.setMaxSavePostSize(proto.maxSavePostSize); + processor.setServer(proto.server); + localProcessor.set(processor); + if (proto.getDomain() != null) { + synchronized (this) { + try { + RequestInfo rp = processor.getRequest().getRequestProcessor(); + rp.setGlobalProcessor(global); + ObjectName rpName = new ObjectName + (proto.getDomain() + ":type=RequestProcessor,worker=" + + proto.getName() + ",name=HttpRequest" + count++); + Registry.getRegistry(null, null).registerComponent(rp, rpName, null); + } catch (Exception e) { + log.warn("Error registering request"); + } + } + } + } + + if (processor instanceof ActionHook) { + ((ActionHook) processor).action(ActionCode.ACTION_START, null); + } + + SocketState state = processor.process(socket); + if (state == SocketState.LONG) { + // Associate the connection with the processor. The next request + // processed by this thread will use either a new or a recycled + // processor. + connections.put(socket, processor); + localProcessor.set(null); + proto.ep.getCometPoller().add(socket); + } + return state; + + } catch (java.net.SocketException e) { + // SocketExceptions are normal + Http11NioProtocol.log.debug + (sm.getString + ("http11protocol.proto.socketexception.debug"), e); + } catch (java.io.IOException e) { + // IOExceptions are normal + Http11NioProtocol.log.debug + (sm.getString + ("http11protocol.proto.ioexception.debug"), e); + } + // Future developers: if you discover any other + // rare-but-nonfatal exceptions, catch them here, and log as + // above. + catch (Throwable e) { + // any other exception or error is odd. Here we log it + // with "ERROR" level, so it will show up even on + // less-than-verbose logs. + Http11NioProtocol.log.error + (sm.getString("http11protocol.proto.error"), e); + } + return SocketState.CLOSED; + } + } + + protected static org.apache.commons.logging.Log log + = org.apache.commons.logging.LogFactory.getLog(Http11NioProtocol.class); + + // -------------------- Various implementation classes -------------------- + + protected String domain; + protected ObjectName oname; + protected MBeanServer mserver; + + public ObjectName getObjectName() { + return oname; + } + + public String getDomain() { + return domain; + } + + public ObjectName preRegister(MBeanServer server, + ObjectName name) throws Exception { + oname=name; + mserver=server; + domain=name.getDomain(); + return name; + } + + public void postRegister(Boolean registrationDone) { + } + + public void preDeregister() throws Exception { + } + + public void postDeregister() { + } +} diff --git a/java/org/apache/coyote/http11/InternalNioInputBuffer.java b/java/org/apache/coyote/http11/InternalNioInputBuffer.java new file mode 100644 index 000000000..2f229efbd --- /dev/null +++ b/java/org/apache/coyote/http11/InternalNioInputBuffer.java @@ -0,0 +1,827 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.coyote.http11; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.coyote.InputBuffer; +import org.apache.coyote.Request; +import org.apache.tomcat.util.buf.ByteChunk; +import org.apache.tomcat.util.buf.MessageBytes; +import org.apache.tomcat.util.http.MimeHeaders; +import org.apache.tomcat.util.res.StringManager; + +/** + * Implementation of InputBuffer which provides HTTP request header parsing as + * well as transfer decoding. + * + * @author Remy Maucherat + * @author Filip Hanik + */ +public class InternalNioInputBuffer implements InputBuffer { + + + // -------------------------------------------------------------- Constants + + + // ----------------------------------------------------------- Constructors + + + /** + * Alternate constructor. + */ + public InternalNioInputBuffer(Request request, int headerBufferSize, + long readTimeout) { + + this.request = request; + headers = request.getMimeHeaders(); + + buf = new byte[headerBufferSize]; + if (headerBufferSize < (8 * 1024)) { + bbuf = ByteBuffer.allocateDirect(6 * 1500); + } else { + bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500); + } + + inputStreamInputBuffer = new SocketInputBuffer(); + + filterLibrary = new InputFilter[0]; + activeFilters = new InputFilter[0]; + lastActiveFilter = -1; + + parsingHeader = true; + swallowInput = true; + + if (readTimeout < 0) { + this.readTimeout = -1; + } else { + this.readTimeout = readTimeout; + } + + } + + + // -------------------------------------------------------------- Variables + + + /** + * The string manager for this package. + */ + protected static StringManager sm = + StringManager.getManager(Constants.Package); + + + // ----------------------------------------------------- Instance Variables + + + /** + * Associated Coyote request. + */ + protected Request request; + + + /** + * Headers of the associated request. + */ + protected MimeHeaders headers; + + + /** + * State. + */ + protected boolean parsingHeader; + + + /** + * Swallow input ? (in the case of an expectation) + */ + protected boolean swallowInput; + + + /** + * Pointer to the current read buffer. + */ + protected byte[] buf; + + + /** + * Last valid byte. + */ + protected int lastValid; + + + /** + * Position in the buffer. + */ + protected int pos; + + + /** + * Pos of the end of the header in the buffer, which is also the + * start of the body. + */ + protected int end; + + + /** + * Direct byte buffer used to perform actual reading. + */ + protected ByteBuffer bbuf; + + + /** + * Underlying socket. + */ + protected SocketChannel socket; + + + /** + * Underlying input buffer. + */ + protected InputBuffer inputStreamInputBuffer; + + + /** + * Filter library. + * Note: Filter[0] is always the "chunked" filter. + */ + protected InputFilter[] filterLibrary; + + + /** + * Active filters (in order). + */ + protected InputFilter[] activeFilters; + + + /** + * Index of the last active filter. + */ + protected int lastActiveFilter; + + + /** + * The socket timeout used when reading the first block of the request + * header. + */ + protected long readTimeout; + + + // ------------------------------------------------------------- Properties + + + /** + * Set the underlying socket. + */ + public void setSocket(SocketChannel socket) { + this.socket = socket; + } + + + /** + * Get the underlying socket input stream. + */ + public SocketChannel getSocket() { + return socket; + } + + + /** + * Add an input filter to the filter library. + */ + public void addFilter(InputFilter filter) { + + InputFilter[] newFilterLibrary = + new InputFilter[filterLibrary.length + 1]; + for (int i = 0; i < filterLibrary.length; i++) { + newFilterLibrary[i] = filterLibrary[i]; + } + newFilterLibrary[filterLibrary.length] = filter; + filterLibrary = newFilterLibrary; + + activeFilters = new InputFilter[filterLibrary.length]; + + } + + + /** + * Get filters. + */ + public InputFilter[] getFilters() { + + return filterLibrary; + + } + + + /** + * Clear filters. + */ + public void clearFilters() { + + filterLibrary = new InputFilter[0]; + lastActiveFilter = -1; + + } + + + /** + * Add an input filter to the filter library. + */ + public void addActiveFilter(InputFilter filter) { + + if (lastActiveFilter == -1) { + filter.setBuffer(inputStreamInputBuffer); + } else { + for (int i = 0; i <= lastActiveFilter; i++) { + if (activeFilters[i] == filter) + return; + } + filter.setBuffer(activeFilters[lastActiveFilter]); + } + + activeFilters[++lastActiveFilter] = filter; + + filter.setRequest(request); + + } + + + /** + * Set the swallow input flag. + */ + public void setSwallowInput(boolean swallowInput) { + this.swallowInput = swallowInput; + } + + + // --------------------------------------------------------- Public Methods + + + /** + * Recycle the input buffer. This should be called when closing the + * connection. + */ + public void recycle() { + + // Recycle Request object + request.recycle(); + + socket = null; + lastValid = 0; + pos = 0; + lastActiveFilter = -1; + parsingHeader = true; + swallowInput = true; + + } + + + /** + * End processing of current HTTP request. + * Note: All bytes of the current request should have been already + * consumed. This method only resets all the pointers so that we are ready + * to parse the next HTTP request. + */ + public void nextRequest() { + + // Recycle Request object + request.recycle(); + + //System.out.println("LV-pos: " + (lastValid - pos)); + // Copy leftover bytes to the beginning of the buffer + if (lastValid - pos > 0) { + int npos = 0; + int opos = pos; + while (lastValid - opos > opos - npos) { + System.arraycopy(buf, opos, buf, npos, opos - npos); + npos += pos; + opos += pos; + } + System.arraycopy(buf, opos, buf, npos, lastValid - opos); + } + + // Recycle filters + for (int i = 0; i <= lastActiveFilter; i++) { + activeFilters[i].recycle(); + } + + // Reset pointers + lastValid = lastValid - pos; + pos = 0; + lastActiveFilter = -1; + parsingHeader = true; + swallowInput = true; + + } + + + /** + * End request (consumes leftover bytes). + * + * @throws IOException an undelying I/O error occured + */ + public void endRequest() + throws IOException { + + if (swallowInput && (lastActiveFilter != -1)) { + int extraBytes = (int) activeFilters[lastActiveFilter].end(); + pos = pos - extraBytes; + } + + } + + + /** + * Read the request line. This function is meant to be used during the + * HTTP request header parsing. Do NOT attempt to read the request body + * using it. + * + * @throws IOException If an exception occurs during the underlying socket + * read operations, or if the given buffer is not big enough to accomodate + * the whole line. + * @return true if data is properly fed; false if no data is available + * immediately and thread should be freed + */ + public boolean parseRequestLine(boolean useAvailableData) + throws IOException { + + int start = 0; + + // + // Skipping blank lines + // + + byte chr = 0; + do { + + // Read new bytes if needed + if (pos >= lastValid) { + if (useAvailableData) { + return false; + } + if (readTimeout == -1) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } else { + // Do a simple read with a short timeout + if ( !readSocket(true) ) return false; + } + } + + chr = buf[pos++]; + + } while ((chr == Constants.CR) || (chr == Constants.LF)); + + pos--; + + // Mark the current buffer position + start = pos; + + if (pos >= lastValid) { + if (useAvailableData) { + return false; + } + if (readTimeout == -1) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } else { + // Do a simple read with a short timeout + if ( !readSocket(true) ) return false; + } + } + + // + // Reading the method name + // Method name is always US-ASCII + // + + boolean space = false; + + while (!space) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if (buf[pos] == Constants.SP) { + space = true; + request.method().setBytes(buf, start, pos - start); + } + + pos++; + + } + + // Mark the current buffer position + start = pos; + int end = 0; + int questionPos = -1; + + // + // Reading the URI + // + + space = false; + boolean eol = false; + + while (!space) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if (buf[pos] == Constants.SP) { + space = true; + end = pos; + } else if ((buf[pos] == Constants.CR) + || (buf[pos] == Constants.LF)) { + // HTTP/0.9 style request + eol = true; + space = true; + end = pos; + } else if ((buf[pos] == Constants.QUESTION) + && (questionPos == -1)) { + questionPos = pos; + } + + pos++; + + } + + request.unparsedURI().setBytes(buf, start, end - start); + if (questionPos >= 0) { + request.queryString().setBytes(buf, questionPos + 1, + end - questionPos - 1); + request.requestURI().setBytes(buf, start, questionPos - start); + } else { + request.requestURI().setBytes(buf, start, end - start); + } + + // Mark the current buffer position + start = pos; + end = 0; + + // + // Reading the protocol + // Protocol is always US-ASCII + // + + while (!eol) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if (buf[pos] == Constants.CR) { + end = pos; + } else if (buf[pos] == Constants.LF) { + if (end == 0) + end = pos; + eol = true; + } + + pos++; + + } + + if ((end - start) > 0) { + request.protocol().setBytes(buf, start, end - start); + } else { + request.protocol().setString(""); + } + + return true; + + } + + private void expand(int newsize) { + if ( newsize > buf.length ) { + byte[] tmp = new byte[newsize]; + System.arraycopy(buf,0,tmp,0,buf.length); + buf = tmp; + tmp = null; + } + } + /** + * Perform blocking read with a timeout if desired + * @param timeout boolean - set to true if the system will time out + * @return boolean - true if data was read, false is EOF is reached + * @throws IOException + */ + private boolean readSocket(boolean timeout) throws IOException { + int nRead = 0; + long start = System.currentTimeMillis(); + boolean timedOut = false; + do { + bbuf.clear(); + nRead = socket.read(bbuf); + if (nRead > 0) { + bbuf.flip(); + bbuf.limit(nRead); + expand(nRead + pos); + bbuf.get(buf, pos, nRead); + lastValid = pos + nRead; + return true; + } else if (nRead == -1) { + return false; + } + timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>this.readTimeout); + if ( !timedOut && nRead == 0 ) try {Thread.sleep(25);}catch ( Exception x ) {} + }while ( nRead == 0 && (!timedOut) ); + //else throw new IOException(sm.getString("iib.failedread")); + return false; //timeout + } + + + /** + * Parse the HTTP headers. + */ + public void parseHeaders() + throws IOException { + + while (parseHeader()) { + } + + parsingHeader = false; + end = pos; + + } + + + /** + * Parse an HTTP header. + * + * @return false after reading a blank line (which indicates that the + * HTTP header parsing is done + */ + public boolean parseHeader() + throws IOException { + + // + // Check for blank line + // + + byte chr = 0; + while (true) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + chr = buf[pos]; + + if ((chr == Constants.CR) || (chr == Constants.LF)) { + if (chr == Constants.LF) { + pos++; + return false; + } + } else { + break; + } + + pos++; + + } + + // Mark the current buffer position + int start = pos; + + // + // Reading the header name + // Header name is always US-ASCII + // + + boolean colon = false; + MessageBytes headerValue = null; + + while (!colon) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if (buf[pos] == Constants.COLON) { + colon = true; + headerValue = headers.addValue(buf, start, pos - start); + } + chr = buf[pos]; + if ((chr >= Constants.A) && (chr <= Constants.Z)) { + buf[pos] = (byte) (chr - Constants.LC_OFFSET); + } + + pos++; + + } + + // Mark the current buffer position + start = pos; + int realPos = pos; + + // + // Reading the header value (which can be spanned over multiple lines) + // + + boolean eol = false; + boolean validLine = true; + + while (validLine) { + + boolean space = true; + + // Skipping spaces + while (space) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) { + pos++; + } else { + space = false; + } + + } + + int lastSignificantChar = realPos; + + // Reading bytes until the end of the line + while (!eol) { + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + if (buf[pos] == Constants.CR) { + } else if (buf[pos] == Constants.LF) { + eol = true; + } else if (buf[pos] == Constants.SP) { + buf[realPos] = buf[pos]; + realPos++; + } else { + buf[realPos] = buf[pos]; + realPos++; + lastSignificantChar = realPos; + } + + pos++; + + } + + realPos = lastSignificantChar; + + // Checking the first character of the new line. If the character + // is a LWS, then it's a multiline header + + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill()) + throw new EOFException(sm.getString("iib.eof.error")); + } + + chr = buf[pos]; + if ((chr != Constants.SP) && (chr != Constants.HT)) { + validLine = false; + } else { + eol = false; + // Copying one extra space in the buffer (since there must + // be at least one space inserted between the lines) + buf[realPos] = chr; + realPos++; + } + + } + + // Set the header value + headerValue.setBytes(buf, start, realPos - start); + + return true; + + } + + + // ---------------------------------------------------- InputBuffer Methods + + + /** + * Read some bytes. + */ + public int doRead(ByteChunk chunk, Request req) + throws IOException { + + if (lastActiveFilter == -1) + return inputStreamInputBuffer.doRead(chunk, req); + else + return activeFilters[lastActiveFilter].doRead(chunk,req); + + } + + + // ------------------------------------------------------ Protected Methods + + + /** + * Fill the internal buffer using data from the undelying input stream. + * + * @return false if at end of stream + */ + protected boolean fill() + throws IOException { + + boolean read = false; + + if (parsingHeader) { + + if (lastValid == buf.length) { + throw new IOException + (sm.getString("iib.requestheadertoolarge.error")); + } + + // Do a simple read with a short timeout + read = readSocket(true); + } else { + + if (buf.length - end < 4500) { + // In this case, the request header was really large, so we allocate a + // brand new one; the old one will get GCed when subsequent requests + // clear all references + buf = new byte[buf.length]; + end = 0; + } + pos = end; + lastValid = pos; + // Do a simple read with a short timeout + read = readSocket(true); + } + return read; + } + + + // ------------------------------------- InputStreamInputBuffer Inner Class + + + /** + * This class is an input buffer which will read its data from an input + * stream. + */ + protected class SocketInputBuffer + implements InputBuffer { + + + /** + * Read bytes into the specified chunk. + */ + public int doRead(ByteChunk chunk, Request req ) + throws IOException { + + if (pos >= lastValid) { + if (!fill()) + return -1; + } + + int length = lastValid - pos; + chunk.setBytes(buf, pos, length); + pos = lastValid; + + return (length); + + } + + + } + + +} diff --git a/java/org/apache/coyote/http11/InternalNioOutputBuffer.java b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java new file mode 100644 index 000000000..3dfb331a4 --- /dev/null +++ b/java/org/apache/coyote/http11/InternalNioOutputBuffer.java @@ -0,0 +1,783 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.coyote.http11; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.coyote.ActionCode; +import org.apache.coyote.OutputBuffer; +import org.apache.coyote.Response; +import org.apache.tomcat.util.buf.ByteChunk; +import org.apache.tomcat.util.buf.CharChunk; +import org.apache.tomcat.util.buf.MessageBytes; +import org.apache.tomcat.util.http.HttpMessages; +import org.apache.tomcat.util.http.MimeHeaders; +import org.apache.tomcat.util.res.StringManager; +import java.nio.channels.SelectionKey; +import org.apache.tomcat.util.net.NioEndpoint; +import java.nio.channels.Selector; + +/** + * Output buffer. + * + * @author Remy Maucherat + * @author Filip Hanik + */ +public class InternalNioOutputBuffer + implements OutputBuffer { + + + // -------------------------------------------------------------- Constants + + + // ----------------------------------------------------------- Constructors + int bbufLimit = 0; + + Selector selector; + + /** + * Default constructor. + */ + public InternalNioOutputBuffer(Response response) { + this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE); + } + + + /** + * Alternate constructor. + */ + public InternalNioOutputBuffer(Response response, int headerBufferSize) { + + this.response = response; + headers = response.getMimeHeaders(); + + buf = new byte[headerBufferSize]; + + if (headerBufferSize < (8 * 1024)) { + bbufLimit = 6 * 1500; + } else { + bbufLimit = (headerBufferSize / 1500 + 1) * 1500; + } + bbuf = ByteBuffer.allocateDirect(bbufLimit); + + outputStreamOutputBuffer = new SocketOutputBuffer(); + + filterLibrary = new OutputFilter[0]; + activeFilters = new OutputFilter[0]; + lastActiveFilter = -1; + + committed = false; + finished = false; + + // Cause loading of HttpMessages + HttpMessages.getMessage(200); + + } + + + // -------------------------------------------------------------- Variables + + + /** + * The string manager for this package. + */ + protected static StringManager sm = + StringManager.getManager(Constants.Package); + + + // ----------------------------------------------------- Instance Variables + + + /** + * Associated Coyote response. + */ + protected Response response; + + + /** + * Headers of the associated request. + */ + protected MimeHeaders headers; + + + /** + * Committed flag. + */ + protected boolean committed; + + + /** + * Finished flag. + */ + protected boolean finished; + + + /** + * Pointer to the current write buffer. + */ + protected byte[] buf; + + + /** + * Position in the buffer. + */ + protected int pos; + + + /** + * Underlying socket. + */ + protected SocketChannel socket; + + + /** + * Underlying output buffer. + */ + protected OutputBuffer outputStreamOutputBuffer; + + + /** + * Filter library. + * Note: Filter[0] is always the "chunked" filter. + */ + protected OutputFilter[] filterLibrary; + + + /** + * Active filter (which is actually the top of the pipeline). + */ + protected OutputFilter[] activeFilters; + + + /** + * Index of the last active filter. + */ + protected int lastActiveFilter; + + + /** + * Direct byte buffer used for writing. + */ + protected ByteBuffer bbuf = null; + + + // ------------------------------------------------------------- Properties + + + /** + * Set the underlying socket. + */ + public void setSocket(SocketChannel socket) { + this.socket = socket; + } + + public void setSelector(Selector selector) { + this.selector = selector; + } + + /** + * Get the underlying socket input stream. + */ + public SocketChannel getSocket() { + return socket; + } + /** + * Set the socket buffer size. + */ + public void setSocketBuffer(int socketBufferSize) { + // FIXME: Remove + } + + + /** + * Add an output filter to the filter library. + */ + public void addFilter(OutputFilter filter) { + + OutputFilter[] newFilterLibrary = + new OutputFilter[filterLibrary.length + 1]; + for (int i = 0; i < filterLibrary.length; i++) { + newFilterLibrary[i] = filterLibrary[i]; + } + newFilterLibrary[filterLibrary.length] = filter; + filterLibrary = newFilterLibrary; + + activeFilters = new OutputFilter[filterLibrary.length]; + + } + + + /** + * Get filters. + */ + public OutputFilter[] getFilters() { + + return filterLibrary; + + } + + + /** + * Clear filters. + */ + public void clearFilters() { + + filterLibrary = new OutputFilter[0]; + lastActiveFilter = -1; + + } + + + /** + * Add an output filter to the filter library. + */ + public void addActiveFilter(OutputFilter filter) { + + if (lastActiveFilter == -1) { + filter.setBuffer(outputStreamOutputBuffer); + } else { + for (int i = 0; i <= lastActiveFilter; i++) { + if (activeFilters[i] == filter) + return; + } + filter.setBuffer(activeFilters[lastActiveFilter]); + } + + activeFilters[++lastActiveFilter] = filter; + + filter.setResponse(response); + + } + + + // --------------------------------------------------------- Public Methods + + + /** + * Flush the response. + * + * @throws IOException an undelying I/O error occured + */ + public void flush() + throws IOException { + + if (!committed) { + + // Send the connector a request for commit. The connector should + // then validate the headers, send them (using sendHeader) and + // set the filters accordingly. + response.action(ActionCode.ACTION_COMMIT, null); + + } + + // Flush the current buffer + flushBuffer(); + + } + + + /** + * Reset current response. + * + * @throws IllegalStateException if the response has already been committed + */ + public void reset() { + + if (committed) + throw new IllegalStateException(/*FIXME:Put an error message*/); + + // Recycle Request object + response.recycle(); + + } + + + /** + * Recycle the output buffer. This should be called when closing the + * connection. + */ + public void recycle() { + + // Recycle Request object + response.recycle(); + bbuf.clear(); + + socket = null; + pos = 0; + lastActiveFilter = -1; + committed = false; + finished = false; + + } + + + /** + * End processing of current HTTP request. + * Note: All bytes of the current request should have been already + * consumed. This method only resets all the pointers so that we are ready + * to parse the next HTTP request. + */ + public void nextRequest() { + + // Recycle Request object + response.recycle(); + + // Recycle filters + for (int i = 0; i <= lastActiveFilter; i++) { + activeFilters[i].recycle(); + } + + // Reset pointers + pos = 0; + lastActiveFilter = -1; + committed = false; + finished = false; + + } + + + /** + * End request. + * + * @throws IOException an undelying I/O error occured + */ + public void endRequest() + throws IOException { + + if (!committed) { + + // Send the connector a request for commit. The connector should + // then validate the headers, send them (using sendHeader) and + // set the filters accordingly. + response.action(ActionCode.ACTION_COMMIT, null); + + } + + if (finished) + return; + + if (lastActiveFilter != -1) + activeFilters[lastActiveFilter].end(); + + flushBuffer(); + + finished = true; + + } + + + // ------------------------------------------------ HTTP/1.1 Output Methods + + + /** + * Send an acknoledgement. + */ + public void sendAck() + throws IOException { + + if (!committed) { + //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0 + ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES,0,Constants.ACK_BYTES.length); + writeToSocket(buf); + } + + } + + private void writeToSocket(ByteBuffer bytebuffer) throws IOException { + int limit = bytebuffer.position(); + bytebuffer.rewind(); + bytebuffer.limit(limit); + int remaining = limit; + while ( remaining > 0 ) { + int written = socket.write(bytebuffer); + remaining -= written; + } + bbuf.clear(); + bbuf.rewind(); + bbuf.limit(bbufLimit); + + //System.out.println("Written:"+limit); + this.total = 0; + } + + + /** + * Send the response status line. + */ + public void sendStatus() { + + // Write protocol name + write(Constants.HTTP_11_BYTES); + buf[pos++] = Constants.SP; + + // Write status code + int status = response.getStatus(); + switch (status) { + case 200: + write(Constants._200_BYTES); + break; + case 400: + write(Constants._400_BYTES); + break; + case 404: + write(Constants._404_BYTES); + break; + default: + write(status); + } + + buf[pos++] = Constants.SP; + + // Write message + String message = response.getMessage(); + if (message == null) { + write(HttpMessages.getMessage(status)); + } else { + write(message); + } + + // End the response status line + buf[pos++] = Constants.CR; + buf[pos++] = Constants.LF; + + } + + + /** + * Send a header. + * + * @param name Header name + * @param value Header value + */ + public void sendHeader(MessageBytes name, MessageBytes value) { + + write(name); + buf[pos++] = Constants.COLON; + buf[pos++] = Constants.SP; + write(value); + buf[pos++] = Constants.CR; + buf[pos++] = Constants.LF; + + } + + + /** + * Send a header. + * + * @param name Header name + * @param value Header value + */ + public void sendHeader(ByteChunk name, ByteChunk value) { + + write(name); + buf[pos++] = Constants.COLON; + buf[pos++] = Constants.SP; + write(value); + buf[pos++] = Constants.CR; + buf[pos++] = Constants.LF; + + } + + + /** + * Send a header. + * + * @param name Header name + * @param value Header value + */ + public void sendHeader(String name, String value) { + + write(name); + buf[pos++] = Constants.COLON; + buf[pos++] = Constants.SP; + write(value); + buf[pos++] = Constants.CR; + buf[pos++] = Constants.LF; + + } + + + /** + * End the header block. + */ + public void endHeaders() { + + buf[pos++] = Constants.CR; + buf[pos++] = Constants.LF; + + } + + + // --------------------------------------------------- OutputBuffer Methods + + + /** + * Write the contents of a byte chunk. + * + * @param chunk byte chunk + * @return number of bytes written + * @throws IOException an undelying I/O error occured + */ + public int doWrite(ByteChunk chunk, Response res) + throws IOException { + + if (!committed) { + + // Send the connector a request for commit. The connector should + // then validate the headers, send them (using sendHeaders) and + // set the filters accordingly. + response.action(ActionCode.ACTION_COMMIT, null); + + } + + if (lastActiveFilter == -1) + return outputStreamOutputBuffer.doWrite(chunk, res); + else + return activeFilters[lastActiveFilter].doWrite(chunk, res); + + } + + + // ------------------------------------------------------ Protected Methods + + + /** + * Commit the response. + * + * @throws IOException an undelying I/O error occured + */ + protected void commit() + throws IOException { + + // The response is now committed + committed = true; + response.setCommitted(true); + + if (pos > 0) { + // Sending the response header buffer + addToBB(buf, 0, pos); + } + + } + + int total = 0; + private synchronized void addToBB(byte[] buf, int offset, int length) throws IOException { + try { + if (bbuf.capacity() <= (offset + length)) { + flushBuffer(); + } + bbuf.put(buf, offset, length); + total += length; + }catch ( Exception x ) { + x.printStackTrace(); + } + //System.out.println("Total:"+total); + } + + + /** + * This method will write the contents of the specyfied message bytes + * buffer to the output stream, without filtering. This method is meant to + * be used to write the response header. + * + * @param mb data to be written + */ + protected void write(MessageBytes mb) { + + if (mb.getType() == MessageBytes.T_BYTES) { + ByteChunk bc = mb.getByteChunk(); + write(bc); + } else if (mb.getType() == MessageBytes.T_CHARS) { + CharChunk cc = mb.getCharChunk(); + write(cc); + } else { + write(mb.toString()); + } + + } + + + /** + * This method will write the contents of the specyfied message bytes + * buffer to the output stream, without filtering. This method is meant to + * be used to write the response header. + * + * @param bc data to be written + */ + protected void write(ByteChunk bc) { + + // Writing the byte chunk to the output buffer + System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, + bc.getLength()); + pos = pos + bc.getLength(); + + } + + + /** + * This method will write the contents of the specyfied char + * buffer to the output stream, without filtering. This method is meant to + * be used to write the response header. + * + * @param cc data to be written + */ + protected void write(CharChunk cc) { + + int start = cc.getStart(); + int end = cc.getEnd(); + char[] cbuf = cc.getBuffer(); + for (int i = start; i < end; i++) { + char c = cbuf[i]; + // Note: This is clearly incorrect for many strings, + // but is the only consistent approach within the current + // servlet framework. It must suffice until servlet output + // streams properly encode their output. + if ((c <= 31) && (c != 9)) { + c = ' '; + } else if (c == 127) { + c = ' '; + } + buf[pos++] = (byte) c; + } + + } + + + /** + * This method will write the contents of the specyfied byte + * buffer to the output stream, without filtering. This method is meant to + * be used to write the response header. + * + * @param b data to be written + */ + public void write(byte[] b) { + + // Writing the byte chunk to the output buffer + System.arraycopy(b, 0, buf, pos, b.length); + pos = pos + b.length; + + } + + + /** + * This method will write the contents of the specyfied String to the + * output stream, without filtering. This method is meant to be used to + * write the response header. + * + * @param s data to be written + */ + protected void write(String s) { + + if (s == null) + return; + + // From the Tomcat 3.3 HTTP/1.0 connector + int len = s.length(); + for (int i = 0; i < len; i++) { + char c = s.charAt (i); + // Note: This is clearly incorrect for many strings, + // but is the only consistent approach within the current + // servlet framework. It must suffice until servlet output + // streams properly encode their output. + if ((c <= 31) && (c != 9)) { + c = ' '; + } else if (c == 127) { + c = ' '; + } + buf[pos++] = (byte) c; + } + + } + + + /** + * This method will print the specified integer to the output stream, + * without filtering. This method is meant to be used to write the + * response header. + * + * @param i data to be written + */ + protected void write(int i) { + + write(String.valueOf(i)); + + } + + + /** + * Callback to write data from the buffer. + */ + protected void flushBuffer() + throws IOException { + + //prevent timeout for async, + SelectionKey key = socket.keyFor(selector); + if (key != null) { + NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); + attach.access(); + } + + //write to the socket, if there is anything to write + if (bbuf.position() > 0) { + writeToSocket(bbuf); + } + } + + + // ----------------------------------- OutputStreamOutputBuffer Inner Class + + + /** + * This class is an output buffer which will write data to an output + * stream. + */ + protected class SocketOutputBuffer + implements OutputBuffer { + + + /** + * Write chunk. + */ + public int doWrite(ByteChunk chunk, Response res) + throws IOException { + + int len = chunk.getLength(); + int start = chunk.getStart(); + byte[] b = chunk.getBuffer(); + while (len > 0) { + int thisTime = len; + if (bbuf.position() == bbuf.capacity()) { + flushBuffer(); + } + if (thisTime > bbuf.capacity() - bbuf.position()) { + thisTime = bbuf.capacity() - bbuf.position(); + } + addToBB(b,start,thisTime); + len = len - thisTime; + start = start + thisTime; + } + return chunk.getLength(); + + } + + + } + + +} diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java new file mode 100644 index 000000000..96dda4f37 --- /dev/null +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -0,0 +1,1845 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Set; +import java.util.concurrent.Executor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tomcat.jni.Error; +import org.apache.tomcat.jni.Library; +import org.apache.tomcat.jni.Poll; +import org.apache.tomcat.jni.SSL; +import org.apache.tomcat.jni.Status; +import org.apache.tomcat.util.res.StringManager; + +/** + * NIO tailored thread pool, providing the following services: + *
    + *
  • Socket acceptor thread
  • + *
  • Socket poller thread
  • + *
  • Sendfile thread
  • + *
  • Worker threads pool
  • + *
+ * + * When switching to Java 5, there's an opportunity to use the virtual + * machine's thread pool. + * + * @author Mladen Turk + * @author Remy Maucherat + * @author Filip Hanik + */ +public class NioEndpoint { + + + // -------------------------------------------------------------- Constants + + + protected static Log log = LogFactory.getLog(NioEndpoint.class); + + protected static StringManager sm = + StringManager.getManager("org.apache.tomcat.util.net.res"); + + + /** + * The Request attribute key for the cipher suite. + */ + public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; + + /** + * The Request attribute key for the key size. + */ + public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; + + /** + * The Request attribute key for the client certificate chain. + */ + public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; + + /** + * The Request attribute key for the session id. + * This one is a Tomcat extension to the Servlet spec. + */ + public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; + + + // ----------------------------------------------------------------- Fields + + + /** + * Available workers. + */ + protected WorkerStack workers = null; + + + /** + * Running state of the endpoint. + */ + protected volatile boolean running = false; + + + /** + * Will be set to true whenever the endpoint is paused. + */ + protected volatile boolean paused = false; + + + /** + * Track the initialization state of the endpoint. + */ + protected boolean initialized = false; + + + /** + * Current worker threads busy count. + */ + protected int curThreadsBusy = 0; + + + /** + * Current worker threads count. + */ + protected int curThreads = 0; + + + /** + * Sequence number used to generate thread names. + */ + protected int sequence = 0; + + + /** + * Root APR memory pool. + */ + protected long rootPool = 0; + + + /** + * Server socket "pointer". + */ + protected ServerSocketChannel serverSock = null; + + + /** + * APR memory pool for the server socket. + */ + protected long serverSockPool = 0; + + + /** + * SSL context. + */ + protected long sslContext = 0; + + + // ------------------------------------------------------------- Properties + + + /** + * External Executor based thread pool. + */ + protected Executor executor = null; + public void setExecutor(Executor executor) { this.executor = executor; } + public Executor getExecutor() { return executor; } + + + /** + * Maximum amount of worker threads. + */ + protected int maxThreads = 40; + public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } + public int getMaxThreads() { return maxThreads; } + + + /** + * Priority of the acceptor and poller threads. + */ + protected int threadPriority = Thread.NORM_PRIORITY; + public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } + public int getThreadPriority() { return threadPriority; } + + + /** + * Size of the socket poller. + */ + protected int pollerSize = 8 * 1024; + public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; } + public int getPollerSize() { return pollerSize; } + + + /** + * Size of the sendfile (= concurrent files which can be served). + */ + protected int sendfileSize = 1 * 1024; + public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; } + public int getSendfileSize() { return sendfileSize; } + + + /** + * Server socket port. + */ + protected int port; + public int getPort() { return port; } + public void setPort(int port ) { this.port=port; } + + + /** + * Address for the server socket. + */ + protected InetAddress address; + public InetAddress getAddress() { return address; } + public void setAddress(InetAddress address) { this.address = address; } + + + /** + * Handling of accepted sockets. + */ + protected Handler handler = null; + public void setHandler(Handler handler ) { this.handler = handler; } + public Handler getHandler() { return handler; } + + + /** + * Allows the server developer to specify the backlog that + * should be used for server sockets. By default, this value + * is 100. + */ + protected int backlog = 100; + public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } + public int getBacklog() { return backlog; } + + + /** + * Socket TCP no delay. + */ + protected boolean tcpNoDelay = false; + public boolean getTcpNoDelay() { return tcpNoDelay; } + public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } + + + /** + * Socket linger. + */ + protected int soLinger = 100; + public int getSoLinger() { return soLinger; } + public void setSoLinger(int soLinger) { this.soLinger = soLinger; } + + + /** + * Socket timeout. + */ + protected int soTimeout = -1; + public int getSoTimeout() { return soTimeout; } + public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } + + + /** + * Timeout on first request read before going to the poller, in ms. + */ + protected int firstReadTimeout = 60000; + public int getFirstReadTimeout() { return firstReadTimeout; } + public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; } + + + /** + * Poll interval, in microseconds. The smaller the value, the more CPU the poller + * will use, but the more responsive to activity it will be. + */ + protected int pollTime = 2000; + public int getPollTime() { return pollTime; } + public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } } + + + /** + * The default is true - the created threads will be + * in daemon mode. If set to false, the control thread + * will not be daemon - and will keep the process alive. + */ + protected boolean daemon = true; + public void setDaemon(boolean b) { daemon = b; } + public boolean getDaemon() { return daemon; } + + + /** + * Name of the thread pool, which will be used for naming child threads. + */ + protected String name = "TP"; + public void setName(String name) { this.name = name; } + public String getName() { return name; } + + + /** + * Use endfile for sending static files. + */ + protected boolean useSendfile = Library.APR_HAS_SENDFILE; + public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; } + public boolean getUseSendfile() { return useSendfile; } + + + /** + * Allow comet request handling. + */ + protected boolean useComet = true; + public void setUseComet(boolean useComet) { this.useComet = useComet; } + public boolean getUseComet() { return useComet; } + + + /** + * Acceptor thread count. + */ + protected int acceptorThreadCount = 0; + public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } + public int getAcceptorThreadCount() { return acceptorThreadCount; } + + + /** + * Sendfile thread count. + */ + protected int sendfileThreadCount = 0; + public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; } + public int getSendfileThreadCount() { return sendfileThreadCount; } + + + /** + * Poller thread count. + */ + protected int pollerThreadCount = 0; + public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; } + public int getPollerThreadCount() { return pollerThreadCount; } + + protected long selectorTimeout = 5000; + public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;} + public long getSelectorTimeout(){ return this.selectorTimeout; } + /** + * The socket poller. + */ + protected Poller[] pollers = null; + protected int pollerRoundRobin = 0; + public Poller getPoller() { + pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length; + Poller poller = pollers[pollerRoundRobin]; + poller.comet = false; + return poller; + } + + + /** + * The socket poller used for Comet support. + */ + public Poller getCometPoller() { + Poller poller = getPoller(); + poller.comet = true; + return poller; + } + + + /** + * The static file sender. + */ + protected Sendfile[] sendfiles = null; + protected int sendfileRoundRobin = 0; + public Sendfile getSendfile() { + sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length; + return sendfiles[sendfileRoundRobin]; + } + + + /** + * Dummy maxSpareThreads property. + */ + public int getMaxSpareThreads() { return 0; } + + + /** + * Dummy minSpareThreads property. + */ + public int getMinSpareThreads() { return 0; } + + + /** + * SSL engine. + */ + protected String SSLEngine = "off"; + public String getSSLEngine() { return SSLEngine; } + public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; } + + + /** + * SSL protocols. + */ + protected String SSLProtocol = "all"; + public String getSSLProtocol() { return SSLProtocol; } + public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; } + + + /** + * SSL password (if a cert is encrypted, and no password has been provided, a callback + * will ask for a password). + */ + protected String SSLPassword = null; + public String getSSLPassword() { return SSLPassword; } + public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; } + + + /** + * SSL cipher suite. + */ + protected String SSLCipherSuite = "ALL"; + public String getSSLCipherSuite() { return SSLCipherSuite; } + public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; } + + + /** + * SSL certificate file. + */ + protected String SSLCertificateFile = null; + public String getSSLCertificateFile() { return SSLCertificateFile; } + public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; } + + + /** + * SSL certificate key file. + */ + protected String SSLCertificateKeyFile = null; + public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; } + public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; } + + + /** + * SSL certificate chain file. + */ + protected String SSLCertificateChainFile = null; + public String getSSLCertificateChainFile() { return SSLCertificateChainFile; } + public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; } + + + /** + * SSL CA certificate path. + */ + protected String SSLCACertificatePath = null; + public String getSSLCACertificatePath() { return SSLCACertificatePath; } + public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; } + + + /** + * SSL CA certificate file. + */ + protected String SSLCACertificateFile = null; + public String getSSLCACertificateFile() { return SSLCACertificateFile; } + public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; } + + + /** + * SSL CA revocation path. + */ + protected String SSLCARevocationPath = null; + public String getSSLCARevocationPath() { return SSLCARevocationPath; } + public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; } + + + /** + * SSL CA revocation file. + */ + protected String SSLCARevocationFile = null; + public String getSSLCARevocationFile() { return SSLCARevocationFile; } + public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; } + + + /** + * SSL verify client. + */ + protected String SSLVerifyClient = "none"; + public String getSSLVerifyClient() { return SSLVerifyClient; } + public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; } + + + /** + * SSL verify depth. + */ + protected int SSLVerifyDepth = 10; + public int getSSLVerifyDepth() { return SSLVerifyDepth; } + public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; } + + + // --------------------------------------------------------- Public Methods + + + /** + * Number of keepalive sockets. + */ + public int getKeepAliveCount() { + if (pollers == null) { + return 0; + } else { + int keepAliveCount = 0; + for (int i = 0; i < pollers.length; i++) { + keepAliveCount += pollers[i].getKeepAliveCount(); + } + return keepAliveCount; + } + } + + + /** + * Number of sendfile sockets. + */ + public int getSendfileCount() { + if (sendfiles == null) { + return 0; + } else { + int sendfileCount = 0; + for (int i = 0; i < sendfiles.length; i++) { + sendfileCount += sendfiles[i].getSendfileCount(); + } + return sendfileCount; + } + } + + + /** + * Return the amount of threads that are managed by the pool. + * + * @return the amount of threads that are managed by the pool + */ + public int getCurrentThreadCount() { + return curThreads; + } + + + /** + * Return the amount of threads currently busy. + * + * @return the amount of threads currently busy + */ + public int getCurrentThreadsBusy() { + return curThreadsBusy; + } + + + /** + * Return the state of the endpoint. + * + * @return true if the endpoint is running, false otherwise + */ + public boolean isRunning() { + return running; + } + + + /** + * Return the state of the endpoint. + * + * @return true if the endpoint is paused, false otherwise + */ + public boolean isPaused() { + return paused; + } + + + // ----------------------------------------------- Public Lifecycle Methods + + + /** + * Initialize the endpoint. + */ + public void init() + throws Exception { + + if (initialized) + return; + + serverSock = ServerSocketChannel.open(); + InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); + serverSock.socket().bind(addr,100); //todo, set backlog value + serverSock.configureBlocking(true); //mimic APR behavior + // Sendfile usage on systems which don't support it cause major problems + if (useSendfile) { + log.warn(sm.getString("endpoint.sendfile.nosupport")); + useSendfile = false; + } + + // Initialize thread count defaults for acceptor, poller and sendfile + if (acceptorThreadCount == 0) { + // FIXME: Doesn't seem to work that well with multiple accept threads + acceptorThreadCount = 1; + } + if (pollerThreadCount != 1) { + // limit to one poller, no need for others + pollerThreadCount = 1; + } + if (sendfileThreadCount != 0) { + sendfileThreadCount = 0; + } + + // Initialize SSL if needed + if (!"off".equalsIgnoreCase(SSLEngine)) { + // Initialize SSL + // FIXME: one per VM call ? + if ("on".equalsIgnoreCase(SSLEngine)) { + SSL.initialize(null); + } else { + SSL.initialize(SSLEngine); + } + // SSL protocol + int value = SSL.SSL_PROTOCOL_ALL; + if ("SSLv2".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV2; + } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV3; + } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_TLSV1; + } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) { + value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3; + } +// // Create SSL Context +// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER); +// // List the ciphers that the client is permitted to negotiate +// SSLContext.setCipherSuite(sslContext, SSLCipherSuite); +// // Load Server key and certificate +// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA); +// // Set certificate chain file +// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false); +// // Support Client Certificates +// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath); +// // Set revocation +// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath); +// // Client certificate verification +// value = SSL.SSL_CVERIFY_NONE; +// if ("optional".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_OPTIONAL; +// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_REQUIRE; +// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) { +// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA; +// } +// SSLContext.setVerify(sslContext, value, SSLVerifyDepth); + // For now, sendfile is not supported with SSL + useSendfile = false; + } + + initialized = true; + + } + + + /** + * Start the APR endpoint, creating acceptor, poller and sendfile threads. + */ + public void start() + throws Exception { + // Initialize socket if not done before + if (!initialized) { + init(); + } + if (!running) { + running = true; + paused = false; + + // Create worker collection + if (executor == null) { + workers = new WorkerStack(maxThreads); + } + + // Start acceptor threads + for (int i = 0; i < acceptorThreadCount; i++) { + Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); + acceptorThread.setPriority(threadPriority); + acceptorThread.setDaemon(daemon); + acceptorThread.start(); + } + + // Start poller threads + pollers = new Poller[pollerThreadCount]; + for (int i = 0; i < pollerThreadCount; i++) { + pollers[i] = new Poller(false); + pollers[i].init(); + Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i); + pollerThread.setPriority(threadPriority); + pollerThread.setDaemon(true); + pollerThread.start(); + } + + // Start sendfile threads + if (useSendfile) { + sendfiles = new Sendfile[sendfileThreadCount]; + for (int i = 0; i < sendfileThreadCount; i++) { + sendfiles[i] = new Sendfile(); + sendfiles[i].init(); + Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i); + sendfileThread.setPriority(threadPriority); + sendfileThread.setDaemon(true); + sendfileThread.start(); + } + } + } + } + + + /** + * Pause the endpoint, which will make it stop accepting new sockets. + */ + public void pause() { + if (running && !paused) { + paused = true; + unlockAccept(); + } + } + + + /** + * Resume the endpoint, which will make it start accepting new sockets + * again. + */ + public void resume() { + if (running) { + paused = false; + } + } + + + /** + * Stop the endpoint. This will cause all processing threads to stop. + */ + public void stop() { + if (running) { + running = false; + unlockAccept(); + for (int i = 0; i < pollers.length; i++) { + pollers[i].destroy(); + } + pollers = null; + if (useSendfile) { + for (int i = 0; i < sendfiles.length; i++) { + sendfiles[i].destroy(); + } + sendfiles = null; + } + } + } + + + /** + * Deallocate APR memory pools, and close server socket. + */ + public void destroy() throws Exception { + if (running) { + stop(); + } + // Close server socket + serverSock.socket().close(); + serverSock.close(); + serverSock = null; + sslContext = 0; + initialized = false; + } + + + // ------------------------------------------------------ Protected Methods + + + /** + * Get a sequence number used for thread naming. + */ + protected int getSequence() { + return sequence++; + } + + + /** + * Unlock the server socket accept using a bugus connection. + */ + protected void unlockAccept() { + java.net.Socket s = null; + try { + // Need to create a connection to unlock the accept(); + if (address == null) { + s = new java.net.Socket("127.0.0.1", port); + } else { + s = new java.net.Socket(address, port); + // setting soLinger to a small value will help shutdown the + // connection quicker + s.setSoLinger(true, 0); + } + } catch(Exception e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); + } + } finally { + if (s != null) { + try { + s.close(); + } catch (Exception e) { + // Ignore + } + } + } + } + + + /** + * Process the specified connection. + */ + protected boolean setSocketOptions(SocketChannel socket) { + // Process the connection + int step = 1; + try { + //disable blocking, APR style, we are gonna be polling it + socket.configureBlocking(false); + + // 1: Set socket options: timeout, linger, etc + if (soLinger >= 0) + socket.socket().setSoLinger(true,soLinger); + if (tcpNoDelay) + socket.socket().setTcpNoDelay(true); + if (soTimeout > 0) + socket.socket().setSoTimeout(soTimeout); + + + // 2: SSL handshake + step = 2; + if (sslContext != 0) { +// SSLSocket.attach(sslContext, socket); +// if (SSLSocket.handshake(socket) != 0) { +// if (log.isDebugEnabled()) { +// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError()); +// } +// return false; +// } + } + + getPoller().register(socket); + + } catch (Throwable t) { + if (log.isDebugEnabled()) { + if (step == 2) { + log.debug(sm.getString("endpoint.err.handshake"), t); + } else { + log.debug(sm.getString("endpoint.err.unexpected"), t); + } + } + // Tell to close the socket + return false; + } + return true; + } + + + /** + * Create (or allocate) and return an available processor for use in + * processing a specific HTTP request, if possible. If the maximum + * allowed processors have already been created and are in use, return + * null instead. + */ + protected Worker createWorkerThread() { + + synchronized (workers) { + if (workers.size() > 0) { + curThreadsBusy++; + return (workers.pop()); + } + if ((maxThreads > 0) && (curThreads < maxThreads)) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + if (maxThreads < 0) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + return (null); + } + } + } + + } + + + /** + * Create and return a new processor suitable for processing HTTP + * requests and returning the corresponding responses. + */ + protected Worker newWorkerThread() { + + Worker workerThread = new Worker(); + workerThread.start(); + return (workerThread); + + } + + + /** + * Return a new worker thread, and block while to worker is available. + */ + protected Worker getWorkerThread() { + // Allocate a new worker thread + Worker workerThread = createWorkerThread(); + while (workerThread == null) { + try { + synchronized (workers) { + workers.wait(); + } + } catch (InterruptedException e) { + // Ignore + } + workerThread = createWorkerThread(); + } + return workerThread; + } + + + /** + * Recycle the specified Processor so that it can be used again. + * + * @param workerThread The processor to be recycled + */ + protected void recycleWorkerThread(Worker workerThread) { + synchronized (workers) { + workers.push(workerThread); + curThreadsBusy--; + workers.notify(); + } + } + + + /** + * Allocate a new poller of the specified size. + */ + protected long allocatePoller(int size, long pool, int timeout) { + try { + return Poll.create(size, pool, 0, timeout * 1000); + } catch (Error e) { + if (Status.APR_STATUS_IS_EINVAL(e.getError())) { + log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size)); + return 0; + } else { + log.error(sm.getString("endpoint.poll.initfail"), e); + return -1; + } + } + } + + + /** + * Process given socket. + */ + protected boolean processSocket(SocketChannel socket) { + try { + if (executor == null) { + getWorkerThread().assign(socket); + } else { + executor.execute(new SocketProcessor(socket)); + } + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + + + /** + * Process given socket for an event. + */ + protected boolean processSocket(SocketChannel socket, boolean error) { + try { + if (executor == null) { + getWorkerThread().assign(socket, error); + } else { + executor.execute(new SocketEventProcessor(socket, error)); + } + } catch (Throwable t) { + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + + + // --------------------------------------------------- Acceptor Inner Class + + + /** + * Server socket acceptor thread. + */ + protected class Acceptor implements Runnable { + + + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + + // Loop until we receive a shutdown command + while (running) { + + // Loop if endpoint is paused + while (paused) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + try { + // Accept the next incoming connection from the server socket + SocketChannel socket = serverSock.accept(); + // Hand this socket off to an appropriate processor + if(!setSocketOptions(socket)) + { + // Close socket right away + socket.socket().close(); + socket.close(); + } + } catch (Throwable t) { + log.error(sm.getString("endpoint.accept.fail"), t); + } + + // The processor will recycle itself when it finishes + + } + + } + + } + + + // ----------------------------------------------------- Poller Inner Class + + + /** + * Poller class. + */ + public class Poller implements Runnable { + + protected Selector selector; + protected LinkedList events = new LinkedList(); + protected boolean close = false; + protected boolean comet = true; + + protected int keepAliveCount = 0; + public int getKeepAliveCount() { return keepAliveCount; } + + + + public Poller(boolean comet) throws IOException { + this.comet = comet; + this.selector = Selector.open(); + } + + public Selector getSelector() { return selector;} + + /** + * Create the poller. With some versions of APR, the maximum poller size will + * be 62 (reocmpiling APR is necessary to remove this limitation). + */ + protected void init() { + keepAliveCount = 0; + } + + /** + * Destroy the poller. + */ + protected void destroy() { + // Wait for polltime before doing anything, so that the poller threads + // exit, otherwise parallel descturction of sockets which are still + // in the poller can cause problems + try { + synchronized (this) { + this.wait(pollTime / 1000); + } + } catch (InterruptedException e) { + // Ignore + } + close = true; + } + + /** + * Add specified socket and associated pool to the poller. The socket will + * be added to a temporary array, and polled first after a maximum amount + * of time equal to pollTime (in most cases, latency will be much lower, + * however). + * + * @param socket to add to the poller + */ + public void add(final SocketChannel socket) { + final SelectionKey key = socket.keyFor(selector); + Runnable r = new Runnable() { + public void run() { + if ( key != null ) key.interestOps(SelectionKey.OP_READ); + } + }; + synchronized (events) { + events.add(r); + } + selector.wakeup(); + } + + public void events() { + synchronized (events) { + Runnable r = null; + while ( (events.size() > 0) && (r = events.removeFirst()) != null ) { + try { + r.run(); + } catch ( Exception x ) { + log.error("",x); + } + } + events.clear(); + } + } + + public void register(final SocketChannel socket) + { + SelectionKey key = socket.keyFor(selector); + Runnable r = new Runnable() { + public void run() { + try { + socket.register(selector, SelectionKey.OP_READ, new KeyAttachment()); + } catch (Exception x) { + log.error("", x); + } + } + + }; + synchronized (events) { + events.add(r); + } + selector.wakeup(); + } + + public void cancelledKey(SelectionKey key) { + try { + KeyAttachment ka = (KeyAttachment) key.attachment(); + key.cancel(); + if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true); + key.channel().close(); + } catch (IOException e) { + if ( log.isDebugEnabled() ) log.debug("",e); + // Ignore + } + } + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + + // Loop until we receive a shutdown command + while (running) { + // Loop if endpoint is paused + while (paused) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + events(); + // Time to terminate? + if (close) return; + + int keyCount = 0; + try { + keyCount = selector.select(selectorTimeout); + } catch (IOException x) { + log.error("",x); + continue; + } + //timeout + Set keys = selector.keys(); + long now = System.currentTimeMillis(); + for (Iterator iter = keys.iterator(); iter.hasNext(); ) { + SelectionKey key = (SelectionKey) iter.next(); + try { + if (key.interestOps() == SelectionKey.OP_READ) { + //only timeout sockets that we are waiting for a read from + KeyAttachment ka = (KeyAttachment) key.attachment(); + long delta = now - ka.getLastAccess(); + if (delta > (long) soTimeout) { + cancelledKey(key); + } + } + }catch ( CancelledKeyException ckx ) { + cancelledKey(key); + } + } + + + if (keyCount == 0) continue; + + Iterator iterator = selector.selectedKeys().iterator(); + // Walk through the collection of ready keys and dispatch + // any active event. + while (iterator.hasNext()) { + SelectionKey sk = (SelectionKey) iterator.next(); + iterator.remove(); + KeyAttachment attachment = (KeyAttachment)sk.attachment(); + try { + if(attachment == null) attachment = new KeyAttachment(); + attachment.access(); + sk.attach(attachment); + + int readyOps = sk.readyOps(); + sk.interestOps(sk.interestOps() & ~readyOps); + SocketChannel channel = (SocketChannel)sk.channel(); + boolean read = sk.isReadable(); + if (read) { + if ( comet ) { + if (!processSocket(channel,false)) processSocket(channel,true); + } else { + boolean close = (!processSocket(channel)); + if ( close ) { + channel.socket().close(); + channel.close(); + } + } + } + if (sk.isValid() && sk.isWritable()) { + } + } catch ( CancelledKeyException ckx ) { + if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true); + try { + sk.channel().close(); + }catch ( Exception ignore){} + } catch (Throwable t) { + log.error("",t); + } + }//while + + + } + synchronized (this) { + this.notifyAll(); + } + + } + + } + + public static class KeyAttachment { + + public long getLastAccess() { return lastAccess; } + public void access() { access(System.currentTimeMillis()); } + public void access(long access) { lastAccess = access; } + public void setComet(boolean comet) { this.comet = comet; } + public boolean getComet() { return comet; } + public boolean getCurrentAccess() { return currentAccess; } + public void setCurrentAccess(boolean access) { currentAccess = access; } + + protected long lastAccess = System.currentTimeMillis(); + protected boolean currentAccess = false; + protected boolean comet = false; + + } + + + + // ----------------------------------------------------- Worker Inner Class + + + /** + * Server processor class. + */ + protected class Worker implements Runnable { + + + protected Thread thread = null; + protected boolean available = false; + protected SocketChannel socket = null; + protected boolean event = false; + protected boolean error = false; + + + /** + * Process an incoming TCP/IP connection on the specified socket. Any + * exception that occurs during processing must be logged and swallowed. + * NOTE: This method is called from our Connector's thread. We + * must assign it to our own thread so that multiple simultaneous + * requests can be handled. + * + * @param socket TCP socket to process + */ + protected synchronized void assign(SocketChannel socket) { + + // Wait for the Processor to get the previous Socket + while (available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Store the newly available Socket and notify our thread + this.socket = socket; + event = false; + error = false; + available = true; + notifyAll(); + + } + + + protected synchronized void assign(SocketChannel socket, boolean error) { + + // Wait for the Processor to get the previous Socket + while (available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Store the newly available Socket and notify our thread + this.socket = socket; + event = true; + this.error = error; + available = true; + notifyAll(); + } + + + /** + * Await a newly assigned Socket from our Connector, or null + * if we are supposed to shut down. + */ + protected synchronized SocketChannel await() { + + // Wait for the Connector to provide a new Socket + while (!available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Notify the Connector that we have received this Socket + SocketChannel socket = this.socket; + available = false; + notifyAll(); + + return (socket); + + } + + + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + + // Process requests until we receive a shutdown signal + while (running) { + + // Wait for the next socket to be assigned + SocketChannel socket = await(); + if (socket == null) + continue; + + // Process the request from this socket + if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) { + // Close socket and pool + try { + socket.socket().close(); + socket.close(); + }catch ( Exception x ) { + log.error("",x); + } + } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) { + // Close socket and pool + try { + socket.socket().close(); + socket.close(); + }catch ( Exception x ) { + log.error("",x); + } + } + + // Finish up this request + recycleWorkerThread(this); + + } + + } + + + /** + * Start the background processing thread. + */ + public void start() { + thread = new Thread(this); + thread.setName(getName() + "-" + (++curThreads)); + thread.setDaemon(true); + thread.start(); + } + + + } + + + // ----------------------------------------------- SendfileData Inner Class + + + /** + * SendfileData class. + */ + public static class SendfileData { + // File + public String fileName; + public long fd; + public long fdpool; + // Range information + public long start; + public long end; + // Socket and socket pool + public SocketChannel socket; + // Position + public long pos; + // KeepAlive flag + public boolean keepAlive; + } + + + // --------------------------------------------------- Sendfile Inner Class + + + /** + * Sendfile class. + */ + public class Sendfile implements Runnable { + + protected long sendfilePollset = 0; + protected long pool = 0; + protected long[] desc; + protected HashMap sendfileData; + + protected int sendfileCount; + public int getSendfileCount() { return sendfileCount; } + + protected ArrayList addS; + + /** + * Create the sendfile poller. With some versions of APR, the maximum poller size will + * be 62 (reocmpiling APR is necessary to remove this limitation). + */ + protected void init() { +// pool = Pool.create(serverSockPool); +// int size = sendfileSize / sendfileThreadCount; +// sendfilePollset = allocatePoller(size, pool, soTimeout); +// if (sendfilePollset == 0 && size > 1024) { +// size = 1024; +// sendfilePollset = allocatePoller(size, pool, soTimeout); +// } +// if (sendfilePollset == 0) { +// size = 62; +// sendfilePollset = allocatePoller(size, pool, soTimeout); +// } +// desc = new long[size * 2]; +// sendfileData = new HashMap(size); +// addS = new ArrayList(); + } + + /** + * Destroy the poller. + */ + protected void destroy() { +// // Wait for polltime before doing anything, so that the poller threads +// // exit, otherwise parallel descturction of sockets which are still +// // in the poller can cause problems +// try { +// synchronized (this) { +// this.wait(pollTime / 1000); +// } +// } catch (InterruptedException e) { +// // Ignore +// } +// // Close any socket remaining in the add queue +// for (int i = (addS.size() - 1); i >= 0; i--) { +// SendfileData data = addS.get(i); +// Socket.destroy(data.socket); +// } +// // Close all sockets still in the poller +// int rv = Poll.pollset(sendfilePollset, desc); +// if (rv > 0) { +// for (int n = 0; n < rv; n++) { +// Socket.destroy(desc[n*2+1]); +// } +// } +// Pool.destroy(pool); +// sendfileData.clear(); + } + + /** + * Add the sendfile data to the sendfile poller. Note that in most cases, + * the initial non blocking calls to sendfile will return right away, and + * will be handled asynchronously inside the kernel. As a result, + * the poller will never be used. + * + * @param data containing the reference to the data which should be snet + * @return true if all the data has been sent right away, and false + * otherwise + */ + public boolean add(SendfileData data) { +// // Initialize fd from data given +// try { +// data.fdpool = Socket.pool(data.socket); +// data.fd = File.open +// (data.fileName, File.APR_FOPEN_READ +// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY, +// 0, data.fdpool); +// data.pos = data.start; +// // Set the socket to nonblocking mode +// Socket.timeoutSet(data.socket, 0); +// while (true) { +// long nw = Socket.sendfilen(data.socket, data.fd, +// data.pos, data.end - data.pos, 0); +// if (nw < 0) { +// if (!(-nw == Status.EAGAIN)) { +// Socket.destroy(data.socket); +// data.socket = 0; +// return false; +// } else { +// // Break the loop and add the socket to poller. +// break; +// } +// } else { +// data.pos = data.pos + nw; +// if (data.pos >= data.end) { +// // Entire file has been sent +// Pool.destroy(data.fdpool); +// // Set back socket to blocking mode +// Socket.timeoutSet(data.socket, soTimeout * 1000); +// return true; +// } +// } +// } +// } catch (Exception e) { +// log.error(sm.getString("endpoint.sendfile.error"), e); +// return false; +// } +// // Add socket to the list. Newly added sockets will wait +// // at most for pollTime before being polled +// synchronized (this) { +// addS.add(data); +// this.notify(); +// } + return false; + } + + /** + * Remove socket from the poller. + * + * @param data the sendfile data which should be removed + */ + protected void remove(SendfileData data) { +// int rv = Poll.remove(sendfilePollset, data.socket); +// if (rv == Status.APR_SUCCESS) { +// sendfileCount--; +// } +// sendfileData.remove(data); + } + + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + +// // Loop until we receive a shutdown command +// while (running) { +// +// // Loop if endpoint is paused +// while (paused) { +// try { +// Thread.sleep(1000); +// } catch (InterruptedException e) { +// // Ignore +// } +// } +// +// while (sendfileCount < 1 && addS.size() < 1) { +// try { +// synchronized (this) { +// this.wait(); +// } +// } catch (InterruptedException e) { +// // Ignore +// } +// } +// +// try { +// // Add socket to the poller +// if (addS.size() > 0) { +// synchronized (this) { +// for (int i = (addS.size() - 1); i >= 0; i--) { +// SendfileData data = addS.get(i); +// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT); +// if (rv == Status.APR_SUCCESS) { +// sendfileData.put(new Long(data.socket), data); +// sendfileCount++; +// } else { +// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv))); +// // Can't do anything: close the socket right away +// Socket.destroy(data.socket); +// } +// } +// addS.clear(); +// } +// } +// // Pool for the specified interval +// int rv = Poll.poll(sendfilePollset, pollTime, desc, false); +// if (rv > 0) { +// for (int n = 0; n < rv; n++) { +// // Get the sendfile state +// SendfileData state = +// sendfileData.get(new Long(desc[n*2+1])); +// // Problem events +// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP) +// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) { +// // Close socket and clear pool +// remove(state); +// // Destroy file descriptor pool, which should close the file +// // Close the socket, as the reponse would be incomplete +// Socket.destroy(state.socket); +// continue; +// } +// // Write some data using sendfile +// long nw = Socket.sendfilen(state.socket, state.fd, +// state.pos, +// state.end - state.pos, 0); +// if (nw < 0) { +// // Close socket and clear pool +// remove(state); +// // Close the socket, as the reponse would be incomplete +// // This will close the file too. +// Socket.destroy(state.socket); +// continue; +// } +// +// state.pos = state.pos + nw; +// if (state.pos >= state.end) { +// remove(state); +// if (state.keepAlive) { +// // Destroy file descriptor pool, which should close the file +// Pool.destroy(state.fdpool); +// Socket.timeoutSet(state.socket, soTimeout * 1000); +// // If all done hand this socket off to a worker for +// // processing of further requests +// if (!processSocket(state.socket)) { +// Socket.destroy(state.socket); +// } +// } else { +// // Close the socket since this is +// // the end of not keep-alive request. +// Socket.destroy(state.socket); +// } +// } +// } +// } else if (rv < 0) { +// int errn = -rv; +// /* Any non timeup or interrupted error is critical */ +// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) { +// if (errn > Status.APR_OS_START_USERERR) { +// errn -= Status.APR_OS_START_USERERR; +// } +// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn))); +// // Handle poll critical failure +// synchronized (this) { +// destroy(); +// init(); +// } +// continue; +// } +// } +// /* TODO: See if we need to call the maintain for sendfile poller */ +// } catch (Throwable t) { +// log.error(sm.getString("endpoint.poll.error"), t); +// } +// } +// +// synchronized (this) { +// this.notifyAll(); +// } + + } + + } + + + // ------------------------------------------------ Handler Inner Interface + + + /** + * Bare bones interface used for socket processing. Per thread data is to be + * stored in the ThreadWithAttributes extra folders, or alternately in + * thread local fields. + */ + public interface Handler { + public enum SocketState { + OPEN, CLOSED, LONG + } + public SocketState process(SocketChannel socket); + public SocketState event(SocketChannel socket, boolean error); + } + + + // ------------------------------------------------- WorkerStack Inner Class + + + public class WorkerStack { + + protected Worker[] workers = null; + protected int end = 0; + + public WorkerStack(int size) { + workers = new Worker[size]; + } + + /** + * Put the object into the queue. + * + * @param object the object to be appended to the queue (first element). + */ + public void push(Worker worker) { + workers[end++] = worker; + } + + /** + * Get the first object out of the queue. Return null if the queue + * is empty. + */ + public Worker pop() { + if (end > 0) { + return workers[--end]; + } + return null; + } + + /** + * Get the first object out of the queue, Return null if the queue + * is empty. + */ + public Worker peek() { + return workers[end]; + } + + /** + * Is the queue empty? + */ + public boolean isEmpty() { + return (end == 0); + } + + /** + * How many elements are there in this queue? + */ + public int size() { + return (end); + } + } + + + // ---------------------------------------------- SocketProcessor Inner Class + + + /** + * This class is the equivalent of the Worker, but will simply use in an + * external Executor thread pool. + */ + protected class SocketProcessor implements Runnable { + + protected SocketChannel socket = null; + + public SocketProcessor(SocketChannel socket) { + this.socket = socket; + } + + public void run() { + + // Process the request from this socket + if (handler.process(socket) == Handler.SocketState.CLOSED) { + // Close socket and pool + try { + socket.socket().close(); + socket.close(); + } catch ( Exception x ) { + log.error("",x); + } + socket = null; + } + + } + + } + + + // --------------------------------------- SocketEventProcessor Inner Class + + + /** + * This class is the equivalent of the Worker, but will simply use in an + * external Executor thread pool. + */ + protected class SocketEventProcessor implements Runnable { + + protected SocketChannel socket = null; + protected boolean error = false; + + public SocketEventProcessor(SocketChannel socket, boolean error) { + this.socket = socket; + this.error = error; + } + + public void run() { + + // Process the request from this socket + if (handler.event(socket, error) == Handler.SocketState.CLOSED) { + // Close socket and pool + try { + socket.socket().close(); + socket.close(); + } catch ( Exception x ) { + log.error("",x); + } + socket = null; + } + + } + + } + + +} -- 2.11.0