-/*\r
- * Copyright 2005-2006 The Apache Software Foundation\r
- *\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-\r
-package org.apache.tomcat.util.net;\r
-\r
+/*
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
import java.io.IOException;\r
import java.net.InetAddress;\r
import java.net.InetSocketAddress;\r
import org.apache.tomcat.jni.Poll;\r
import org.apache.tomcat.jni.SSL;\r
import org.apache.tomcat.jni.Status;\r
-import org.apache.tomcat.util.res.StringManager;\r
-import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;\r
-\r
-/**\r
- * NIO tailored thread pool, providing the following services:\r
- * <ul>\r
- * <li>Socket acceptor thread</li>\r
- * <li>Socket poller thread</li>\r
- * <li>Sendfile thread</li>\r
- * <li>Worker threads pool</li>\r
- * </ul>\r
- *\r
- * When switching to Java 5, there's an opportunity to use the virtual\r
- * machine's thread pool.\r
- *\r
- * @author Mladen Turk\r
- * @author Remy Maucherat\r
- * @author Filip Hanik\r
- */\r
-public class NioEndpoint {\r
-\r
-\r
- // -------------------------------------------------------------- Constants\r
-\r
-\r
- protected static Log log = LogFactory.getLog(NioEndpoint.class);\r
-\r
- protected static StringManager sm =\r
- StringManager.getManager("org.apache.tomcat.util.net.res");\r
-\r
-\r
- /**\r
- * The Request attribute key for the cipher suite.\r
- */\r
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";\r
-\r
- /**\r
- * The Request attribute key for the key size.\r
- */\r
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";\r
-\r
- /**\r
- * The Request attribute key for the client certificate chain.\r
- */\r
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";\r
-\r
- /**\r
- * The Request attribute key for the session id.\r
- * This one is a Tomcat extension to the Servlet spec.\r
- */\r
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";\r
-\r
-\r
- // ----------------------------------------------------------------- Fields\r
-\r
-\r
- /**\r
- * Available workers.\r
- */\r
- protected WorkerStack workers = null;\r
-\r
-\r
- /**\r
- * Running state of the endpoint.\r
- */\r
- protected volatile boolean running = false;\r
-\r
-\r
- /**\r
- * Will be set to true whenever the endpoint is paused.\r
- */\r
- protected volatile boolean paused = false;\r
-\r
-\r
- /**\r
- * Track the initialization state of the endpoint.\r
- */\r
- protected boolean initialized = false;\r
-\r
-\r
- /**\r
- * Current worker threads busy count.\r
- */\r
- protected int curThreadsBusy = 0;\r
-\r
-\r
- /**\r
- * Current worker threads count.\r
- */\r
- protected int curThreads = 0;\r
-\r
-\r
- /**\r
- * Sequence number used to generate thread names.\r
- */\r
- protected int sequence = 0;\r
-\r
-\r
- /**\r
- * Root APR memory pool.\r
- */\r
- protected long rootPool = 0;\r
-\r
-\r
- /**\r
- * Server socket "pointer".\r
- */\r
- protected ServerSocketChannel serverSock = null;\r
-\r
-\r
- /**\r
- * APR memory pool for the server socket.\r
- */\r
- protected long serverSockPool = 0;\r
-\r
-\r
- /**\r
- * SSL context.\r
- */\r
- protected long sslContext = 0;\r
-\r
-\r
- // ------------------------------------------------------------- Properties\r
-\r
-\r
- /**\r
- * External Executor based thread pool.\r
- */\r
- protected Executor executor = null;\r
- public void setExecutor(Executor executor) { this.executor = executor; }\r
- public Executor getExecutor() { return executor; }\r
-\r
-\r
- /**\r
- * Maximum amount of worker threads.\r
- */\r
- protected int maxThreads = 40;\r
- public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }\r
- public int getMaxThreads() { return maxThreads; }\r
-\r
-\r
- /**\r
- * Priority of the acceptor and poller threads.\r
- */\r
- protected int threadPriority = Thread.NORM_PRIORITY;\r
- public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }\r
- public int getThreadPriority() { return threadPriority; }\r
-\r
-\r
- /**\r
- * Size of the socket poller.\r
- */\r
- protected int pollerSize = 8 * 1024;\r
- public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }\r
- public int getPollerSize() { return pollerSize; }\r
-\r
-\r
- /**\r
- * Size of the sendfile (= concurrent files which can be served).\r
- */\r
- protected int sendfileSize = 1 * 1024;\r
- public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }\r
- public int getSendfileSize() { return sendfileSize; }\r
-\r
-\r
- /**\r
- * Server socket port.\r
- */\r
- protected int port;\r
- public int getPort() { return port; }\r
- public void setPort(int port ) { this.port=port; }\r
-\r
-\r
- /**\r
- * Address for the server socket.\r
- */\r
- protected InetAddress address;\r
- public InetAddress getAddress() { return address; }\r
- public void setAddress(InetAddress address) { this.address = address; }\r
-\r
-\r
- /**\r
- * Handling of accepted sockets.\r
- */\r
- protected Handler handler = null;\r
- public void setHandler(Handler handler ) { this.handler = handler; }\r
- public Handler getHandler() { return handler; }\r
-\r
-\r
- /**\r
- * Allows the server developer to specify the backlog that\r
- * should be used for server sockets. By default, this value\r
- * is 100.\r
- */\r
- protected int backlog = 100;\r
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }\r
- public int getBacklog() { return backlog; }\r
-\r
-\r
- /**\r
- * Socket TCP no delay.\r
- */\r
- protected boolean tcpNoDelay = false;\r
- public boolean getTcpNoDelay() { return tcpNoDelay; }\r
- public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }\r
-\r
-\r
- /**\r
- * Socket linger.\r
- */\r
- protected int soLinger = 100;\r
- public int getSoLinger() { return soLinger; }\r
- public void setSoLinger(int soLinger) { this.soLinger = soLinger; }\r
-\r
-\r
- /**\r
- * Socket timeout.\r
- */\r
- protected int soTimeout = -1;\r
- public int getSoTimeout() { return soTimeout; }\r
- public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }\r
-\r
-\r
- /**\r
- * Timeout on first request read before going to the poller, in ms.\r
- */\r
- protected int firstReadTimeout = 60000;\r
- public int getFirstReadTimeout() { return firstReadTimeout; }\r
- public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }\r
-\r
-\r
- /**\r
- * Poll interval, in microseconds. The smaller the value, the more CPU the poller\r
- * will use, but the more responsive to activity it will be.\r
- */\r
- protected int pollTime = 2000;\r
- public int getPollTime() { return pollTime; }\r
- public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } }\r
-\r
-\r
- /**\r
- * The default is true - the created threads will be\r
- * in daemon mode. If set to false, the control thread\r
- * will not be daemon - and will keep the process alive.\r
- */\r
- protected boolean daemon = true;\r
- public void setDaemon(boolean b) { daemon = b; }\r
- public boolean getDaemon() { return daemon; }\r
-\r
-\r
- /**\r
- * Name of the thread pool, which will be used for naming child threads.\r
- */\r
- protected String name = "TP";\r
- public void setName(String name) { this.name = name; }\r
- public String getName() { return name; }\r
-\r
-\r
- /**\r
- * Use endfile for sending static files.\r
- */\r
- protected boolean useSendfile = Library.APR_HAS_SENDFILE;\r
- public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }\r
- public boolean getUseSendfile() { return useSendfile; }\r
-\r
-\r
- /**\r
- * Allow comet request handling.\r
- */\r
- protected boolean useComet = true;\r
- public void setUseComet(boolean useComet) { this.useComet = useComet; }\r
- public boolean getUseComet() { return useComet; }\r
-\r
-\r
- /**\r
- * Acceptor thread count.\r
- */\r
- protected int acceptorThreadCount = 0;\r
- public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }\r
- public int getAcceptorThreadCount() { return acceptorThreadCount; }\r
-\r
-\r
- /**\r
- * Sendfile thread count.\r
- */\r
- protected int sendfileThreadCount = 0;\r
- public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; }\r
- public int getSendfileThreadCount() { return sendfileThreadCount; }\r
-\r
-\r
- /**\r
- * Poller thread count.\r
- */\r
- protected int pollerThreadCount = 0;\r
- public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }\r
- public int getPollerThreadCount() { return pollerThreadCount; }\r
-\r
- protected long selectorTimeout = 5000;\r
- public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}\r
- public long getSelectorTimeout(){ return this.selectorTimeout; }\r
- /**\r
- * The socket poller.\r
- */\r
- protected Poller[] pollers = null;\r
- protected int pollerRoundRobin = 0;\r
- public Poller getPoller() {\r
- pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;\r
- Poller poller = pollers[pollerRoundRobin];\r
- poller.comet = false;\r
- return poller;\r
- }\r
-\r
-\r
- /**\r
- * The socket poller used for Comet support.\r
- */\r
- public Poller getCometPoller() {\r
- Poller poller = getPoller();\r
- poller.comet = true;\r
- return poller;\r
- }\r
-\r
-\r
- /**\r
- * The static file sender.\r
- */\r
- protected Sendfile[] sendfiles = null;\r
- protected int sendfileRoundRobin = 0;\r
- public Sendfile getSendfile() {\r
- sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;\r
- return sendfiles[sendfileRoundRobin];\r
- }\r
-\r
-\r
- /**\r
- * Dummy maxSpareThreads property.\r
- */\r
- public int getMaxSpareThreads() { return 0; }\r
-\r
-\r
- /**\r
- * Dummy minSpareThreads property.\r
- */\r
- public int getMinSpareThreads() { return 0; }\r
-\r
-\r
- /**\r
- * SSL engine.\r
- */\r
- protected String SSLEngine = "off";\r
- public String getSSLEngine() { return SSLEngine; }\r
- public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; }\r
-\r
-\r
- /**\r
- * SSL protocols.\r
- */\r
- protected String SSLProtocol = "all";\r
- public String getSSLProtocol() { return SSLProtocol; }\r
- public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; }\r
-\r
-\r
- /**\r
- * SSL password (if a cert is encrypted, and no password has been provided, a callback\r
- * will ask for a password).\r
- */\r
- protected String SSLPassword = null;\r
- public String getSSLPassword() { return SSLPassword; }\r
- public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; }\r
-\r
-\r
- /**\r
- * SSL cipher suite.\r
- */\r
- protected String SSLCipherSuite = "ALL";\r
- public String getSSLCipherSuite() { return SSLCipherSuite; }\r
- public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }\r
-\r
-\r
- /**\r
- * SSL certificate file.\r
- */\r
- protected String SSLCertificateFile = null;\r
- public String getSSLCertificateFile() { return SSLCertificateFile; }\r
- public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }\r
-\r
-\r
- /**\r
- * SSL certificate key file.\r
- */\r
- protected String SSLCertificateKeyFile = null;\r
- public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }\r
- public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }\r
-\r
-\r
- /**\r
- * SSL certificate chain file.\r
- */\r
- protected String SSLCertificateChainFile = null;\r
- public String getSSLCertificateChainFile() { return SSLCertificateChainFile; }\r
- public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }\r
-\r
-\r
- /**\r
- * SSL CA certificate path.\r
- */\r
- protected String SSLCACertificatePath = null;\r
- public String getSSLCACertificatePath() { return SSLCACertificatePath; }\r
- public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }\r
-\r
-\r
- /**\r
- * SSL CA certificate file.\r
- */\r
- protected String SSLCACertificateFile = null;\r
- public String getSSLCACertificateFile() { return SSLCACertificateFile; }\r
- public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }\r
-\r
-\r
- /**\r
- * SSL CA revocation path.\r
- */\r
- protected String SSLCARevocationPath = null;\r
- public String getSSLCARevocationPath() { return SSLCARevocationPath; }\r
- public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }\r
-\r
-\r
- /**\r
- * SSL CA revocation file.\r
- */\r
- protected String SSLCARevocationFile = null;\r
- public String getSSLCARevocationFile() { return SSLCARevocationFile; }\r
- public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }\r
-\r
-\r
- /**\r
- * SSL verify client.\r
- */\r
- protected String SSLVerifyClient = "none";\r
- public String getSSLVerifyClient() { return SSLVerifyClient; }\r
- public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }\r
-\r
-\r
- /**\r
- * SSL verify depth.\r
- */\r
- protected int SSLVerifyDepth = 10;\r
- public int getSSLVerifyDepth() { return SSLVerifyDepth; }\r
- public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }\r
-\r
-\r
- // --------------------------------------------------------- Public Methods\r
-\r
-\r
- /**\r
- * Number of keepalive sockets.\r
- */\r
- public int getKeepAliveCount() {\r
- if (pollers == null) {\r
- return 0;\r
- } else {\r
- int keepAliveCount = 0;\r
- for (int i = 0; i < pollers.length; i++) {\r
- keepAliveCount += pollers[i].getKeepAliveCount();\r
- }\r
- return keepAliveCount;\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Number of sendfile sockets.\r
- */\r
- public int getSendfileCount() {\r
- if (sendfiles == null) {\r
- return 0;\r
- } else {\r
- int sendfileCount = 0;\r
- for (int i = 0; i < sendfiles.length; i++) {\r
- sendfileCount += sendfiles[i].getSendfileCount();\r
- }\r
- return sendfileCount;\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Return the amount of threads that are managed by the pool.\r
- *\r
- * @return the amount of threads that are managed by the pool\r
- */\r
- public int getCurrentThreadCount() {\r
- return curThreads;\r
- }\r
-\r
-\r
- /**\r
- * Return the amount of threads currently busy.\r
- *\r
- * @return the amount of threads currently busy\r
- */\r
- public int getCurrentThreadsBusy() {\r
- return curThreadsBusy;\r
- }\r
-\r
-\r
- /**\r
- * Return the state of the endpoint.\r
- *\r
- * @return true if the endpoint is running, false otherwise\r
- */\r
- public boolean isRunning() {\r
- return running;\r
- }\r
-\r
-\r
- /**\r
- * Return the state of the endpoint.\r
- *\r
- * @return true if the endpoint is paused, false otherwise\r
- */\r
- public boolean isPaused() {\r
- return paused;\r
- }\r
-\r
-\r
- // ----------------------------------------------- Public Lifecycle Methods\r
-\r
-\r
- /**\r
- * Initialize the endpoint.\r
- */\r
- public void init()\r
- throws Exception {\r
-\r
- if (initialized)\r
- return;\r
-\r
- serverSock = ServerSocketChannel.open();\r
- InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));\r
- serverSock.socket().bind(addr,100); //todo, set backlog value\r
- serverSock.configureBlocking(true); //mimic APR behavior\r
- // Sendfile usage on systems which don't support it cause major problems\r
- if (useSendfile) {\r
- log.warn(sm.getString("endpoint.sendfile.nosupport"));\r
- useSendfile = false;\r
- }\r
-\r
- // Initialize thread count defaults for acceptor, poller and sendfile\r
- if (acceptorThreadCount == 0) {\r
- // FIXME: Doesn't seem to work that well with multiple accept threads\r
- acceptorThreadCount = 1;\r
- }\r
- if (pollerThreadCount != 1) {\r
- // limit to one poller, no need for others\r
- pollerThreadCount = 1;\r
- }\r
- if (sendfileThreadCount != 0) {\r
- sendfileThreadCount = 0;\r
- }\r
-\r
- // Initialize SSL if needed\r
- if (!"off".equalsIgnoreCase(SSLEngine)) {\r
- // Initialize SSL\r
- // FIXME: one per VM call ?\r
- if ("on".equalsIgnoreCase(SSLEngine)) {\r
- SSL.initialize(null);\r
- } else {\r
- SSL.initialize(SSLEngine);\r
- }\r
- // SSL protocol\r
- int value = SSL.SSL_PROTOCOL_ALL;\r
- if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {\r
- value = SSL.SSL_PROTOCOL_SSLV2;\r
- } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {\r
- value = SSL.SSL_PROTOCOL_SSLV3;\r
- } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {\r
- value = SSL.SSL_PROTOCOL_TLSV1;\r
- } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {\r
- value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;\r
- }\r
-// // Create SSL Context\r
-// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);\r
-// // List the ciphers that the client is permitted to negotiate\r
-// SSLContext.setCipherSuite(sslContext, SSLCipherSuite);\r
-// // Load Server key and certificate\r
-// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);\r
-// // Set certificate chain file\r
-// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);\r
-// // Support Client Certificates\r
-// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);\r
-// // Set revocation\r
-// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);\r
-// // Client certificate verification\r
-// value = SSL.SSL_CVERIFY_NONE;\r
-// if ("optional".equalsIgnoreCase(SSLVerifyClient)) {\r
-// value = SSL.SSL_CVERIFY_OPTIONAL;\r
-// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {\r
-// value = SSL.SSL_CVERIFY_REQUIRE;\r
-// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {\r
-// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;\r
-// }\r
-// SSLContext.setVerify(sslContext, value, SSLVerifyDepth);\r
- // For now, sendfile is not supported with SSL\r
- useSendfile = false;\r
- }\r
-\r
- initialized = true;\r
-\r
- }\r
-\r
-\r
- /**\r
- * Start the APR endpoint, creating acceptor, poller and sendfile threads.\r
- */\r
- public void start()\r
- throws Exception {\r
- // Initialize socket if not done before\r
- if (!initialized) {\r
- init();\r
- }\r
- if (!running) {\r
- running = true;\r
- paused = false;\r
-\r
- // Create worker collection\r
- if (executor == null) {\r
- workers = new WorkerStack(maxThreads);\r
- }\r
-\r
- // Start acceptor threads\r
- for (int i = 0; i < acceptorThreadCount; i++) {\r
- Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);\r
- acceptorThread.setPriority(threadPriority);\r
- acceptorThread.setDaemon(daemon);\r
- acceptorThread.start();\r
- }\r
-\r
- // Start poller threads\r
- pollers = new Poller[pollerThreadCount];\r
- for (int i = 0; i < pollerThreadCount; i++) {\r
- pollers[i] = new Poller(false);\r
- pollers[i].init();\r
- Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);\r
- pollerThread.setPriority(threadPriority);\r
- pollerThread.setDaemon(true);\r
- pollerThread.start();\r
- }\r
-\r
- // Start sendfile threads\r
- if (useSendfile) {\r
- sendfiles = new Sendfile[sendfileThreadCount];\r
- for (int i = 0; i < sendfileThreadCount; i++) {\r
- sendfiles[i] = new Sendfile();\r
- sendfiles[i].init();\r
- Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i);\r
- sendfileThread.setPriority(threadPriority);\r
- sendfileThread.setDaemon(true);\r
- sendfileThread.start();\r
- }\r
- }\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Pause the endpoint, which will make it stop accepting new sockets.\r
- */\r
- public void pause() {\r
- if (running && !paused) {\r
- paused = true;\r
- unlockAccept();\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Resume the endpoint, which will make it start accepting new sockets\r
- * again.\r
- */\r
- public void resume() {\r
- if (running) {\r
- paused = false;\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Stop the endpoint. This will cause all processing threads to stop.\r
- */\r
- public void stop() {\r
- if (running) {\r
- running = false;\r
- unlockAccept();\r
- for (int i = 0; i < pollers.length; i++) {\r
- pollers[i].destroy();\r
- }\r
- pollers = null;\r
- if (useSendfile) {\r
- for (int i = 0; i < sendfiles.length; i++) {\r
- sendfiles[i].destroy();\r
- }\r
- sendfiles = null;\r
- }\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Deallocate APR memory pools, and close server socket.\r
- */\r
- public void destroy() throws Exception {\r
- if (running) {\r
- stop();\r
- }\r
- // Close server socket\r
- serverSock.socket().close();\r
- serverSock.close();\r
- serverSock = null;\r
- sslContext = 0;\r
- initialized = false;\r
- }\r
-\r
-\r
- // ------------------------------------------------------ Protected Methods\r
-\r
-\r
- /**\r
- * Get a sequence number used for thread naming.\r
- */\r
- protected int getSequence() {\r
- return sequence++;\r
- }\r
-\r
-\r
- /**\r
- * Unlock the server socket accept using a bugus connection.\r
- */\r
- protected void unlockAccept() {\r
- java.net.Socket s = null;\r
- try {\r
- // Need to create a connection to unlock the accept();\r
- if (address == null) {\r
- s = new java.net.Socket("127.0.0.1", port);\r
- } else {\r
- s = new java.net.Socket(address, port);\r
- // setting soLinger to a small value will help shutdown the\r
- // connection quicker\r
- s.setSoLinger(true, 0);\r
- }\r
- } catch(Exception e) {\r
- if (log.isDebugEnabled()) {\r
- log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);\r
- }\r
- } finally {\r
- if (s != null) {\r
- try {\r
- s.close();\r
- } catch (Exception e) {\r
- // Ignore\r
- }\r
- }\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Process the specified connection.\r
- */\r
- protected boolean setSocketOptions(SocketChannel socket) {\r
- // Process the connection\r
- int step = 1;\r
- try {\r
- //disable blocking, APR style, we are gonna be polling it\r
- socket.configureBlocking(false);\r
-\r
- // 1: Set socket options: timeout, linger, etc\r
- if (soLinger >= 0)\r
- socket.socket().setSoLinger(true,soLinger);\r
- if (tcpNoDelay)\r
- socket.socket().setTcpNoDelay(true);\r
- if (soTimeout > 0)\r
- socket.socket().setSoTimeout(soTimeout);\r
-\r
-\r
- // 2: SSL handshake\r
- step = 2;\r
- if (sslContext != 0) {\r
-// SSLSocket.attach(sslContext, socket);\r
-// if (SSLSocket.handshake(socket) != 0) {\r
-// if (log.isDebugEnabled()) {\r
-// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());\r
-// }\r
-// return false;\r
-// }\r
- }\r
- \r
- getPoller().register(socket);\r
-\r
- } catch (Throwable t) {\r
- if (log.isDebugEnabled()) {\r
- if (step == 2) {\r
- log.debug(sm.getString("endpoint.err.handshake"), t);\r
- } else {\r
- log.debug(sm.getString("endpoint.err.unexpected"), t);\r
- }\r
- }\r
- // Tell to close the socket\r
- return false;\r
- }\r
- return true;\r
- }\r
-\r
-\r
- /**\r
- * Create (or allocate) and return an available processor for use in\r
- * processing a specific HTTP request, if possible. If the maximum\r
- * allowed processors have already been created and are in use, return\r
- * <code>null</code> instead.\r
- */\r
- protected Worker createWorkerThread() {\r
-\r
- synchronized (workers) {\r
- if (workers.size() > 0) {\r
- curThreadsBusy++;\r
- return (workers.pop());\r
- }\r
- if ((maxThreads > 0) && (curThreads < maxThreads)) {\r
- curThreadsBusy++;\r
- return (newWorkerThread());\r
- } else {\r
- if (maxThreads < 0) {\r
- curThreadsBusy++;\r
- return (newWorkerThread());\r
- } else {\r
- return (null);\r
- }\r
- }\r
- }\r
-\r
- }\r
-\r
-\r
- /**\r
- * Create and return a new processor suitable for processing HTTP\r
- * requests and returning the corresponding responses.\r
- */\r
- protected Worker newWorkerThread() {\r
-\r
- Worker workerThread = new Worker();\r
- workerThread.start();\r
- return (workerThread);\r
-\r
- }\r
-\r
-\r
- /**\r
- * Return a new worker thread, and block while to worker is available.\r
- */\r
- protected Worker getWorkerThread() {\r
- // Allocate a new worker thread\r
- Worker workerThread = createWorkerThread();\r
- while (workerThread == null) {\r
- try {\r
- synchronized (workers) {\r
- workers.wait();\r
- }\r
- } catch (InterruptedException e) {\r
- // Ignore\r
- }\r
- workerThread = createWorkerThread();\r
- }\r
- return workerThread;\r
- }\r
-\r
-\r
- /**\r
- * Recycle the specified Processor so that it can be used again.\r
- *\r
- * @param workerThread The processor to be recycled\r
- */\r
- protected void recycleWorkerThread(Worker workerThread) {\r
- synchronized (workers) {\r
- workers.push(workerThread);\r
- curThreadsBusy--;\r
- workers.notify();\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Allocate a new poller of the specified size.\r
- */\r
- protected long allocatePoller(int size, long pool, int timeout) {\r
- try {\r
- return Poll.create(size, pool, 0, timeout * 1000);\r
- } catch (Error e) {\r
- if (Status.APR_STATUS_IS_EINVAL(e.getError())) {\r
- log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));\r
- return 0;\r
- } else {\r
- log.error(sm.getString("endpoint.poll.initfail"), e);\r
- return -1;\r
- }\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Process given socket.\r
- */\r
- protected boolean processSocket(SocketChannel socket) {\r
- return processSocket(socket,false);\r
- }\r
-\r
-\r
- /**\r
- * Process given socket for an event.\r
- */\r
- protected boolean processSocket(SocketChannel socket, boolean error) {\r
- try {\r
- if (executor == null) {\r
- getWorkerThread().assign(socket, error);\r
- } else {\r
- executor.execute(new SocketEventProcessor(socket, error));\r
- }\r
- } catch (Throwable t) {\r
- // This means we got an OOM or similar creating a thread, or that\r
- // the pool and its queue are full\r
- log.error(sm.getString("endpoint.process.fail"), t);\r
- return false;\r
- }\r
- return true;\r
- }\r
-\r
-\r
- // --------------------------------------------------- Acceptor Inner Class\r
-\r
-\r
- /**\r
- * Server socket acceptor thread.\r
- */\r
- protected class Acceptor implements Runnable {\r
-\r
-\r
- /**\r
- * The background thread that listens for incoming TCP/IP connections and\r
- * hands them off to an appropriate processor.\r
- */\r
- public void run() {\r
-\r
- // Loop until we receive a shutdown command\r
- while (running) {\r
-\r
- // Loop if endpoint is paused\r
- while (paused) {\r
- try {\r
- Thread.sleep(1000);\r
- } catch (InterruptedException e) {\r
- // Ignore\r
- }\r
- }\r
-\r
- try {\r
- // Accept the next incoming connection from the server socket\r
- SocketChannel socket = serverSock.accept();\r
- // Hand this socket off to an appropriate processor\r
- if(!setSocketOptions(socket))\r
- {\r
- // Close socket right away\r
- socket.socket().close();\r
- socket.close();\r
- }\r
- } catch (Throwable t) {\r
- log.error(sm.getString("endpoint.accept.fail"), t);\r
- }\r
-\r
- // The processor will recycle itself when it finishes\r
-\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-\r
- // ----------------------------------------------------- Poller Inner Class\r
-\r
-\r
- /**\r
- * Poller class.\r
- */\r
- public class Poller implements Runnable {\r
-\r
- protected Selector selector;\r
- protected LinkedList<Runnable> events = new LinkedList<Runnable>();\r
- protected boolean close = false;\r
- protected boolean comet = true;\r
-\r
- protected int keepAliveCount = 0;\r
- public int getKeepAliveCount() { return keepAliveCount; }\r
-\r
-\r
-\r
- public Poller(boolean comet) throws IOException {\r
- this.comet = comet;\r
- this.selector = Selector.open();\r
- }\r
- \r
- public Selector getSelector() { return selector;}\r
-\r
- /**\r
- * Create the poller. With some versions of APR, the maximum poller size will\r
- * be 62 (reocmpiling APR is necessary to remove this limitation).\r
- */\r
- protected void init() {\r
- keepAliveCount = 0;\r
- }\r
-\r
- /**\r
- * Destroy the poller.\r
- */\r
- protected void destroy() {\r
- // Wait for polltime before doing anything, so that the poller threads\r
- // exit, otherwise parallel descturction of sockets which are still\r
- // in the poller can cause problems\r
- try {\r
- synchronized (this) {\r
- this.wait(pollTime / 1000);\r
- }\r
- } catch (InterruptedException e) {\r
- // Ignore\r
- }\r
- close = true;\r
- }\r
-\r
- /**\r
- * Add specified socket and associated pool to the poller. The socket will\r
- * be added to a temporary array, and polled first after a maximum amount\r
- * of time equal to pollTime (in most cases, latency will be much lower,\r
- * however).\r
- *\r
- * @param socket to add to the poller\r
- */\r
- public void add(final SocketChannel socket) {\r
- KeyAttachment att = (KeyAttachment)socket.keyFor(getPoller().getSelector()).attachment();\r
- if ( att!=null )att.setCurrentAccess(false);\r
- final SelectionKey key = socket.keyFor(selector);\r
- Runnable r = new Runnable() {\r
- public void run() {\r
- if ( key != null ) key.interestOps(SelectionKey.OP_READ);\r
- }\r
- };\r
- synchronized (events) {\r
- events.add(r);\r
- }\r
- selector.wakeup();\r
- }\r
-\r
- public void events() {\r
- synchronized (events) {\r
- Runnable r = null;\r
- while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {\r
- try {\r
- r.run();\r
- } catch ( Exception x ) {\r
- log.error("",x);\r
- }\r
- }\r
- events.clear();\r
- }\r
- }\r
- \r
- public void register(final SocketChannel socket)\r
- {\r
- SelectionKey key = socket.keyFor(selector);\r
- Runnable r = new Runnable() {\r
- public void run() {\r
- try {\r
- socket.register(selector, SelectionKey.OP_READ, new KeyAttachment());\r
- } catch (Exception x) {\r
- log.error("", x);\r
- }\r
- }\r
- \r
- };\r
- synchronized (events) {\r
- events.add(r);\r
- }\r
- selector.wakeup();\r
- }\r
- \r
- public void cancelledKey(SelectionKey key) {\r
- try {\r
- KeyAttachment ka = (KeyAttachment) key.attachment();\r
- key.cancel();\r
- if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true);\r
- key.channel().close();\r
- } catch (IOException e) {\r
- if ( log.isDebugEnabled() ) log.debug("",e);\r
- // Ignore\r
- }\r
- }\r
- /**\r
- * The background thread that listens for incoming TCP/IP connections and\r
- * hands them off to an appropriate processor.\r
- */\r
- public void run() {\r
-\r
- // Loop until we receive a shutdown command\r
- while (running) {\r
- // Loop if endpoint is paused\r
- while (paused) {\r
- try {\r
- Thread.sleep(1000);\r
- } catch (InterruptedException e) {\r
- // Ignore\r
- }\r
- }\r
-\r
- events();\r
- // Time to terminate?\r
- if (close) return;\r
-\r
- int keyCount = 0;\r
- try {\r
- keyCount = selector.select(selectorTimeout);\r
- } catch (IOException x) {\r
- log.error("",x);\r
- continue;\r
- }\r
- //timeout\r
- Set keys = selector.keys();\r
- long now = System.currentTimeMillis();\r
- for (Iterator iter = keys.iterator(); iter.hasNext(); ) {\r
- SelectionKey key = (SelectionKey) iter.next();\r
- try {\r
- if (key.interestOps() == SelectionKey.OP_READ) {\r
- //only timeout sockets that we are waiting for a read from\r
- KeyAttachment ka = (KeyAttachment) key.attachment();\r
- long delta = now - ka.getLastAccess();\r
- if (delta > (long) soTimeout) {\r
- cancelledKey(key);\r
- }\r
- }\r
- }catch ( CancelledKeyException ckx ) {\r
- cancelledKey(key);\r
- }\r
- }\r
- \r
-\r
- if (keyCount == 0) continue;\r
-\r
- Iterator iterator = selector.selectedKeys().iterator();\r
- // Walk through the collection of ready keys and dispatch\r
- // any active event.\r
- while (iterator.hasNext()) {\r
- SelectionKey sk = (SelectionKey) iterator.next();\r
- iterator.remove();\r
- KeyAttachment attachment = (KeyAttachment)sk.attachment();\r
- try {\r
- if(attachment == null) attachment = new KeyAttachment();\r
- attachment.access();\r
- sk.attach(attachment);\r
-\r
- int readyOps = sk.readyOps();\r
- sk.interestOps(sk.interestOps() & ~readyOps);\r
- SocketChannel channel = (SocketChannel)sk.channel();\r
- boolean read = sk.isReadable();\r
- if (read) {\r
- if ( comet ) {\r
- if (!processSocket(channel,false)) processSocket(channel,true);\r
- } else {\r
- boolean close = (!processSocket(channel));\r
- if ( close ) {\r
- channel.socket().close();\r
- channel.close();\r
- }\r
- }\r
- }\r
- if (sk.isValid() && sk.isWritable()) {\r
- }\r
- } catch ( CancelledKeyException ckx ) {\r
- if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true);\r
- try {\r
- sk.channel().close();\r
- }catch ( Exception ignore){}\r
- } catch (Throwable t) {\r
- log.error("",t);\r
- }\r
- }//while\r
-\r
- \r
- }\r
- synchronized (this) {\r
- this.notifyAll();\r
- }\r
-\r
- }\r
-\r
- }\r
- \r
- public static class KeyAttachment {\r
-\r
- public long getLastAccess() { return lastAccess; }\r
- public void access() { access(System.currentTimeMillis()); }\r
- public void access(long access) { lastAccess = access; }\r
- public void setComet(boolean comet) { this.comet = comet; }\r
- public boolean getComet() { return comet; }\r
- public boolean getCurrentAccess() { return currentAccess; }\r
- public void setCurrentAccess(boolean access) { currentAccess = access; }\r
-\r
- protected long lastAccess = System.currentTimeMillis();\r
- protected boolean currentAccess = false;\r
- protected boolean comet = false;\r
-\r
- }\r
-\r
-\r
-\r
- // ----------------------------------------------------- Worker Inner Class\r
-\r
-\r
- /**\r
- * Server processor class.\r
- */\r
- protected class Worker implements Runnable {\r
-\r
-\r
- protected Thread thread = null;\r
- protected boolean available = false;\r
- protected SocketChannel socket = null;\r
- protected boolean event = false;\r
- protected boolean error = false;\r
-\r
-\r
- /**\r
- * Process an incoming TCP/IP connection on the specified socket. Any\r
- * exception that occurs during processing must be logged and swallowed.\r
- * <b>NOTE</b>: This method is called from our Connector's thread. We\r
- * must assign it to our own thread so that multiple simultaneous\r
- * requests can be handled.\r
- *\r
- * @param socket TCP socket to process\r
- */\r
- protected synchronized void assign(SocketChannel socket) {\r
-\r
- // Wait for the Processor to get the previous Socket\r
- while (available) {\r
- try {\r
- wait();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
-\r
- // Store the newly available Socket and notify our thread\r
- this.socket = socket;\r
- event = false;\r
- error = false;\r
- available = true;\r
- notifyAll();\r
-\r
- }\r
-\r
-\r
- protected synchronized void assign(SocketChannel socket, boolean error) {\r
-\r
- // Wait for the Processor to get the previous Socket\r
- while (available) {\r
- try {\r
- wait();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
-\r
- // Store the newly available Socket and notify our thread\r
- this.socket = socket;\r
- KeyAttachment att = (KeyAttachment)socket.keyFor(getPoller().getSelector()).attachment();\r
- if ( att!= null ) att.setCurrentAccess(true);\r
- event = true;\r
- this.error = error;\r
- available = true;\r
- notifyAll();\r
- }\r
-\r
-\r
- /**\r
- * Await a newly assigned Socket from our Connector, or <code>null</code>\r
- * if we are supposed to shut down.\r
- */\r
- protected synchronized SocketChannel await() {\r
-\r
- // Wait for the Connector to provide a new Socket\r
- while (!available) {\r
- try {\r
- wait();\r
- } catch (InterruptedException e) {\r
- }\r
- }\r
-\r
- // Notify the Connector that we have received this Socket\r
- SocketChannel socket = this.socket;\r
- available = false;\r
- notifyAll();\r
-\r
- return (socket);\r
-\r
- }\r
-\r
-\r
- /**\r
- * The background thread that listens for incoming TCP/IP connections and\r
- * hands them off to an appropriate processor.\r
- */\r
- public void run() {\r
-\r
- // Process requests until we receive a shutdown signal\r
- while (running) {\r
-\r
- // Wait for the next socket to be assigned\r
- SocketChannel socket = await();\r
- if (socket == null)\r
- continue;\r
-\r
- // Process the request from this socket\r
- if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {\r
- // Close socket and pool\r
- try {\r
- socket.socket().close();\r
- socket.close();\r
- }catch ( Exception x ) {\r
- log.error("",x);\r
- }\r
- } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {\r
- // Close socket and pool\r
- try {\r
- socket.socket().close();\r
- socket.close();\r
- }catch ( Exception x ) {\r
- log.error("",x);\r
- }\r
- }\r
-\r
- // Finish up this request\r
- recycleWorkerThread(this);\r
-\r
- }\r
-\r
- }\r
-\r
-\r
- /**\r
- * Start the background processing thread.\r
- */\r
- public void start() {\r
- thread = new Thread(this);\r
- thread.setName(getName() + "-" + (++curThreads));\r
- thread.setDaemon(true);\r
- thread.start();\r
- }\r
-\r
-\r
- }\r
-\r
-\r
- // ----------------------------------------------- SendfileData Inner Class\r
-\r
-\r
- /**\r
- * SendfileData class.\r
- */\r
- public static class SendfileData {\r
- // File\r
- public String fileName;\r
- public long fd;\r
- public long fdpool;\r
- // Range information\r
- public long start;\r
- public long end;\r
- // Socket and socket pool\r
- public SocketChannel socket;\r
- // Position\r
- public long pos;\r
- // KeepAlive flag\r
- public boolean keepAlive;\r
- }\r
-\r
-\r
- // --------------------------------------------------- Sendfile Inner Class\r
-\r
-\r
- /**\r
- * Sendfile class.\r
- */\r
- public class Sendfile implements Runnable {\r
-\r
- protected long sendfilePollset = 0;\r
- protected long pool = 0;\r
- protected long[] desc;\r
- protected HashMap<Long, SendfileData> sendfileData;\r
-\r
- protected int sendfileCount;\r
- public int getSendfileCount() { return sendfileCount; }\r
-\r
- protected ArrayList<SendfileData> addS;\r
-\r
- /**\r
- * Create the sendfile poller. With some versions of APR, the maximum poller size will\r
- * be 62 (reocmpiling APR is necessary to remove this limitation).\r
- */\r
- protected void init() {\r
-// pool = Pool.create(serverSockPool);\r
-// int size = sendfileSize / sendfileThreadCount;\r
-// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
-// if (sendfilePollset == 0 && size > 1024) {\r
-// size = 1024;\r
-// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
-// }\r
-// if (sendfilePollset == 0) {\r
-// size = 62;\r
-// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
-// }\r
-// desc = new long[size * 2];\r
-// sendfileData = new HashMap<Long, SendfileData>(size);\r
-// addS = new ArrayList<SendfileData>();\r
- }\r
-\r
- /**\r
- * Destroy the poller.\r
- */\r
- protected void destroy() {\r
-// // Wait for polltime before doing anything, so that the poller threads\r
-// // exit, otherwise parallel descturction of sockets which are still\r
-// // in the poller can cause problems\r
-// try {\r
-// synchronized (this) {\r
-// this.wait(pollTime / 1000);\r
-// }\r
-// } catch (InterruptedException e) {\r
-// // Ignore\r
-// }\r
-// // Close any socket remaining in the add queue\r
-// for (int i = (addS.size() - 1); i >= 0; i--) {\r
-// SendfileData data = addS.get(i);\r
-// Socket.destroy(data.socket);\r
-// }\r
-// // Close all sockets still in the poller\r
-// int rv = Poll.pollset(sendfilePollset, desc);\r
-// if (rv > 0) {\r
-// for (int n = 0; n < rv; n++) {\r
-// Socket.destroy(desc[n*2+1]);\r
-// }\r
-// }\r
-// Pool.destroy(pool);\r
-// sendfileData.clear();\r
- }\r
-\r
- /**\r
- * Add the sendfile data to the sendfile poller. Note that in most cases,\r
- * the initial non blocking calls to sendfile will return right away, and\r
- * will be handled asynchronously inside the kernel. As a result,\r
- * the poller will never be used.\r
- *\r
- * @param data containing the reference to the data which should be snet\r
- * @return true if all the data has been sent right away, and false\r
- * otherwise\r
- */\r
- public boolean add(SendfileData data) {\r
-// // Initialize fd from data given\r
-// try {\r
-// data.fdpool = Socket.pool(data.socket);\r
-// data.fd = File.open\r
-// (data.fileName, File.APR_FOPEN_READ\r
-// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,\r
-// 0, data.fdpool);\r
-// data.pos = data.start;\r
-// // Set the socket to nonblocking mode\r
-// Socket.timeoutSet(data.socket, 0);\r
-// while (true) {\r
-// long nw = Socket.sendfilen(data.socket, data.fd,\r
-// data.pos, data.end - data.pos, 0);\r
-// if (nw < 0) {\r
-// if (!(-nw == Status.EAGAIN)) {\r
-// Socket.destroy(data.socket);\r
-// data.socket = 0;\r
-// return false;\r
-// } else {\r
-// // Break the loop and add the socket to poller.\r
-// break;\r
-// }\r
-// } else {\r
-// data.pos = data.pos + nw;\r
-// if (data.pos >= data.end) {\r
-// // Entire file has been sent\r
-// Pool.destroy(data.fdpool);\r
-// // Set back socket to blocking mode\r
-// Socket.timeoutSet(data.socket, soTimeout * 1000);\r
-// return true;\r
-// }\r
-// }\r
-// }\r
-// } catch (Exception e) {\r
-// log.error(sm.getString("endpoint.sendfile.error"), e);\r
-// return false;\r
-// }\r
-// // Add socket to the list. Newly added sockets will wait\r
-// // at most for pollTime before being polled\r
-// synchronized (this) {\r
-// addS.add(data);\r
-// this.notify();\r
-// }\r
- return false;\r
- }\r
-\r
- /**\r
- * Remove socket from the poller.\r
- *\r
- * @param data the sendfile data which should be removed\r
- */\r
- protected void remove(SendfileData data) {\r
-// int rv = Poll.remove(sendfilePollset, data.socket);\r
-// if (rv == Status.APR_SUCCESS) {\r
-// sendfileCount--;\r
-// }\r
-// sendfileData.remove(data);\r
- }\r
-\r
- /**\r
- * The background thread that listens for incoming TCP/IP connections and\r
- * hands them off to an appropriate processor.\r
- */\r
- public void run() {\r
-\r
-// // Loop until we receive a shutdown command\r
-// while (running) {\r
-//\r
-// // Loop if endpoint is paused\r
-// while (paused) {\r
-// try {\r
-// Thread.sleep(1000);\r
-// } catch (InterruptedException e) {\r
-// // Ignore\r
-// }\r
-// }\r
-//\r
-// while (sendfileCount < 1 && addS.size() < 1) {\r
-// try {\r
-// synchronized (this) {\r
-// this.wait();\r
-// }\r
-// } catch (InterruptedException e) {\r
-// // Ignore\r
-// }\r
-// }\r
-//\r
-// try {\r
-// // Add socket to the poller\r
-// if (addS.size() > 0) {\r
-// synchronized (this) {\r
-// for (int i = (addS.size() - 1); i >= 0; i--) {\r
-// SendfileData data = addS.get(i);\r
-// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);\r
-// if (rv == Status.APR_SUCCESS) {\r
-// sendfileData.put(new Long(data.socket), data);\r
-// sendfileCount++;\r
-// } else {\r
-// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));\r
-// // Can't do anything: close the socket right away\r
-// Socket.destroy(data.socket);\r
-// }\r
-// }\r
-// addS.clear();\r
-// }\r
-// }\r
-// // Pool for the specified interval\r
-// int rv = Poll.poll(sendfilePollset, pollTime, desc, false);\r
-// if (rv > 0) {\r
-// for (int n = 0; n < rv; n++) {\r
-// // Get the sendfile state\r
-// SendfileData state =\r
-// sendfileData.get(new Long(desc[n*2+1]));\r
-// // Problem events\r
-// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
-// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {\r
-// // Close socket and clear pool\r
-// remove(state);\r
-// // Destroy file descriptor pool, which should close the file\r
-// // Close the socket, as the reponse would be incomplete\r
-// Socket.destroy(state.socket);\r
-// continue;\r
-// }\r
-// // Write some data using sendfile\r
-// long nw = Socket.sendfilen(state.socket, state.fd,\r
-// state.pos,\r
-// state.end - state.pos, 0);\r
-// if (nw < 0) {\r
-// // Close socket and clear pool\r
-// remove(state);\r
-// // Close the socket, as the reponse would be incomplete\r
-// // This will close the file too.\r
-// Socket.destroy(state.socket);\r
-// continue;\r
-// }\r
-//\r
-// state.pos = state.pos + nw;\r
-// if (state.pos >= state.end) {\r
-// remove(state);\r
-// if (state.keepAlive) {\r
-// // Destroy file descriptor pool, which should close the file\r
-// Pool.destroy(state.fdpool);\r
-// Socket.timeoutSet(state.socket, soTimeout * 1000);\r
-// // If all done hand this socket off to a worker for\r
-// // processing of further requests\r
-// if (!processSocket(state.socket)) {\r
-// Socket.destroy(state.socket);\r
-// }\r
-// } else {\r
-// // Close the socket since this is\r
-// // the end of not keep-alive request.\r
-// Socket.destroy(state.socket);\r
-// }\r
-// }\r
-// }\r
-// } else if (rv < 0) {\r
-// int errn = -rv;\r
-// /* Any non timeup or interrupted error is critical */\r
-// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {\r
-// if (errn > Status.APR_OS_START_USERERR) {\r
-// errn -= Status.APR_OS_START_USERERR;\r
-// }\r
-// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));\r
-// // Handle poll critical failure\r
-// synchronized (this) {\r
-// destroy();\r
-// init();\r
-// }\r
-// continue;\r
-// }\r
-// }\r
-// /* TODO: See if we need to call the maintain for sendfile poller */\r
-// } catch (Throwable t) {\r
-// log.error(sm.getString("endpoint.poll.error"), t);\r
-// }\r
-// }\r
-//\r
-// synchronized (this) {\r
-// this.notifyAll();\r
-// }\r
-\r
- }\r
-\r
- }\r
-\r
-\r
- // ------------------------------------------------ Handler Inner Interface\r
-\r
-\r
- /**\r
- * Bare bones interface used for socket processing. Per thread data is to be\r
- * stored in the ThreadWithAttributes extra folders, or alternately in\r
- * thread local fields.\r
- */\r
- public interface Handler {\r
- public enum SocketState {\r
- OPEN, CLOSED, LONG\r
- }\r
- public SocketState process(SocketChannel socket);\r
- public SocketState event(SocketChannel socket, boolean error);\r
- }\r
-\r
-\r
- // ------------------------------------------------- WorkerStack Inner Class\r
-\r
-\r
- public class WorkerStack {\r
-\r
- protected Worker[] workers = null;\r
- protected int end = 0;\r
-\r
- public WorkerStack(int size) {\r
- workers = new Worker[size];\r
- }\r
-\r
- /** \r
- * Put the object into the queue.\r
- * \r
- * @param object the object to be appended to the queue (first element). \r
- */\r
- public void push(Worker worker) {\r
- workers[end++] = worker;\r
- }\r
-\r
- /**\r
- * Get the first object out of the queue. Return null if the queue\r
- * is empty. \r
- */\r
- public Worker pop() {\r
- if (end > 0) {\r
- return workers[--end];\r
- }\r
- return null;\r
- }\r
-\r
- /**\r
- * Get the first object out of the queue, Return null if the queue\r
- * is empty.\r
- */\r
- public Worker peek() {\r
- return workers[end];\r
- }\r
-\r
- /**\r
- * Is the queue empty?\r
- */\r
- public boolean isEmpty() {\r
- return (end == 0);\r
- }\r
-\r
- /**\r
- * How many elements are there in this queue?\r
- */\r
- public int size() {\r
- return (end);\r
- }\r
- }\r
-\r
-\r
- // ---------------------------------------------- SocketProcessor Inner Class\r
-\r
-\r
- /**\r
- * This class is the equivalent of the Worker, but will simply use in an\r
- * external Executor thread pool.\r
- */\r
- protected class SocketProcessor implements Runnable {\r
-\r
- protected SocketChannel socket = null;\r
-\r
- public SocketProcessor(SocketChannel socket) {\r
- this.socket = socket;\r
- }\r
-\r
- public void run() {\r
-\r
- // Process the request from this socket\r
- if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
- // Close socket and pool\r
- try {\r
- socket.socket().close();\r
- socket.close();\r
- } catch ( Exception x ) {\r
- log.error("",x);\r
- }\r
- socket = null;\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-\r
- // --------------------------------------- SocketEventProcessor Inner Class\r
-\r
-\r
- /**\r
- * This class is the equivalent of the Worker, but will simply use in an\r
- * external Executor thread pool.\r
- */\r
- protected class SocketEventProcessor implements Runnable {\r
-\r
- protected SocketChannel socket = null;\r
- protected boolean error = false; \r
-\r
- public SocketEventProcessor(SocketChannel socket, boolean error) {\r
- this.socket = socket;\r
- this.error = error;\r
- }\r
-\r
- public void run() {\r
-\r
- // Process the request from this socket\r
- if (handler.event(socket, error) == Handler.SocketState.CLOSED) {\r
- // Close socket and pool\r
- try {\r
- socket.socket().close();\r
- socket.close();\r
- } catch ( Exception x ) {\r
- log.error("",x);\r
- }\r
- socket = null;\r
- }\r
-\r
- }\r
-\r
- }\r
-\r
-\r
-}\r
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * NIO tailored thread pool, providing the following services:
+ * <ul>
+ * <li>Socket acceptor thread</li>
+ * <li>Socket poller thread</li>
+ * <li>Sendfile thread</li>
+ * <li>Worker threads pool</li>
+ * </ul>
+ *
+ * When switching to Java 5, there's an opportunity to use the virtual
+ * machine's thread pool.
+ *
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ * @author Filip Hanik
+ */
+public class NioEndpoint {
+
+
+ // -------------------------------------------------------------- Constants
+
+
+ protected static Log log = LogFactory.getLog(NioEndpoint.class);
+
+ protected static StringManager sm =
+ StringManager.getManager("org.apache.tomcat.util.net.res");
+
+
+ /**
+ * The Request attribute key for the cipher suite.
+ */
+ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
+
+ /**
+ * The Request attribute key for the key size.
+ */
+ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+ /**
+ * The Request attribute key for the client certificate chain.
+ */
+ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
+
+ /**
+ * The Request attribute key for the session id.
+ * This one is a Tomcat extension to the Servlet spec.
+ */
+ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
+
+
+ // ----------------------------------------------------------------- Fields
+
+
+ /**
+ * Available workers.
+ */
+ protected WorkerStack workers = null;
+
+
+ /**
+ * Running state of the endpoint.
+ */
+ protected volatile boolean running = false;
+
+
+ /**
+ * Will be set to true whenever the endpoint is paused.
+ */
+ protected volatile boolean paused = false;
+
+
+ /**
+ * Track the initialization state of the endpoint.
+ */
+ protected boolean initialized = false;
+
+
+ /**
+ * Current worker threads busy count.
+ */
+ protected int curThreadsBusy = 0;
+
+
+ /**
+ * Current worker threads count.
+ */
+ protected int curThreads = 0;
+
+
+ /**
+ * Sequence number used to generate thread names.
+ */
+ protected int sequence = 0;
+
+
+ /**
+ * Root APR memory pool.
+ */
+ protected long rootPool = 0;
+
+
+ /**
+ * Server socket "pointer".
+ */
+ protected ServerSocketChannel serverSock = null;
+
+
+ /**
+ * APR memory pool for the server socket.
+ */
+ protected long serverSockPool = 0;
+
+
+ /**
+ * SSL context.
+ */
+ protected long sslContext = 0;
+
+
+ // ------------------------------------------------------------- Properties
+
+
+ /**
+ * External Executor based thread pool.
+ */
+ protected Executor executor = null;
+ public void setExecutor(Executor executor) { this.executor = executor; }
+ public Executor getExecutor() { return executor; }
+
+
+ /**
+ * Maximum amount of worker threads.
+ */
+ protected int maxThreads = 40;
+ public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
+ public int getMaxThreads() { return maxThreads; }
+
+
+ /**
+ * Priority of the acceptor and poller threads.
+ */
+ protected int threadPriority = Thread.NORM_PRIORITY;
+ public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
+ public int getThreadPriority() { return threadPriority; }
+
+
+ /**
+ * Size of the socket poller.
+ */
+ protected int pollerSize = 8 * 1024;
+ public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
+ public int getPollerSize() { return pollerSize; }
+
+
+ /**
+ * Size of the sendfile (= concurrent files which can be served).
+ */
+ protected int sendfileSize = 1 * 1024;
+ public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }
+ public int getSendfileSize() { return sendfileSize; }
+
+
+ /**
+ * Server socket port.
+ */
+ protected int port;
+ public int getPort() { return port; }
+ public void setPort(int port ) { this.port=port; }
+
+
+ /**
+ * Address for the server socket.
+ */
+ protected InetAddress address;
+ public InetAddress getAddress() { return address; }
+ public void setAddress(InetAddress address) { this.address = address; }
+
+
+ /**
+ * Handling of accepted sockets.
+ */
+ protected Handler handler = null;
+ public void setHandler(Handler handler ) { this.handler = handler; }
+ public Handler getHandler() { return handler; }
+
+
+ /**
+ * Allows the server developer to specify the backlog that
+ * should be used for server sockets. By default, this value
+ * is 100.
+ */
+ protected int backlog = 100;
+ public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
+ public int getBacklog() { return backlog; }
+
+
+ /**
+ * Socket TCP no delay.
+ */
+ protected boolean tcpNoDelay = false;
+ public boolean getTcpNoDelay() { return tcpNoDelay; }
+ public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
+
+
+ /**
+ * Socket linger.
+ */
+ protected int soLinger = 100;
+ public int getSoLinger() { return soLinger; }
+ public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
+
+
+ /**
+ * Socket timeout.
+ */
+ protected int soTimeout = -1;
+ public int getSoTimeout() { return soTimeout; }
+ public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
+
+
+ /**
+ * Timeout on first request read before going to the poller, in ms.
+ */
+ protected int firstReadTimeout = 60000;
+ public int getFirstReadTimeout() { return firstReadTimeout; }
+ public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }
+
+
+ /**
+ * Poll interval, in microseconds. The smaller the value, the more CPU the poller
+ * will use, but the more responsive to activity it will be.
+ */
+ protected int pollTime = 2000;
+ public int getPollTime() { return pollTime; }
+ public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } }
+
+
+ /**
+ * The default is true - the created threads will be
+ * in daemon mode. If set to false, the control thread
+ * will not be daemon - and will keep the process alive.
+ */
+ protected boolean daemon = true;
+ public void setDaemon(boolean b) { daemon = b; }
+ public boolean getDaemon() { return daemon; }
+
+
+ /**
+ * Name of the thread pool, which will be used for naming child threads.
+ */
+ protected String name = "TP";
+ public void setName(String name) { this.name = name; }
+ public String getName() { return name; }
+
+
+ /**
+ * Use endfile for sending static files.
+ */
+ protected boolean useSendfile = Library.APR_HAS_SENDFILE;
+ public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
+ public boolean getUseSendfile() { return useSendfile; }
+
+
+ /**
+ * Allow comet request handling.
+ */
+ protected boolean useComet = true;
+ public void setUseComet(boolean useComet) { this.useComet = useComet; }
+ public boolean getUseComet() { return useComet; }
+
+
+ /**
+ * Acceptor thread count.
+ */
+ protected int acceptorThreadCount = 0;
+ public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
+ public int getAcceptorThreadCount() { return acceptorThreadCount; }
+
+
+ /**
+ * Sendfile thread count.
+ */
+ protected int sendfileThreadCount = 0;
+ public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; }
+ public int getSendfileThreadCount() { return sendfileThreadCount; }
+
+
+ /**
+ * Poller thread count.
+ */
+ protected int pollerThreadCount = 0;
+ public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
+ public int getPollerThreadCount() { return pollerThreadCount; }
+
+ protected long selectorTimeout = 5000;
+ public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}
+ public long getSelectorTimeout(){ return this.selectorTimeout; }
+ /**
+ * The socket poller.
+ */
+ protected Poller[] pollers = null;
+ protected int pollerRoundRobin = 0;
+ public Poller getPoller() {
+ pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
+ Poller poller = pollers[pollerRoundRobin];
+ poller.comet = false;
+ return poller;
+ }
+
+
+ /**
+ * The socket poller used for Comet support.
+ */
+ public Poller getCometPoller() {
+ Poller poller = getPoller();
+ poller.comet = true;
+ return poller;
+ }
+
+
+ /**
+ * The static file sender.
+ */
+ protected Sendfile[] sendfiles = null;
+ protected int sendfileRoundRobin = 0;
+ public Sendfile getSendfile() {
+ sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;
+ return sendfiles[sendfileRoundRobin];
+ }
+
+
+ /**
+ * Dummy maxSpareThreads property.
+ */
+ public int getMaxSpareThreads() { return 0; }
+
+
+ /**
+ * Dummy minSpareThreads property.
+ */
+ public int getMinSpareThreads() { return 0; }
+
+
+ /**
+ * SSL engine.
+ */
+ protected String SSLEngine = "off";
+ public String getSSLEngine() { return SSLEngine; }
+ public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; }
+
+
+ /**
+ * SSL protocols.
+ */
+ protected String SSLProtocol = "all";
+ public String getSSLProtocol() { return SSLProtocol; }
+ public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; }
+
+
+ /**
+ * SSL password (if a cert is encrypted, and no password has been provided, a callback
+ * will ask for a password).
+ */
+ protected String SSLPassword = null;
+ public String getSSLPassword() { return SSLPassword; }
+ public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; }
+
+
+ /**
+ * SSL cipher suite.
+ */
+ protected String SSLCipherSuite = "ALL";
+ public String getSSLCipherSuite() { return SSLCipherSuite; }
+ public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
+
+
+ /**
+ * SSL certificate file.
+ */
+ protected String SSLCertificateFile = null;
+ public String getSSLCertificateFile() { return SSLCertificateFile; }
+ public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }
+
+
+ /**
+ * SSL certificate key file.
+ */
+ protected String SSLCertificateKeyFile = null;
+ public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }
+ public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }
+
+
+ /**
+ * SSL certificate chain file.
+ */
+ protected String SSLCertificateChainFile = null;
+ public String getSSLCertificateChainFile() { return SSLCertificateChainFile; }
+ public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }
+
+
+ /**
+ * SSL CA certificate path.
+ */
+ protected String SSLCACertificatePath = null;
+ public String getSSLCACertificatePath() { return SSLCACertificatePath; }
+ public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }
+
+
+ /**
+ * SSL CA certificate file.
+ */
+ protected String SSLCACertificateFile = null;
+ public String getSSLCACertificateFile() { return SSLCACertificateFile; }
+ public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }
+
+
+ /**
+ * SSL CA revocation path.
+ */
+ protected String SSLCARevocationPath = null;
+ public String getSSLCARevocationPath() { return SSLCARevocationPath; }
+ public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }
+
+
+ /**
+ * SSL CA revocation file.
+ */
+ protected String SSLCARevocationFile = null;
+ public String getSSLCARevocationFile() { return SSLCARevocationFile; }
+ public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }
+
+
+ /**
+ * SSL verify client.
+ */
+ protected String SSLVerifyClient = "none";
+ public String getSSLVerifyClient() { return SSLVerifyClient; }
+ public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }
+
+
+ /**
+ * SSL verify depth.
+ */
+ protected int SSLVerifyDepth = 10;
+ public int getSSLVerifyDepth() { return SSLVerifyDepth; }
+ public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }
+
+
+ // --------------------------------------------------------- Public Methods
+
+
+ /**
+ * Number of keepalive sockets.
+ */
+ public int getKeepAliveCount() {
+ if (pollers == null) {
+ return 0;
+ } else {
+ int keepAliveCount = 0;
+ for (int i = 0; i < pollers.length; i++) {
+ keepAliveCount += pollers[i].getKeepAliveCount();
+ }
+ return keepAliveCount;
+ }
+ }
+
+
+ /**
+ * Number of sendfile sockets.
+ */
+ public int getSendfileCount() {
+ if (sendfiles == null) {
+ return 0;
+ } else {
+ int sendfileCount = 0;
+ for (int i = 0; i < sendfiles.length; i++) {
+ sendfileCount += sendfiles[i].getSendfileCount();
+ }
+ return sendfileCount;
+ }
+ }
+
+
+ /**
+ * Return the amount of threads that are managed by the pool.
+ *
+ * @return the amount of threads that are managed by the pool
+ */
+ public int getCurrentThreadCount() {
+ return curThreads;
+ }
+
+
+ /**
+ * Return the amount of threads currently busy.
+ *
+ * @return the amount of threads currently busy
+ */
+ public int getCurrentThreadsBusy() {
+ return curThreadsBusy;
+ }
+
+
+ /**
+ * Return the state of the endpoint.
+ *
+ * @return true if the endpoint is running, false otherwise
+ */
+ public boolean isRunning() {
+ return running;
+ }
+
+
+ /**
+ * Return the state of the endpoint.
+ *
+ * @return true if the endpoint is paused, false otherwise
+ */
+ public boolean isPaused() {
+ return paused;
+ }
+
+
+ // ----------------------------------------------- Public Lifecycle Methods
+
+
+ /**
+ * Initialize the endpoint.
+ */
+ public void init()
+ throws Exception {
+
+ if (initialized)
+ return;
+
+ serverSock = ServerSocketChannel.open();
+ InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
+ serverSock.socket().bind(addr,100); //todo, set backlog value
+ serverSock.configureBlocking(true); //mimic APR behavior
+ // Sendfile usage on systems which don't support it cause major problems
+ if (useSendfile) {
+ log.warn(sm.getString("endpoint.sendfile.nosupport"));
+ useSendfile = false;
+ }
+
+ // Initialize thread count defaults for acceptor, poller and sendfile
+ if (acceptorThreadCount == 0) {
+ // FIXME: Doesn't seem to work that well with multiple accept threads
+ acceptorThreadCount = 1;
+ }
+ if (pollerThreadCount != 1) {
+ // limit to one poller, no need for others
+ pollerThreadCount = 1;
+ }
+ if (sendfileThreadCount != 0) {
+ sendfileThreadCount = 0;
+ }
+
+ // Initialize SSL if needed
+ if (!"off".equalsIgnoreCase(SSLEngine)) {
+ // Initialize SSL
+ // FIXME: one per VM call ?
+ if ("on".equalsIgnoreCase(SSLEngine)) {
+ SSL.initialize(null);
+ } else {
+ SSL.initialize(SSLEngine);
+ }
+ // SSL protocol
+ int value = SSL.SSL_PROTOCOL_ALL;
+ if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
+ value = SSL.SSL_PROTOCOL_SSLV2;
+ } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
+ value = SSL.SSL_PROTOCOL_SSLV3;
+ } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
+ value = SSL.SSL_PROTOCOL_TLSV1;
+ } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
+ value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
+ }
+// // Create SSL Context
+// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
+// // List the ciphers that the client is permitted to negotiate
+// SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
+// // Load Server key and certificate
+// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
+// // Set certificate chain file
+// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
+// // Support Client Certificates
+// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
+// // Set revocation
+// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
+// // Client certificate verification
+// value = SSL.SSL_CVERIFY_NONE;
+// if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
+// value = SSL.SSL_CVERIFY_OPTIONAL;
+// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
+// value = SSL.SSL_CVERIFY_REQUIRE;
+// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
+// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
+// }
+// SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
+ // For now, sendfile is not supported with SSL
+ useSendfile = false;
+ }
+
+ initialized = true;
+
+ }
+
+
+ /**
+ * Start the APR endpoint, creating acceptor, poller and sendfile threads.
+ */
+ public void start()
+ throws Exception {
+ // Initialize socket if not done before
+ if (!initialized) {
+ init();
+ }
+ if (!running) {
+ running = true;
+ paused = false;
+
+ // Create worker collection
+ if (executor == null) {
+ workers = new WorkerStack(maxThreads);
+ }
+
+ // Start acceptor threads
+ for (int i = 0; i < acceptorThreadCount; i++) {
+ Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
+ acceptorThread.setPriority(threadPriority);
+ acceptorThread.setDaemon(daemon);
+ acceptorThread.start();
+ }
+
+ // Start poller threads
+ pollers = new Poller[pollerThreadCount];
+ for (int i = 0; i < pollerThreadCount; i++) {
+ pollers[i] = new Poller(false);
+ pollers[i].init();
+ Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
+ }
+
+ // Start sendfile threads
+ if (useSendfile) {
+ sendfiles = new Sendfile[sendfileThreadCount];
+ for (int i = 0; i < sendfileThreadCount; i++) {
+ sendfiles[i] = new Sendfile();
+ sendfiles[i].init();
+ Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i);
+ sendfileThread.setPriority(threadPriority);
+ sendfileThread.setDaemon(true);
+ sendfileThread.start();
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Pause the endpoint, which will make it stop accepting new sockets.
+ */
+ public void pause() {
+ if (running && !paused) {
+ paused = true;
+ unlockAccept();
+ }
+ }
+
+
+ /**
+ * Resume the endpoint, which will make it start accepting new sockets
+ * again.
+ */
+ public void resume() {
+ if (running) {
+ paused = false;
+ }
+ }
+
+
+ /**
+ * Stop the endpoint. This will cause all processing threads to stop.
+ */
+ public void stop() {
+ if (running) {
+ running = false;
+ unlockAccept();
+ for (int i = 0; i < pollers.length; i++) {
+ pollers[i].destroy();
+ }
+ pollers = null;
+ if (useSendfile) {
+ for (int i = 0; i < sendfiles.length; i++) {
+ sendfiles[i].destroy();
+ }
+ sendfiles = null;
+ }
+ }
+ }
+
+
+ /**
+ * Deallocate APR memory pools, and close server socket.
+ */
+ public void destroy() throws Exception {
+ if (running) {
+ stop();
+ }
+ // Close server socket
+ serverSock.socket().close();
+ serverSock.close();
+ serverSock = null;
+ sslContext = 0;
+ initialized = false;
+ }
+
+
+ // ------------------------------------------------------ Protected Methods
+
+
+ /**
+ * Get a sequence number used for thread naming.
+ */
+ protected int getSequence() {
+ return sequence++;
+ }
+
+
+ /**
+ * Unlock the server socket accept using a bugus connection.
+ */
+ protected void unlockAccept() {
+ java.net.Socket s = null;
+ try {
+ // Need to create a connection to unlock the accept();
+ if (address == null) {
+ s = new java.net.Socket("127.0.0.1", port);
+ } else {
+ s = new java.net.Socket(address, port);
+ // setting soLinger to a small value will help shutdown the
+ // connection quicker
+ s.setSoLinger(true, 0);
+ }
+ } catch(Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
+ }
+ } finally {
+ if (s != null) {
+ try {
+ s.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Process the specified connection.
+ */
+ protected boolean setSocketOptions(SocketChannel socket) {
+ // Process the connection
+ int step = 1;
+ try {
+ //disable blocking, APR style, we are gonna be polling it
+ socket.configureBlocking(false);
+
+ // 1: Set socket options: timeout, linger, etc
+ if (soLinger >= 0)
+ socket.socket().setSoLinger(true,soLinger);
+ if (tcpNoDelay)
+ socket.socket().setTcpNoDelay(true);
+ if (soTimeout > 0)
+ socket.socket().setSoTimeout(soTimeout);
+
+
+ // 2: SSL handshake
+ step = 2;
+ if (sslContext != 0) {
+// SSLSocket.attach(sslContext, socket);
+// if (SSLSocket.handshake(socket) != 0) {
+// if (log.isDebugEnabled()) {
+// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
+// }
+// return false;
+// }
+ }
+
+ getPoller().register(socket);
+
+ } catch (Throwable t) {
+ if (log.isDebugEnabled()) {
+ if (step == 2) {
+ log.debug(sm.getString("endpoint.err.handshake"), t);
+ } else {
+ log.debug(sm.getString("endpoint.err.unexpected"), t);
+ }
+ }
+ // Tell to close the socket
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Create (or allocate) and return an available processor for use in
+ * processing a specific HTTP request, if possible. If the maximum
+ * allowed processors have already been created and are in use, return
+ * <code>null</code> instead.
+ */
+ protected Worker createWorkerThread() {
+
+ synchronized (workers) {
+ if (workers.size() > 0) {
+ curThreadsBusy++;
+ return (workers.pop());
+ }
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {
+ curThreadsBusy++;
+ return (newWorkerThread());
+ } else {
+ if (maxThreads < 0) {
+ curThreadsBusy++;
+ return (newWorkerThread());
+ } else {
+ return (null);
+ }
+ }
+ }
+
+ }
+
+
+ /**
+ * Create and return a new processor suitable for processing HTTP
+ * requests and returning the corresponding responses.
+ */
+ protected Worker newWorkerThread() {
+
+ Worker workerThread = new Worker();
+ workerThread.start();
+ return (workerThread);
+
+ }
+
+
+ /**
+ * Return a new worker thread, and block while to worker is available.
+ */
+ protected Worker getWorkerThread() {
+ // Allocate a new worker thread
+ Worker workerThread = createWorkerThread();
+ while (workerThread == null) {
+ try {
+ synchronized (workers) {
+ workers.wait();
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ workerThread = createWorkerThread();
+ }
+ return workerThread;
+ }
+
+
+ /**
+ * Recycle the specified Processor so that it can be used again.
+ *
+ * @param workerThread The processor to be recycled
+ */
+ protected void recycleWorkerThread(Worker workerThread) {
+ synchronized (workers) {
+ workers.push(workerThread);
+ curThreadsBusy--;
+ workers.notify();
+ }
+ }
+
+
+ /**
+ * Allocate a new poller of the specified size.
+ */
+ protected long allocatePoller(int size, long pool, int timeout) {
+ try {
+ return Poll.create(size, pool, 0, timeout * 1000);
+ } catch (Error e) {
+ if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
+ log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));
+ return 0;
+ } else {
+ log.error(sm.getString("endpoint.poll.initfail"), e);
+ return -1;
+ }
+ }
+ }
+
+
+ /**
+ * Process given socket.
+ */
+ protected boolean processSocket(SocketChannel socket) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket);
+ } else {
+ executor.execute(new SocketProcessor(socket));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Process given socket for an event.
+ */
+ protected boolean processSocket(SocketChannel socket, boolean error) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket, error);
+ } else {
+ executor.execute(new SocketEventProcessor(socket, error));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
+ // --------------------------------------------------- Acceptor Inner Class
+
+
+ /**
+ * Server socket acceptor thread.
+ */
+ protected class Acceptor implements Runnable {
+
+
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+ // Loop until we receive a shutdown command
+ while (running) {
+
+ // Loop if endpoint is paused
+ while (paused) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ try {
+ // Accept the next incoming connection from the server socket
+ SocketChannel socket = serverSock.accept();
+ // Hand this socket off to an appropriate processor
+ if(!setSocketOptions(socket))
+ {
+ // Close socket right away
+ socket.socket().close();
+ socket.close();
+ }
+ } catch (Throwable t) {
+ log.error(sm.getString("endpoint.accept.fail"), t);
+ }
+
+ // The processor will recycle itself when it finishes
+
+ }
+
+ }
+
+ }
+
+
+ // ----------------------------------------------------- Poller Inner Class
+
+
+ /**
+ * Poller class.
+ */
+ public class Poller implements Runnable {
+
+ protected Selector selector;
+ protected LinkedList<Runnable> events = new LinkedList<Runnable>();
+ protected boolean close = false;
+ protected boolean comet = true;
+
+ protected int keepAliveCount = 0;
+ public int getKeepAliveCount() { return keepAliveCount; }
+
+
+
+ public Poller(boolean comet) throws IOException {
+ this.comet = comet;
+ this.selector = Selector.open();
+ }
+
+ public Selector getSelector() { return selector;}
+
+ /**
+ * Create the poller. With some versions of APR, the maximum poller size will
+ * be 62 (reocmpiling APR is necessary to remove this limitation).
+ */
+ protected void init() {
+ keepAliveCount = 0;
+ }
+
+ /**
+ * Destroy the poller.
+ */
+ protected void destroy() {
+ // Wait for polltime before doing anything, so that the poller threads
+ // exit, otherwise parallel descturction of sockets which are still
+ // in the poller can cause problems
+ try {
+ synchronized (this) {
+ this.wait(pollTime / 1000);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ close = true;
+ }
+
+ /**
+ * Add specified socket and associated pool to the poller. The socket will
+ * be added to a temporary array, and polled first after a maximum amount
+ * of time equal to pollTime (in most cases, latency will be much lower,
+ * however).
+ *
+ * @param socket to add to the poller
+ */
+ public void add(final SocketChannel socket) {
+ final SelectionKey key = socket.keyFor(selector);
+ Runnable r = new Runnable() {
+ public void run() {
+ if ( key != null ) key.interestOps(SelectionKey.OP_READ);
+ }
+ };
+ synchronized (events) {
+ events.add(r);
+ }
+ selector.wakeup();
+ }
+
+ public void events() {
+ synchronized (events) {
+ Runnable r = null;
+ while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {
+ try {
+ r.run();
+ } catch ( Exception x ) {
+ log.error("",x);
+ }
+ }
+ events.clear();
+ }
+ }
+
+ public void register(final SocketChannel socket)
+ {
+ SelectionKey key = socket.keyFor(selector);
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ socket.register(selector, SelectionKey.OP_READ, new KeyAttachment());
+ } catch (Exception x) {
+ log.error("", x);
+ }
+ }
+
+ };
+ synchronized (events) {
+ events.add(r);
+ }
+ selector.wakeup();
+ }
+
+ public void cancelledKey(SelectionKey key) {
+ try {
+ KeyAttachment ka = (KeyAttachment) key.attachment();
+ key.cancel();
+ if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true);
+ key.channel().close();
+ } catch (IOException e) {
+ if ( log.isDebugEnabled() ) log.debug("",e);
+ // Ignore
+ }
+ }
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+ // Loop until we receive a shutdown command
+ while (running) {
+ // Loop if endpoint is paused
+ while (paused) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+
+ events();
+ // Time to terminate?
+ if (close) return;
+
+ int keyCount = 0;
+ try {
+ keyCount = selector.select(selectorTimeout);
+ } catch (IOException x) {
+ log.error("",x);
+ continue;
+ }
+ //timeout
+ Set keys = selector.keys();
+ long now = System.currentTimeMillis();
+ for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
+ SelectionKey key = (SelectionKey) iter.next();
+ try {
+ if (key.interestOps() == SelectionKey.OP_READ) {
+ //only timeout sockets that we are waiting for a read from
+ KeyAttachment ka = (KeyAttachment) key.attachment();
+ long delta = now - ka.getLastAccess();
+ if (delta > (long) soTimeout) {
+ cancelledKey(key);
+ }
+ }
+ }catch ( CancelledKeyException ckx ) {
+ cancelledKey(key);
+ }
+ }
+
+
+ if (keyCount == 0) continue;
+
+ Iterator iterator = selector.selectedKeys().iterator();
+ // Walk through the collection of ready keys and dispatch
+ // any active event.
+ while (iterator.hasNext()) {
+ SelectionKey sk = (SelectionKey) iterator.next();
+ iterator.remove();
+ KeyAttachment attachment = (KeyAttachment)sk.attachment();
+ try {
+ if(attachment == null) attachment = new KeyAttachment();
+ attachment.access();
+ sk.attach(attachment);
+
+ int readyOps = sk.readyOps();
+ sk.interestOps(sk.interestOps() & ~readyOps);
+ SocketChannel channel = (SocketChannel)sk.channel();
+ boolean read = sk.isReadable();
+ if (read) {
+ if ( comet ) {
+ if (!processSocket(channel,false)) processSocket(channel,true);
+ } else {
+ boolean close = (!processSocket(channel));
+ if ( close ) {
+ channel.socket().close();
+ channel.close();
+ }
+ }
+ }
+ if (sk.isValid() && sk.isWritable()) {
+ }
+ } catch ( CancelledKeyException ckx ) {
+ if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true);
+ try {
+ sk.channel().close();
+ }catch ( Exception ignore){}
+ } catch (Throwable t) {
+ log.error("",t);
+ }
+ }//while
+
+
+ }
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ }
+
+ }
+
+ public static class KeyAttachment {
+
+ public long getLastAccess() { return lastAccess; }
+ public void access() { access(System.currentTimeMillis()); }
+ public void access(long access) { lastAccess = access; }
+ public void setComet(boolean comet) { this.comet = comet; }
+ public boolean getComet() { return comet; }
+ public boolean getCurrentAccess() { return currentAccess; }
+ public void setCurrentAccess(boolean access) { currentAccess = access; }
+
+ protected long lastAccess = System.currentTimeMillis();
+ protected boolean currentAccess = false;
+ protected boolean comet = false;
+
+ }
+
+
+
+ // ----------------------------------------------------- Worker Inner Class
+
+
+ /**
+ * Server processor class.
+ */
+ protected class Worker implements Runnable {
+
+
+ protected Thread thread = null;
+ protected boolean available = false;
+ protected SocketChannel socket = null;
+ protected boolean event = false;
+ protected boolean error = false;
+
+
+ /**
+ * Process an incoming TCP/IP connection on the specified socket. Any
+ * exception that occurs during processing must be logged and swallowed.
+ * <b>NOTE</b>: This method is called from our Connector's thread. We
+ * must assign it to our own thread so that multiple simultaneous
+ * requests can be handled.
+ *
+ * @param socket TCP socket to process
+ */
+ protected synchronized void assign(SocketChannel socket) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.socket = socket;
+ event = false;
+ error = false;
+ available = true;
+ notifyAll();
+
+ }
+
+
+ protected synchronized void assign(SocketChannel socket, boolean error) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.socket = socket;
+ event = true;
+ this.error = error;
+ available = true;
+ notifyAll();
+ }
+
+
+ /**
+ * Await a newly assigned Socket from our Connector, or <code>null</code>
+ * if we are supposed to shut down.
+ */
+ protected synchronized SocketChannel await() {
+
+ // Wait for the Connector to provide a new Socket
+ while (!available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Notify the Connector that we have received this Socket
+ SocketChannel socket = this.socket;
+ available = false;
+ notifyAll();
+
+ return (socket);
+
+ }
+
+
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+ // Process requests until we receive a shutdown signal
+ while (running) {
+
+ // Wait for the next socket to be assigned
+ SocketChannel socket = await();
+ if (socket == null)
+ continue;
+
+ // Process the request from this socket
+ if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {
+ // Close socket and pool
+ try {
+ socket.socket().close();
+ socket.close();
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
+ // Close socket and pool
+ try {
+ socket.socket().close();
+ socket.close();
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ }
+
+ // Finish up this request
+ recycleWorkerThread(this);
+
+ }
+
+ }
+
+
+ /**
+ * Start the background processing thread.
+ */
+ public void start() {
+ thread = new Thread(this);
+ thread.setName(getName() + "-" + (++curThreads));
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+
+ }
+
+
+ // ----------------------------------------------- SendfileData Inner Class
+
+
+ /**
+ * SendfileData class.
+ */
+ public static class SendfileData {
+ // File
+ public String fileName;
+ public long fd;
+ public long fdpool;
+ // Range information
+ public long start;
+ public long end;
+ // Socket and socket pool
+ public SocketChannel socket;
+ // Position
+ public long pos;
+ // KeepAlive flag
+ public boolean keepAlive;
+ }
+
+
+ // --------------------------------------------------- Sendfile Inner Class
+
+
+ /**
+ * Sendfile class.
+ */
+ public class Sendfile implements Runnable {
+
+ protected long sendfilePollset = 0;
+ protected long pool = 0;
+ protected long[] desc;
+ protected HashMap<Long, SendfileData> sendfileData;
+
+ protected int sendfileCount;
+ public int getSendfileCount() { return sendfileCount; }
+
+ protected ArrayList<SendfileData> addS;
+
+ /**
+ * Create the sendfile poller. With some versions of APR, the maximum poller size will
+ * be 62 (reocmpiling APR is necessary to remove this limitation).
+ */
+ protected void init() {
+// pool = Pool.create(serverSockPool);
+// int size = sendfileSize / sendfileThreadCount;
+// sendfilePollset = allocatePoller(size, pool, soTimeout);
+// if (sendfilePollset == 0 && size > 1024) {
+// size = 1024;
+// sendfilePollset = allocatePoller(size, pool, soTimeout);
+// }
+// if (sendfilePollset == 0) {
+// size = 62;
+// sendfilePollset = allocatePoller(size, pool, soTimeout);
+// }
+// desc = new long[size * 2];
+// sendfileData = new HashMap<Long, SendfileData>(size);
+// addS = new ArrayList<SendfileData>();
+ }
+
+ /**
+ * Destroy the poller.
+ */
+ protected void destroy() {
+// // Wait for polltime before doing anything, so that the poller threads
+// // exit, otherwise parallel descturction of sockets which are still
+// // in the poller can cause problems
+// try {
+// synchronized (this) {
+// this.wait(pollTime / 1000);
+// }
+// } catch (InterruptedException e) {
+// // Ignore
+// }
+// // Close any socket remaining in the add queue
+// for (int i = (addS.size() - 1); i >= 0; i--) {
+// SendfileData data = addS.get(i);
+// Socket.destroy(data.socket);
+// }
+// // Close all sockets still in the poller
+// int rv = Poll.pollset(sendfilePollset, desc);
+// if (rv > 0) {
+// for (int n = 0; n < rv; n++) {
+// Socket.destroy(desc[n*2+1]);
+// }
+// }
+// Pool.destroy(pool);
+// sendfileData.clear();
+ }
+
+ /**
+ * Add the sendfile data to the sendfile poller. Note that in most cases,
+ * the initial non blocking calls to sendfile will return right away, and
+ * will be handled asynchronously inside the kernel. As a result,
+ * the poller will never be used.
+ *
+ * @param data containing the reference to the data which should be snet
+ * @return true if all the data has been sent right away, and false
+ * otherwise
+ */
+ public boolean add(SendfileData data) {
+// // Initialize fd from data given
+// try {
+// data.fdpool = Socket.pool(data.socket);
+// data.fd = File.open
+// (data.fileName, File.APR_FOPEN_READ
+// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
+// 0, data.fdpool);
+// data.pos = data.start;
+// // Set the socket to nonblocking mode
+// Socket.timeoutSet(data.socket, 0);
+// while (true) {
+// long nw = Socket.sendfilen(data.socket, data.fd,
+// data.pos, data.end - data.pos, 0);
+// if (nw < 0) {
+// if (!(-nw == Status.EAGAIN)) {
+// Socket.destroy(data.socket);
+// data.socket = 0;
+// return false;
+// } else {
+// // Break the loop and add the socket to poller.
+// break;
+// }
+// } else {
+// data.pos = data.pos + nw;
+// if (data.pos >= data.end) {
+// // Entire file has been sent
+// Pool.destroy(data.fdpool);
+// // Set back socket to blocking mode
+// Socket.timeoutSet(data.socket, soTimeout * 1000);
+// return true;
+// }
+// }
+// }
+// } catch (Exception e) {
+// log.error(sm.getString("endpoint.sendfile.error"), e);
+// return false;
+// }
+// // Add socket to the list. Newly added sockets will wait
+// // at most for pollTime before being polled
+// synchronized (this) {
+// addS.add(data);
+// this.notify();
+// }
+ return false;
+ }
+
+ /**
+ * Remove socket from the poller.
+ *
+ * @param data the sendfile data which should be removed
+ */
+ protected void remove(SendfileData data) {
+// int rv = Poll.remove(sendfilePollset, data.socket);
+// if (rv == Status.APR_SUCCESS) {
+// sendfileCount--;
+// }
+// sendfileData.remove(data);
+ }
+
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+// // Loop until we receive a shutdown command
+// while (running) {
+//
+// // Loop if endpoint is paused
+// while (paused) {
+// try {
+// Thread.sleep(1000);
+// } catch (InterruptedException e) {
+// // Ignore
+// }
+// }
+//
+// while (sendfileCount < 1 && addS.size() < 1) {
+// try {
+// synchronized (this) {
+// this.wait();
+// }
+// } catch (InterruptedException e) {
+// // Ignore
+// }
+// }
+//
+// try {
+// // Add socket to the poller
+// if (addS.size() > 0) {
+// synchronized (this) {
+// for (int i = (addS.size() - 1); i >= 0; i--) {
+// SendfileData data = addS.get(i);
+// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
+// if (rv == Status.APR_SUCCESS) {
+// sendfileData.put(new Long(data.socket), data);
+// sendfileCount++;
+// } else {
+// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
+// // Can't do anything: close the socket right away
+// Socket.destroy(data.socket);
+// }
+// }
+// addS.clear();
+// }
+// }
+// // Pool for the specified interval
+// int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
+// if (rv > 0) {
+// for (int n = 0; n < rv; n++) {
+// // Get the sendfile state
+// SendfileData state =
+// sendfileData.get(new Long(desc[n*2+1]));
+// // Problem events
+// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
+// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
+// // Close socket and clear pool
+// remove(state);
+// // Destroy file descriptor pool, which should close the file
+// // Close the socket, as the reponse would be incomplete
+// Socket.destroy(state.socket);
+// continue;
+// }
+// // Write some data using sendfile
+// long nw = Socket.sendfilen(state.socket, state.fd,
+// state.pos,
+// state.end - state.pos, 0);
+// if (nw < 0) {
+// // Close socket and clear pool
+// remove(state);
+// // Close the socket, as the reponse would be incomplete
+// // This will close the file too.
+// Socket.destroy(state.socket);
+// continue;
+// }
+//
+// state.pos = state.pos + nw;
+// if (state.pos >= state.end) {
+// remove(state);
+// if (state.keepAlive) {
+// // Destroy file descriptor pool, which should close the file
+// Pool.destroy(state.fdpool);
+// Socket.timeoutSet(state.socket, soTimeout * 1000);
+// // If all done hand this socket off to a worker for
+// // processing of further requests
+// if (!processSocket(state.socket)) {
+// Socket.destroy(state.socket);
+// }
+// } else {
+// // Close the socket since this is
+// // the end of not keep-alive request.
+// Socket.destroy(state.socket);
+// }
+// }
+// }
+// } else if (rv < 0) {
+// int errn = -rv;
+// /* Any non timeup or interrupted error is critical */
+// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
+// if (errn > Status.APR_OS_START_USERERR) {
+// errn -= Status.APR_OS_START_USERERR;
+// }
+// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
+// // Handle poll critical failure
+// synchronized (this) {
+// destroy();
+// init();
+// }
+// continue;
+// }
+// }
+// /* TODO: See if we need to call the maintain for sendfile poller */
+// } catch (Throwable t) {
+// log.error(sm.getString("endpoint.poll.error"), t);
+// }
+// }
+//
+// synchronized (this) {
+// this.notifyAll();
+// }
+
+ }
+
+ }
+
+
+ // ------------------------------------------------ Handler Inner Interface
+
+
+ /**
+ * Bare bones interface used for socket processing. Per thread data is to be
+ * stored in the ThreadWithAttributes extra folders, or alternately in
+ * thread local fields.
+ */
+ public interface Handler {
+ public enum SocketState {
+ OPEN, CLOSED, LONG
+ }
+ public SocketState process(SocketChannel socket);
+ public SocketState event(SocketChannel socket, boolean error);
+ }
+
+
+ // ------------------------------------------------- WorkerStack Inner Class
+
+
+ public class WorkerStack {
+
+ protected Worker[] workers = null;
+ protected int end = 0;
+
+ public WorkerStack(int size) {
+ workers = new Worker[size];
+ }
+
+ /**
+ * Put the object into the queue.
+ *
+ * @param object the object to be appended to the queue (first element).
+ */
+ public void push(Worker worker) {
+ workers[end++] = worker;
+ }
+
+ /**
+ * Get the first object out of the queue. Return null if the queue
+ * is empty.
+ */
+ public Worker pop() {
+ if (end > 0) {
+ return workers[--end];
+ }
+ return null;
+ }
+
+ /**
+ * Get the first object out of the queue, Return null if the queue
+ * is empty.
+ */
+ public Worker peek() {
+ return workers[end];
+ }
+
+ /**
+ * Is the queue empty?
+ */
+ public boolean isEmpty() {
+ return (end == 0);
+ }
+
+ /**
+ * How many elements are there in this queue?
+ */
+ public int size() {
+ return (end);
+ }
+ }
+
+
+ // ---------------------------------------------- SocketProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool.
+ */
+ protected class SocketProcessor implements Runnable {
+
+ protected SocketChannel socket = null;
+
+ public SocketProcessor(SocketChannel socket) {
+ this.socket = socket;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ try {
+ socket.socket().close();
+ socket.close();
+ } catch ( Exception x ) {
+ log.error("",x);
+ }
+ socket = null;
+ }
+
+ }
+
+ }
+
+
+ // --------------------------------------- SocketEventProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool.
+ */
+ protected class SocketEventProcessor implements Runnable {
+
+ protected SocketChannel socket = null;
+ protected boolean error = false;
+
+ public SocketEventProcessor(SocketChannel socket, boolean error) {
+ this.socket = socket;
+ this.error = error;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
+ // Close socket and pool
+ try {
+ socket.socket().close();
+ socket.close();
+ } catch ( Exception x ) {
+ log.error("",x);
+ }
+ socket = null;
+ }
+
+ }
+
+ }
+
+
+}