-/*
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tomcat.util.net;
-
+/*\r
+ * Copyright 2005-2006 The Apache Software Foundation\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+\r
+package org.apache.tomcat.util.net;\r
+\r
import java.io.IOException;\r
import java.net.InetAddress;\r
import java.net.InetSocketAddress;\r
import org.apache.tomcat.jni.Poll;\r
import org.apache.tomcat.jni.SSL;\r
import org.apache.tomcat.jni.Status;\r
-import org.apache.tomcat.util.res.StringManager;
-
-/**
- * NIO tailored thread pool, providing the following services:
- * <ul>
- * <li>Socket acceptor thread</li>
- * <li>Socket poller thread</li>
- * <li>Sendfile thread</li>
- * <li>Worker threads pool</li>
- * </ul>
- *
- * When switching to Java 5, there's an opportunity to use the virtual
- * machine's thread pool.
- *
- * @author Mladen Turk
- * @author Remy Maucherat
- * @author Filip Hanik
- */
-public class NioEndpoint {
-
-
- // -------------------------------------------------------------- Constants
-
-
- protected static Log log = LogFactory.getLog(NioEndpoint.class);
-
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
- // ----------------------------------------------------------------- Fields
-
-
- /**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
- * Running state of the endpoint.
- */
- protected volatile boolean running = false;
-
-
- /**
- * Will be set to true whenever the endpoint is paused.
- */
- protected volatile boolean paused = false;
-
-
- /**
- * Track the initialization state of the endpoint.
- */
- protected boolean initialized = false;
-
-
- /**
- * Current worker threads busy count.
- */
- protected int curThreadsBusy = 0;
-
-
- /**
- * Current worker threads count.
- */
- protected int curThreads = 0;
-
-
- /**
- * Sequence number used to generate thread names.
- */
- protected int sequence = 0;
-
-
- /**
- * Root APR memory pool.
- */
- protected long rootPool = 0;
-
-
- /**
- * Server socket "pointer".
- */
- protected ServerSocketChannel serverSock = null;
-
-
- /**
- * APR memory pool for the server socket.
- */
- protected long serverSockPool = 0;
-
-
- /**
- * SSL context.
- */
- protected long sslContext = 0;
-
-
- // ------------------------------------------------------------- Properties
-
-
- /**
- * External Executor based thread pool.
- */
- protected Executor executor = null;
- public void setExecutor(Executor executor) { this.executor = executor; }
- public Executor getExecutor() { return executor; }
-
-
- /**
- * Maximum amount of worker threads.
- */
- protected int maxThreads = 40;
- public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
- public int getMaxThreads() { return maxThreads; }
-
-
- /**
- * Priority of the acceptor and poller threads.
- */
- protected int threadPriority = Thread.NORM_PRIORITY;
- public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
- public int getThreadPriority() { return threadPriority; }
-
-
- /**
- * Size of the socket poller.
- */
- protected int pollerSize = 8 * 1024;
- public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
- public int getPollerSize() { return pollerSize; }
-
-
- /**
- * Size of the sendfile (= concurrent files which can be served).
- */
- protected int sendfileSize = 1 * 1024;
- public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }
- public int getSendfileSize() { return sendfileSize; }
-
-
- /**
- * Server socket port.
- */
- protected int port;
- public int getPort() { return port; }
- public void setPort(int port ) { this.port=port; }
-
-
- /**
- * Address for the server socket.
- */
- protected InetAddress address;
- public InetAddress getAddress() { return address; }
- public void setAddress(InetAddress address) { this.address = address; }
-
-
- /**
- * Handling of accepted sockets.
- */
- protected Handler handler = null;
- public void setHandler(Handler handler ) { this.handler = handler; }
- public Handler getHandler() { return handler; }
-
-
- /**
- * Allows the server developer to specify the backlog that
- * should be used for server sockets. By default, this value
- * is 100.
- */
- protected int backlog = 100;
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
- public int getBacklog() { return backlog; }
-
-
- /**
- * Socket TCP no delay.
- */
- protected boolean tcpNoDelay = false;
- public boolean getTcpNoDelay() { return tcpNoDelay; }
- public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
-
-
- /**
- * Socket linger.
- */
- protected int soLinger = 100;
- public int getSoLinger() { return soLinger; }
- public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
-
-
- /**
- * Socket timeout.
- */
- protected int soTimeout = -1;
- public int getSoTimeout() { return soTimeout; }
- public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
-
-
- /**
- * Timeout on first request read before going to the poller, in ms.
- */
- protected int firstReadTimeout = 60000;
- public int getFirstReadTimeout() { return firstReadTimeout; }
- public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }
-
-
- /**
- * Poll interval, in microseconds. The smaller the value, the more CPU the poller
- * will use, but the more responsive to activity it will be.
- */
- protected int pollTime = 2000;
- public int getPollTime() { return pollTime; }
- public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } }
-
-
- /**
- * The default is true - the created threads will be
- * in daemon mode. If set to false, the control thread
- * will not be daemon - and will keep the process alive.
- */
- protected boolean daemon = true;
- public void setDaemon(boolean b) { daemon = b; }
- public boolean getDaemon() { return daemon; }
-
-
- /**
- * Name of the thread pool, which will be used for naming child threads.
- */
- protected String name = "TP";
- public void setName(String name) { this.name = name; }
- public String getName() { return name; }
-
-
- /**
- * Use endfile for sending static files.
- */
- protected boolean useSendfile = Library.APR_HAS_SENDFILE;
- public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
- public boolean getUseSendfile() { return useSendfile; }
-
-
- /**
- * Allow comet request handling.
- */
- protected boolean useComet = true;
- public void setUseComet(boolean useComet) { this.useComet = useComet; }
- public boolean getUseComet() { return useComet; }
-
-
- /**
- * Acceptor thread count.
- */
- protected int acceptorThreadCount = 0;
- public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }
- public int getAcceptorThreadCount() { return acceptorThreadCount; }
-
-
- /**
- * Sendfile thread count.
- */
- protected int sendfileThreadCount = 0;
- public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; }
- public int getSendfileThreadCount() { return sendfileThreadCount; }
-
-
- /**
- * Poller thread count.
- */
- protected int pollerThreadCount = 0;
- public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
- public int getPollerThreadCount() { return pollerThreadCount; }
-
- protected long selectorTimeout = 5000;
- public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}
- public long getSelectorTimeout(){ return this.selectorTimeout; }
- /**
- * The socket poller.
- */
- protected Poller[] pollers = null;
- protected int pollerRoundRobin = 0;
- public Poller getPoller() {
- pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
- Poller poller = pollers[pollerRoundRobin];
- poller.comet = false;
- return poller;
- }
-
-
- /**
- * The socket poller used for Comet support.
- */
- public Poller getCometPoller() {
- Poller poller = getPoller();
- poller.comet = true;
- return poller;
- }
-
-
- /**
- * The static file sender.
- */
- protected Sendfile[] sendfiles = null;
- protected int sendfileRoundRobin = 0;
- public Sendfile getSendfile() {
- sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;
- return sendfiles[sendfileRoundRobin];
- }
-
-
- /**
- * Dummy maxSpareThreads property.
- */
- public int getMaxSpareThreads() { return 0; }
-
-
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return 0; }
-
-
- /**
- * SSL engine.
- */
- protected String SSLEngine = "off";
- public String getSSLEngine() { return SSLEngine; }
- public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; }
-
-
- /**
- * SSL protocols.
- */
- protected String SSLProtocol = "all";
- public String getSSLProtocol() { return SSLProtocol; }
- public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; }
-
-
- /**
- * SSL password (if a cert is encrypted, and no password has been provided, a callback
- * will ask for a password).
- */
- protected String SSLPassword = null;
- public String getSSLPassword() { return SSLPassword; }
- public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; }
-
-
- /**
- * SSL cipher suite.
- */
- protected String SSLCipherSuite = "ALL";
- public String getSSLCipherSuite() { return SSLCipherSuite; }
- public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
-
-
- /**
- * SSL certificate file.
- */
- protected String SSLCertificateFile = null;
- public String getSSLCertificateFile() { return SSLCertificateFile; }
- public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }
-
-
- /**
- * SSL certificate key file.
- */
- protected String SSLCertificateKeyFile = null;
- public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }
- public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }
-
-
- /**
- * SSL certificate chain file.
- */
- protected String SSLCertificateChainFile = null;
- public String getSSLCertificateChainFile() { return SSLCertificateChainFile; }
- public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }
-
-
- /**
- * SSL CA certificate path.
- */
- protected String SSLCACertificatePath = null;
- public String getSSLCACertificatePath() { return SSLCACertificatePath; }
- public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }
-
-
- /**
- * SSL CA certificate file.
- */
- protected String SSLCACertificateFile = null;
- public String getSSLCACertificateFile() { return SSLCACertificateFile; }
- public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }
-
-
- /**
- * SSL CA revocation path.
- */
- protected String SSLCARevocationPath = null;
- public String getSSLCARevocationPath() { return SSLCARevocationPath; }
- public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }
-
-
- /**
- * SSL CA revocation file.
- */
- protected String SSLCARevocationFile = null;
- public String getSSLCARevocationFile() { return SSLCARevocationFile; }
- public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }
-
-
- /**
- * SSL verify client.
- */
- protected String SSLVerifyClient = "none";
- public String getSSLVerifyClient() { return SSLVerifyClient; }
- public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }
-
-
- /**
- * SSL verify depth.
- */
- protected int SSLVerifyDepth = 10;
- public int getSSLVerifyDepth() { return SSLVerifyDepth; }
- public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }
-
-
- // --------------------------------------------------------- Public Methods
-
-
- /**
- * Number of keepalive sockets.
- */
- public int getKeepAliveCount() {
- if (pollers == null) {
- return 0;
- } else {
- int keepAliveCount = 0;
- for (int i = 0; i < pollers.length; i++) {
- keepAliveCount += pollers[i].getKeepAliveCount();
- }
- return keepAliveCount;
- }
- }
-
-
- /**
- * Number of sendfile sockets.
- */
- public int getSendfileCount() {
- if (sendfiles == null) {
- return 0;
- } else {
- int sendfileCount = 0;
- for (int i = 0; i < sendfiles.length; i++) {
- sendfileCount += sendfiles[i].getSendfileCount();
- }
- return sendfileCount;
- }
- }
-
-
- /**
- * Return the amount of threads that are managed by the pool.
- *
- * @return the amount of threads that are managed by the pool
- */
- public int getCurrentThreadCount() {
- return curThreads;
- }
-
-
- /**
- * Return the amount of threads currently busy.
- *
- * @return the amount of threads currently busy
- */
- public int getCurrentThreadsBusy() {
- return curThreadsBusy;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is running, false otherwise
- */
- public boolean isRunning() {
- return running;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is paused, false otherwise
- */
- public boolean isPaused() {
- return paused;
- }
-
-
- // ----------------------------------------------- Public Lifecycle Methods
-
-
- /**
- * Initialize the endpoint.
- */
- public void init()
- throws Exception {
-
- if (initialized)
- return;
-
- serverSock = ServerSocketChannel.open();
- InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));
- serverSock.socket().bind(addr,100); //todo, set backlog value
- serverSock.configureBlocking(true); //mimic APR behavior
- // Sendfile usage on systems which don't support it cause major problems
- if (useSendfile) {
- log.warn(sm.getString("endpoint.sendfile.nosupport"));
- useSendfile = false;
- }
-
- // Initialize thread count defaults for acceptor, poller and sendfile
- if (acceptorThreadCount == 0) {
- // FIXME: Doesn't seem to work that well with multiple accept threads
- acceptorThreadCount = 1;
- }
- if (pollerThreadCount != 1) {
- // limit to one poller, no need for others
- pollerThreadCount = 1;
- }
- if (sendfileThreadCount != 0) {
- sendfileThreadCount = 0;
- }
-
- // Initialize SSL if needed
- if (!"off".equalsIgnoreCase(SSLEngine)) {
- // Initialize SSL
- // FIXME: one per VM call ?
- if ("on".equalsIgnoreCase(SSLEngine)) {
- SSL.initialize(null);
- } else {
- SSL.initialize(SSLEngine);
- }
- // SSL protocol
- int value = SSL.SSL_PROTOCOL_ALL;
- if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
- value = SSL.SSL_PROTOCOL_SSLV2;
- } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
- value = SSL.SSL_PROTOCOL_SSLV3;
- } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
- value = SSL.SSL_PROTOCOL_TLSV1;
- } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
- value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
- }
-// // Create SSL Context
-// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
-// // List the ciphers that the client is permitted to negotiate
-// SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
-// // Load Server key and certificate
-// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
-// // Set certificate chain file
-// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);
-// // Support Client Certificates
-// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);
-// // Set revocation
-// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);
-// // Client certificate verification
-// value = SSL.SSL_CVERIFY_NONE;
-// if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
-// value = SSL.SSL_CVERIFY_OPTIONAL;
-// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
-// value = SSL.SSL_CVERIFY_REQUIRE;
-// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
-// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
-// }
-// SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
- // For now, sendfile is not supported with SSL
- useSendfile = false;
- }
-
- initialized = true;
-
- }
-
-
- /**
- * Start the APR endpoint, creating acceptor, poller and sendfile threads.
- */
- public void start()
- throws Exception {
- // Initialize socket if not done before
- if (!initialized) {
- init();
- }
- if (!running) {
- running = true;
- paused = false;
-
- // Create worker collection
- if (executor == null) {
- workers = new WorkerStack(maxThreads);
- }
-
- // Start acceptor threads
- for (int i = 0; i < acceptorThreadCount; i++) {
- Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
- acceptorThread.setPriority(threadPriority);
- acceptorThread.setDaemon(daemon);
- acceptorThread.start();
- }
-
- // Start poller threads
- pollers = new Poller[pollerThreadCount];
- for (int i = 0; i < pollerThreadCount; i++) {
- pollers[i] = new Poller(false);
- pollers[i].init();
- Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
- pollerThread.setPriority(threadPriority);
- pollerThread.setDaemon(true);
- pollerThread.start();
- }
-
- // Start sendfile threads
- if (useSendfile) {
- sendfiles = new Sendfile[sendfileThreadCount];
- for (int i = 0; i < sendfileThreadCount; i++) {
- sendfiles[i] = new Sendfile();
- sendfiles[i].init();
- Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i);
- sendfileThread.setPriority(threadPriority);
- sendfileThread.setDaemon(true);
- sendfileThread.start();
- }
- }
- }
- }
-
-
- /**
- * Pause the endpoint, which will make it stop accepting new sockets.
- */
- public void pause() {
- if (running && !paused) {
- paused = true;
- unlockAccept();
- }
- }
-
-
- /**
- * Resume the endpoint, which will make it start accepting new sockets
- * again.
- */
- public void resume() {
- if (running) {
- paused = false;
- }
- }
-
-
- /**
- * Stop the endpoint. This will cause all processing threads to stop.
- */
- public void stop() {
- if (running) {
- running = false;
- unlockAccept();
- for (int i = 0; i < pollers.length; i++) {
- pollers[i].destroy();
- }
- pollers = null;
- if (useSendfile) {
- for (int i = 0; i < sendfiles.length; i++) {
- sendfiles[i].destroy();
- }
- sendfiles = null;
- }
- }
- }
-
-
- /**
- * Deallocate APR memory pools, and close server socket.
- */
- public void destroy() throws Exception {
- if (running) {
- stop();
- }
- // Close server socket
- serverSock.socket().close();
- serverSock.close();
- serverSock = null;
- sslContext = 0;
- initialized = false;
- }
-
-
- // ------------------------------------------------------ Protected Methods
-
-
- /**
- * Get a sequence number used for thread naming.
- */
- protected int getSequence() {
- return sequence++;
- }
-
-
- /**
- * Unlock the server socket accept using a bugus connection.
- */
- protected void unlockAccept() {
- java.net.Socket s = null;
- try {
- // Need to create a connection to unlock the accept();
- if (address == null) {
- s = new java.net.Socket("127.0.0.1", port);
- } else {
- s = new java.net.Socket(address, port);
- // setting soLinger to a small value will help shutdown the
- // connection quicker
- s.setSoLinger(true, 0);
- }
- } catch(Exception e) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
- }
- } finally {
- if (s != null) {
- try {
- s.close();
- } catch (Exception e) {
- // Ignore
- }
- }
- }
- }
-
-
- /**
- * Process the specified connection.
- */
- protected boolean setSocketOptions(SocketChannel socket) {
- // Process the connection
- int step = 1;
- try {
- //disable blocking, APR style, we are gonna be polling it
- socket.configureBlocking(false);
-
- // 1: Set socket options: timeout, linger, etc
- if (soLinger >= 0)
- socket.socket().setSoLinger(true,soLinger);
- if (tcpNoDelay)
- socket.socket().setTcpNoDelay(true);
- if (soTimeout > 0)
- socket.socket().setSoTimeout(soTimeout);
-
-
- // 2: SSL handshake
- step = 2;
- if (sslContext != 0) {
-// SSLSocket.attach(sslContext, socket);
-// if (SSLSocket.handshake(socket) != 0) {
-// if (log.isDebugEnabled()) {
-// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
-// }
-// return false;
-// }
- }
-
- getPoller().register(socket);
-
- } catch (Throwable t) {
- if (log.isDebugEnabled()) {
- if (step == 2) {
- log.debug(sm.getString("endpoint.err.handshake"), t);
- } else {
- log.debug(sm.getString("endpoint.err.unexpected"), t);
- }
- }
- // Tell to close the socket
- return false;
- }
- return true;
- }
-
-
- /**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return (workers.pop());
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
-
- /**
- * Allocate a new poller of the specified size.
- */
- protected long allocatePoller(int size, long pool, int timeout) {
- try {
- return Poll.create(size, pool, 0, timeout * 1000);
- } catch (Error e) {
- if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
- log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));
- return 0;
- } else {
- log.error(sm.getString("endpoint.poll.initfail"), e);
- return -1;
- }
- }
- }
-
-
- /**
- * Process given socket.
- */
- protected boolean processSocket(SocketChannel socket) {
- try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
- } catch (Throwable t) {
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
-
-
- /**
- * Process given socket for an event.
- */
- protected boolean processSocket(SocketChannel socket, boolean error) {
- try {
- if (executor == null) {
- getWorkerThread().assign(socket, error);
- } else {
- executor.execute(new SocketEventProcessor(socket, error));
- }
- } catch (Throwable t) {
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
-
-
- // --------------------------------------------------- Acceptor Inner Class
-
-
- /**
- * Server socket acceptor thread.
- */
- protected class Acceptor implements Runnable {
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Loop until we receive a shutdown command
- while (running) {
-
- // Loop if endpoint is paused
- while (paused) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- try {
- // Accept the next incoming connection from the server socket
- SocketChannel socket = serverSock.accept();
- // Hand this socket off to an appropriate processor
- if(!setSocketOptions(socket))
- {
- // Close socket right away
- socket.socket().close();
- socket.close();
- }
- } catch (Throwable t) {
- log.error(sm.getString("endpoint.accept.fail"), t);
- }
-
- // The processor will recycle itself when it finishes
-
- }
-
- }
-
- }
-
-
- // ----------------------------------------------------- Poller Inner Class
-
-
- /**
- * Poller class.
- */
- public class Poller implements Runnable {
-
- protected Selector selector;
- protected LinkedList<Runnable> events = new LinkedList<Runnable>();
- protected boolean close = false;
- protected boolean comet = true;
-
- protected int keepAliveCount = 0;
- public int getKeepAliveCount() { return keepAliveCount; }
-
-
-
- public Poller(boolean comet) throws IOException {
- this.comet = comet;
- this.selector = Selector.open();
- }
-
- public Selector getSelector() { return selector;}
-
- /**
- * Create the poller. With some versions of APR, the maximum poller size will
- * be 62 (reocmpiling APR is necessary to remove this limitation).
- */
- protected void init() {
- keepAliveCount = 0;
- }
-
- /**
- * Destroy the poller.
- */
- protected void destroy() {
- // Wait for polltime before doing anything, so that the poller threads
- // exit, otherwise parallel descturction of sockets which are still
- // in the poller can cause problems
- try {
- synchronized (this) {
- this.wait(pollTime / 1000);
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- close = true;
- }
-
- /**
- * Add specified socket and associated pool to the poller. The socket will
- * be added to a temporary array, and polled first after a maximum amount
- * of time equal to pollTime (in most cases, latency will be much lower,
- * however).
- *
- * @param socket to add to the poller
- */
- public void add(final SocketChannel socket) {
- final SelectionKey key = socket.keyFor(selector);
- Runnable r = new Runnable() {
- public void run() {
- if ( key != null ) key.interestOps(SelectionKey.OP_READ);
- }
- };
- synchronized (events) {
- events.add(r);
- }
- selector.wakeup();
- }
-
- public void events() {
- synchronized (events) {
- Runnable r = null;
- while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {
- try {
- r.run();
- } catch ( Exception x ) {
- log.error("",x);
- }
- }
- events.clear();
- }
- }
-
- public void register(final SocketChannel socket)
- {
- SelectionKey key = socket.keyFor(selector);
- Runnable r = new Runnable() {
- public void run() {
- try {
- socket.register(selector, SelectionKey.OP_READ, new KeyAttachment());
- } catch (Exception x) {
- log.error("", x);
- }
- }
-
- };
- synchronized (events) {
- events.add(r);
- }
- selector.wakeup();
- }
-
- public void cancelledKey(SelectionKey key) {
- try {
- KeyAttachment ka = (KeyAttachment) key.attachment();
- key.cancel();
- if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true);
- key.channel().close();
- } catch (IOException e) {
- if ( log.isDebugEnabled() ) log.debug("",e);
- // Ignore
- }
- }
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Loop until we receive a shutdown command
- while (running) {
- // Loop if endpoint is paused
- while (paused) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- events();
- // Time to terminate?
- if (close) return;
-
- int keyCount = 0;
- try {
- keyCount = selector.select(selectorTimeout);
- } catch (IOException x) {
- log.error("",x);
- continue;
- }
- //timeout
- Set keys = selector.keys();
- long now = System.currentTimeMillis();
- for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
- SelectionKey key = (SelectionKey) iter.next();
- try {
- if (key.interestOps() == SelectionKey.OP_READ) {
- //only timeout sockets that we are waiting for a read from
- KeyAttachment ka = (KeyAttachment) key.attachment();
- long delta = now - ka.getLastAccess();
- if (delta > (long) soTimeout) {
- cancelledKey(key);
- }
- }
- }catch ( CancelledKeyException ckx ) {
- cancelledKey(key);
- }
- }
-
-
- if (keyCount == 0) continue;
-
- Iterator iterator = selector.selectedKeys().iterator();
- // Walk through the collection of ready keys and dispatch
- // any active event.
- while (iterator.hasNext()) {
- SelectionKey sk = (SelectionKey) iterator.next();
- iterator.remove();
- KeyAttachment attachment = (KeyAttachment)sk.attachment();
- try {
- if(attachment == null) attachment = new KeyAttachment();
- attachment.access();
- sk.attach(attachment);
-
- int readyOps = sk.readyOps();
- sk.interestOps(sk.interestOps() & ~readyOps);
- SocketChannel channel = (SocketChannel)sk.channel();
- boolean read = sk.isReadable();
- if (read) {
- if ( comet ) {
- if (!processSocket(channel,false)) processSocket(channel,true);
- } else {
- boolean close = (!processSocket(channel));
- if ( close ) {
- channel.socket().close();
- channel.close();
- }
- }
- }
- if (sk.isValid() && sk.isWritable()) {
- }
- } catch ( CancelledKeyException ckx ) {
- if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true);
- try {
- sk.channel().close();
- }catch ( Exception ignore){}
- } catch (Throwable t) {
- log.error("",t);
- }
- }//while
-
-
- }
- synchronized (this) {
- this.notifyAll();
- }
-
- }
-
- }
-
- public static class KeyAttachment {
-
- public long getLastAccess() { return lastAccess; }
- public void access() { access(System.currentTimeMillis()); }
- public void access(long access) { lastAccess = access; }
- public void setComet(boolean comet) { this.comet = comet; }
- public boolean getComet() { return comet; }
- public boolean getCurrentAccess() { return currentAccess; }
- public void setCurrentAccess(boolean access) { currentAccess = access; }
-
- protected long lastAccess = System.currentTimeMillis();
- protected boolean currentAccess = false;
- protected boolean comet = false;
-
- }
-
-
-
- // ----------------------------------------------------- Worker Inner Class
-
-
- /**
- * Server processor class.
- */
- protected class Worker implements Runnable {
-
-
- protected Thread thread = null;
- protected boolean available = false;
- protected SocketChannel socket = null;
- protected boolean event = false;
- protected boolean error = false;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assign(SocketChannel socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- event = false;
- error = false;
- available = true;
- notifyAll();
-
- }
-
-
- protected synchronized void assign(SocketChannel socket, boolean error) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- event = true;
- this.error = error;
- available = true;
- notifyAll();
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or <code>null</code>
- * if we are supposed to shut down.
- */
- protected synchronized SocketChannel await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- SocketChannel socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- SocketChannel socket = await();
- if (socket == null)
- continue;
-
- // Process the request from this socket
- if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {
- // Close socket and pool
- try {
- socket.socket().close();
- socket.close();
- }catch ( Exception x ) {
- log.error("",x);
- }
- } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
- // Close socket and pool
- try {
- socket.socket().close();
- socket.close();
- }catch ( Exception x ) {
- log.error("",x);
- }
- }
-
- // Finish up this request
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
-
-
- // ----------------------------------------------- SendfileData Inner Class
-
-
- /**
- * SendfileData class.
- */
- public static class SendfileData {
- // File
- public String fileName;
- public long fd;
- public long fdpool;
- // Range information
- public long start;
- public long end;
- // Socket and socket pool
- public SocketChannel socket;
- // Position
- public long pos;
- // KeepAlive flag
- public boolean keepAlive;
- }
-
-
- // --------------------------------------------------- Sendfile Inner Class
-
-
- /**
- * Sendfile class.
- */
- public class Sendfile implements Runnable {
-
- protected long sendfilePollset = 0;
- protected long pool = 0;
- protected long[] desc;
- protected HashMap<Long, SendfileData> sendfileData;
-
- protected int sendfileCount;
- public int getSendfileCount() { return sendfileCount; }
-
- protected ArrayList<SendfileData> addS;
-
- /**
- * Create the sendfile poller. With some versions of APR, the maximum poller size will
- * be 62 (reocmpiling APR is necessary to remove this limitation).
- */
- protected void init() {
-// pool = Pool.create(serverSockPool);
-// int size = sendfileSize / sendfileThreadCount;
-// sendfilePollset = allocatePoller(size, pool, soTimeout);
-// if (sendfilePollset == 0 && size > 1024) {
-// size = 1024;
-// sendfilePollset = allocatePoller(size, pool, soTimeout);
-// }
-// if (sendfilePollset == 0) {
-// size = 62;
-// sendfilePollset = allocatePoller(size, pool, soTimeout);
-// }
-// desc = new long[size * 2];
-// sendfileData = new HashMap<Long, SendfileData>(size);
-// addS = new ArrayList<SendfileData>();
- }
-
- /**
- * Destroy the poller.
- */
- protected void destroy() {
-// // Wait for polltime before doing anything, so that the poller threads
-// // exit, otherwise parallel descturction of sockets which are still
-// // in the poller can cause problems
-// try {
-// synchronized (this) {
-// this.wait(pollTime / 1000);
-// }
-// } catch (InterruptedException e) {
-// // Ignore
-// }
-// // Close any socket remaining in the add queue
-// for (int i = (addS.size() - 1); i >= 0; i--) {
-// SendfileData data = addS.get(i);
-// Socket.destroy(data.socket);
-// }
-// // Close all sockets still in the poller
-// int rv = Poll.pollset(sendfilePollset, desc);
-// if (rv > 0) {
-// for (int n = 0; n < rv; n++) {
-// Socket.destroy(desc[n*2+1]);
-// }
-// }
-// Pool.destroy(pool);
-// sendfileData.clear();
- }
-
- /**
- * Add the sendfile data to the sendfile poller. Note that in most cases,
- * the initial non blocking calls to sendfile will return right away, and
- * will be handled asynchronously inside the kernel. As a result,
- * the poller will never be used.
- *
- * @param data containing the reference to the data which should be snet
- * @return true if all the data has been sent right away, and false
- * otherwise
- */
- public boolean add(SendfileData data) {
-// // Initialize fd from data given
-// try {
-// data.fdpool = Socket.pool(data.socket);
-// data.fd = File.open
-// (data.fileName, File.APR_FOPEN_READ
-// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
-// 0, data.fdpool);
-// data.pos = data.start;
-// // Set the socket to nonblocking mode
-// Socket.timeoutSet(data.socket, 0);
-// while (true) {
-// long nw = Socket.sendfilen(data.socket, data.fd,
-// data.pos, data.end - data.pos, 0);
-// if (nw < 0) {
-// if (!(-nw == Status.EAGAIN)) {
-// Socket.destroy(data.socket);
-// data.socket = 0;
-// return false;
-// } else {
-// // Break the loop and add the socket to poller.
-// break;
-// }
-// } else {
-// data.pos = data.pos + nw;
-// if (data.pos >= data.end) {
-// // Entire file has been sent
-// Pool.destroy(data.fdpool);
-// // Set back socket to blocking mode
-// Socket.timeoutSet(data.socket, soTimeout * 1000);
-// return true;
-// }
-// }
-// }
-// } catch (Exception e) {
-// log.error(sm.getString("endpoint.sendfile.error"), e);
-// return false;
-// }
-// // Add socket to the list. Newly added sockets will wait
-// // at most for pollTime before being polled
-// synchronized (this) {
-// addS.add(data);
-// this.notify();
-// }
- return false;
- }
-
- /**
- * Remove socket from the poller.
- *
- * @param data the sendfile data which should be removed
- */
- protected void remove(SendfileData data) {
-// int rv = Poll.remove(sendfilePollset, data.socket);
-// if (rv == Status.APR_SUCCESS) {
-// sendfileCount--;
-// }
-// sendfileData.remove(data);
- }
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
-// // Loop until we receive a shutdown command
-// while (running) {
-//
-// // Loop if endpoint is paused
-// while (paused) {
-// try {
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-// // Ignore
-// }
-// }
-//
-// while (sendfileCount < 1 && addS.size() < 1) {
-// try {
-// synchronized (this) {
-// this.wait();
-// }
-// } catch (InterruptedException e) {
-// // Ignore
-// }
-// }
-//
-// try {
-// // Add socket to the poller
-// if (addS.size() > 0) {
-// synchronized (this) {
-// for (int i = (addS.size() - 1); i >= 0; i--) {
-// SendfileData data = addS.get(i);
-// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
-// if (rv == Status.APR_SUCCESS) {
-// sendfileData.put(new Long(data.socket), data);
-// sendfileCount++;
-// } else {
-// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
-// // Can't do anything: close the socket right away
-// Socket.destroy(data.socket);
-// }
-// }
-// addS.clear();
-// }
-// }
-// // Pool for the specified interval
-// int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
-// if (rv > 0) {
-// for (int n = 0; n < rv; n++) {
-// // Get the sendfile state
-// SendfileData state =
-// sendfileData.get(new Long(desc[n*2+1]));
-// // Problem events
-// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
-// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
-// // Close socket and clear pool
-// remove(state);
-// // Destroy file descriptor pool, which should close the file
-// // Close the socket, as the reponse would be incomplete
-// Socket.destroy(state.socket);
-// continue;
-// }
-// // Write some data using sendfile
-// long nw = Socket.sendfilen(state.socket, state.fd,
-// state.pos,
-// state.end - state.pos, 0);
-// if (nw < 0) {
-// // Close socket and clear pool
-// remove(state);
-// // Close the socket, as the reponse would be incomplete
-// // This will close the file too.
-// Socket.destroy(state.socket);
-// continue;
-// }
-//
-// state.pos = state.pos + nw;
-// if (state.pos >= state.end) {
-// remove(state);
-// if (state.keepAlive) {
-// // Destroy file descriptor pool, which should close the file
-// Pool.destroy(state.fdpool);
-// Socket.timeoutSet(state.socket, soTimeout * 1000);
-// // If all done hand this socket off to a worker for
-// // processing of further requests
-// if (!processSocket(state.socket)) {
-// Socket.destroy(state.socket);
-// }
-// } else {
-// // Close the socket since this is
-// // the end of not keep-alive request.
-// Socket.destroy(state.socket);
-// }
-// }
-// }
-// } else if (rv < 0) {
-// int errn = -rv;
-// /* Any non timeup or interrupted error is critical */
-// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
-// if (errn > Status.APR_OS_START_USERERR) {
-// errn -= Status.APR_OS_START_USERERR;
-// }
-// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
-// // Handle poll critical failure
-// synchronized (this) {
-// destroy();
-// init();
-// }
-// continue;
-// }
-// }
-// /* TODO: See if we need to call the maintain for sendfile poller */
-// } catch (Throwable t) {
-// log.error(sm.getString("endpoint.poll.error"), t);
-// }
-// }
-//
-// synchronized (this) {
-// this.notifyAll();
-// }
-
- }
-
- }
-
-
- // ------------------------------------------------ Handler Inner Interface
-
-
- /**
- * Bare bones interface used for socket processing. Per thread data is to be
- * stored in the ThreadWithAttributes extra folders, or alternately in
- * thread local fields.
- */
- public interface Handler {
- public enum SocketState {
- OPEN, CLOSED, LONG
- }
- public SocketState process(SocketChannel socket);
- public SocketState event(SocketChannel socket, boolean error);
- }
-
-
- // ------------------------------------------------- WorkerStack Inner Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue.
- *
- * @param object the object to be appended to the queue (first element).
- */
- public void push(Worker worker) {
- workers[end++] = worker;
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
- }
-
-
- // ---------------------------------------------- SocketProcessor Inner Class
-
-
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketProcessor implements Runnable {
-
- protected SocketChannel socket = null;
-
- public SocketProcessor(SocketChannel socket) {
- this.socket = socket;
- }
-
- public void run() {
-
- // Process the request from this socket
- if (handler.process(socket) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- try {
- socket.socket().close();
- socket.close();
- } catch ( Exception x ) {
- log.error("",x);
- }
- socket = null;
- }
-
- }
-
- }
-
-
- // --------------------------------------- SocketEventProcessor Inner Class
-
-
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketEventProcessor implements Runnable {
-
- protected SocketChannel socket = null;
- protected boolean error = false;
-
- public SocketEventProcessor(SocketChannel socket, boolean error) {
- this.socket = socket;
- this.error = error;
- }
-
- public void run() {
-
- // Process the request from this socket
- if (handler.event(socket, error) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- try {
- socket.socket().close();
- socket.close();
- } catch ( Exception x ) {
- log.error("",x);
- }
- socket = null;
- }
-
- }
-
- }
-
-
-}
+import org.apache.tomcat.util.res.StringManager;\r
+\r
+/**\r
+ * NIO tailored thread pool, providing the following services:\r
+ * <ul>\r
+ * <li>Socket acceptor thread</li>\r
+ * <li>Socket poller thread</li>\r
+ * <li>Sendfile thread</li>\r
+ * <li>Worker threads pool</li>\r
+ * </ul>\r
+ *\r
+ * When switching to Java 5, there's an opportunity to use the virtual\r
+ * machine's thread pool.\r
+ *\r
+ * @author Mladen Turk\r
+ * @author Remy Maucherat\r
+ * @author Filip Hanik\r
+ */\r
+public class NioEndpoint {\r
+\r
+\r
+ // -------------------------------------------------------------- Constants\r
+\r
+\r
+ protected static Log log = LogFactory.getLog(NioEndpoint.class);\r
+\r
+ protected static StringManager sm =\r
+ StringManager.getManager("org.apache.tomcat.util.net.res");\r
+\r
+\r
+ /**\r
+ * The Request attribute key for the cipher suite.\r
+ */\r
+ public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";\r
+\r
+ /**\r
+ * The Request attribute key for the key size.\r
+ */\r
+ public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";\r
+\r
+ /**\r
+ * The Request attribute key for the client certificate chain.\r
+ */\r
+ public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";\r
+\r
+ /**\r
+ * The Request attribute key for the session id.\r
+ * This one is a Tomcat extension to the Servlet spec.\r
+ */\r
+ public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";\r
+\r
+\r
+ // ----------------------------------------------------------------- Fields\r
+\r
+\r
+ /**\r
+ * Available workers.\r
+ */\r
+ protected WorkerStack workers = null;\r
+\r
+\r
+ /**\r
+ * Running state of the endpoint.\r
+ */\r
+ protected volatile boolean running = false;\r
+\r
+\r
+ /**\r
+ * Will be set to true whenever the endpoint is paused.\r
+ */\r
+ protected volatile boolean paused = false;\r
+\r
+\r
+ /**\r
+ * Track the initialization state of the endpoint.\r
+ */\r
+ protected boolean initialized = false;\r
+\r
+\r
+ /**\r
+ * Current worker threads busy count.\r
+ */\r
+ protected int curThreadsBusy = 0;\r
+\r
+\r
+ /**\r
+ * Current worker threads count.\r
+ */\r
+ protected int curThreads = 0;\r
+\r
+\r
+ /**\r
+ * Sequence number used to generate thread names.\r
+ */\r
+ protected int sequence = 0;\r
+\r
+\r
+ /**\r
+ * Root APR memory pool.\r
+ */\r
+ protected long rootPool = 0;\r
+\r
+\r
+ /**\r
+ * Server socket "pointer".\r
+ */\r
+ protected ServerSocketChannel serverSock = null;\r
+\r
+\r
+ /**\r
+ * APR memory pool for the server socket.\r
+ */\r
+ protected long serverSockPool = 0;\r
+\r
+\r
+ /**\r
+ * SSL context.\r
+ */\r
+ protected long sslContext = 0;\r
+\r
+\r
+ // ------------------------------------------------------------- Properties\r
+\r
+\r
+ /**\r
+ * External Executor based thread pool.\r
+ */\r
+ protected Executor executor = null;\r
+ public void setExecutor(Executor executor) { this.executor = executor; }\r
+ public Executor getExecutor() { return executor; }\r
+\r
+\r
+ /**\r
+ * Maximum amount of worker threads.\r
+ */\r
+ protected int maxThreads = 40;\r
+ public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }\r
+ public int getMaxThreads() { return maxThreads; }\r
+\r
+\r
+ /**\r
+ * Priority of the acceptor and poller threads.\r
+ */\r
+ protected int threadPriority = Thread.NORM_PRIORITY;\r
+ public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }\r
+ public int getThreadPriority() { return threadPriority; }\r
+\r
+\r
+ /**\r
+ * Size of the socket poller.\r
+ */\r
+ protected int pollerSize = 8 * 1024;\r
+ public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }\r
+ public int getPollerSize() { return pollerSize; }\r
+\r
+\r
+ /**\r
+ * Size of the sendfile (= concurrent files which can be served).\r
+ */\r
+ protected int sendfileSize = 1 * 1024;\r
+ public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }\r
+ public int getSendfileSize() { return sendfileSize; }\r
+\r
+\r
+ /**\r
+ * Server socket port.\r
+ */\r
+ protected int port;\r
+ public int getPort() { return port; }\r
+ public void setPort(int port ) { this.port=port; }\r
+\r
+\r
+ /**\r
+ * Address for the server socket.\r
+ */\r
+ protected InetAddress address;\r
+ public InetAddress getAddress() { return address; }\r
+ public void setAddress(InetAddress address) { this.address = address; }\r
+\r
+\r
+ /**\r
+ * Handling of accepted sockets.\r
+ */\r
+ protected Handler handler = null;\r
+ public void setHandler(Handler handler ) { this.handler = handler; }\r
+ public Handler getHandler() { return handler; }\r
+\r
+\r
+ /**\r
+ * Allows the server developer to specify the backlog that\r
+ * should be used for server sockets. By default, this value\r
+ * is 100.\r
+ */\r
+ protected int backlog = 100;\r
+ public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }\r
+ public int getBacklog() { return backlog; }\r
+\r
+\r
+ /**\r
+ * Socket TCP no delay.\r
+ */\r
+ protected boolean tcpNoDelay = false;\r
+ public boolean getTcpNoDelay() { return tcpNoDelay; }\r
+ public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }\r
+\r
+\r
+ /**\r
+ * Socket linger.\r
+ */\r
+ protected int soLinger = 100;\r
+ public int getSoLinger() { return soLinger; }\r
+ public void setSoLinger(int soLinger) { this.soLinger = soLinger; }\r
+\r
+\r
+ /**\r
+ * Socket timeout.\r
+ */\r
+ protected int soTimeout = -1;\r
+ public int getSoTimeout() { return soTimeout; }\r
+ public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }\r
+\r
+\r
+ /**\r
+ * Timeout on first request read before going to the poller, in ms.\r
+ */\r
+ protected int firstReadTimeout = 60000;\r
+ public int getFirstReadTimeout() { return firstReadTimeout; }\r
+ public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }\r
+\r
+\r
+ /**\r
+ * Poll interval, in microseconds. The smaller the value, the more CPU the poller\r
+ * will use, but the more responsive to activity it will be.\r
+ */\r
+ protected int pollTime = 2000;\r
+ public int getPollTime() { return pollTime; }\r
+ public void setPollTime(int pollTime) { if (pollTime > 0) { this.pollTime = pollTime; } }\r
+\r
+\r
+ /**\r
+ * The default is true - the created threads will be\r
+ * in daemon mode. If set to false, the control thread\r
+ * will not be daemon - and will keep the process alive.\r
+ */\r
+ protected boolean daemon = true;\r
+ public void setDaemon(boolean b) { daemon = b; }\r
+ public boolean getDaemon() { return daemon; }\r
+\r
+\r
+ /**\r
+ * Name of the thread pool, which will be used for naming child threads.\r
+ */\r
+ protected String name = "TP";\r
+ public void setName(String name) { this.name = name; }\r
+ public String getName() { return name; }\r
+\r
+\r
+ /**\r
+ * Use endfile for sending static files.\r
+ */\r
+ protected boolean useSendfile = Library.APR_HAS_SENDFILE;\r
+ public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }\r
+ public boolean getUseSendfile() { return useSendfile; }\r
+\r
+\r
+ /**\r
+ * Allow comet request handling.\r
+ */\r
+ protected boolean useComet = true;\r
+ public void setUseComet(boolean useComet) { this.useComet = useComet; }\r
+ public boolean getUseComet() { return useComet; }\r
+\r
+\r
+ /**\r
+ * Acceptor thread count.\r
+ */\r
+ protected int acceptorThreadCount = 0;\r
+ public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; }\r
+ public int getAcceptorThreadCount() { return acceptorThreadCount; }\r
+\r
+\r
+ /**\r
+ * Sendfile thread count.\r
+ */\r
+ protected int sendfileThreadCount = 0;\r
+ public void setSendfileThreadCount(int sendfileThreadCount) { this.sendfileThreadCount = sendfileThreadCount; }\r
+ public int getSendfileThreadCount() { return sendfileThreadCount; }\r
+\r
+\r
+ /**\r
+ * Poller thread count.\r
+ */\r
+ protected int pollerThreadCount = 0;\r
+ public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }\r
+ public int getPollerThreadCount() { return pollerThreadCount; }\r
+\r
+ protected long selectorTimeout = 5000;\r
+ public void setSelectorTimeout(long timeout){ this.selectorTimeout = timeout;}\r
+ public long getSelectorTimeout(){ return this.selectorTimeout; }\r
+ /**\r
+ * The socket poller.\r
+ */\r
+ protected Poller[] pollers = null;\r
+ protected int pollerRoundRobin = 0;\r
+ public Poller getPoller() {\r
+ pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;\r
+ Poller poller = pollers[pollerRoundRobin];\r
+ poller.comet = false;\r
+ return poller;\r
+ }\r
+\r
+\r
+ /**\r
+ * The socket poller used for Comet support.\r
+ */\r
+ public Poller getCometPoller() {\r
+ Poller poller = getPoller();\r
+ poller.comet = true;\r
+ return poller;\r
+ }\r
+\r
+\r
+ /**\r
+ * The static file sender.\r
+ */\r
+ protected Sendfile[] sendfiles = null;\r
+ protected int sendfileRoundRobin = 0;\r
+ public Sendfile getSendfile() {\r
+ sendfileRoundRobin = (sendfileRoundRobin + 1) % sendfiles.length;\r
+ return sendfiles[sendfileRoundRobin];\r
+ }\r
+\r
+\r
+ /**\r
+ * Dummy maxSpareThreads property.\r
+ */\r
+ public int getMaxSpareThreads() { return 0; }\r
+\r
+\r
+ /**\r
+ * Dummy minSpareThreads property.\r
+ */\r
+ public int getMinSpareThreads() { return 0; }\r
+\r
+\r
+ /**\r
+ * SSL engine.\r
+ */\r
+ protected String SSLEngine = "off";\r
+ public String getSSLEngine() { return SSLEngine; }\r
+ public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; }\r
+\r
+\r
+ /**\r
+ * SSL protocols.\r
+ */\r
+ protected String SSLProtocol = "all";\r
+ public String getSSLProtocol() { return SSLProtocol; }\r
+ public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; }\r
+\r
+\r
+ /**\r
+ * SSL password (if a cert is encrypted, and no password has been provided, a callback\r
+ * will ask for a password).\r
+ */\r
+ protected String SSLPassword = null;\r
+ public String getSSLPassword() { return SSLPassword; }\r
+ public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; }\r
+\r
+\r
+ /**\r
+ * SSL cipher suite.\r
+ */\r
+ protected String SSLCipherSuite = "ALL";\r
+ public String getSSLCipherSuite() { return SSLCipherSuite; }\r
+ public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }\r
+\r
+\r
+ /**\r
+ * SSL certificate file.\r
+ */\r
+ protected String SSLCertificateFile = null;\r
+ public String getSSLCertificateFile() { return SSLCertificateFile; }\r
+ public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }\r
+\r
+\r
+ /**\r
+ * SSL certificate key file.\r
+ */\r
+ protected String SSLCertificateKeyFile = null;\r
+ public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }\r
+ public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }\r
+\r
+\r
+ /**\r
+ * SSL certificate chain file.\r
+ */\r
+ protected String SSLCertificateChainFile = null;\r
+ public String getSSLCertificateChainFile() { return SSLCertificateChainFile; }\r
+ public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }\r
+\r
+\r
+ /**\r
+ * SSL CA certificate path.\r
+ */\r
+ protected String SSLCACertificatePath = null;\r
+ public String getSSLCACertificatePath() { return SSLCACertificatePath; }\r
+ public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }\r
+\r
+\r
+ /**\r
+ * SSL CA certificate file.\r
+ */\r
+ protected String SSLCACertificateFile = null;\r
+ public String getSSLCACertificateFile() { return SSLCACertificateFile; }\r
+ public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }\r
+\r
+\r
+ /**\r
+ * SSL CA revocation path.\r
+ */\r
+ protected String SSLCARevocationPath = null;\r
+ public String getSSLCARevocationPath() { return SSLCARevocationPath; }\r
+ public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }\r
+\r
+\r
+ /**\r
+ * SSL CA revocation file.\r
+ */\r
+ protected String SSLCARevocationFile = null;\r
+ public String getSSLCARevocationFile() { return SSLCARevocationFile; }\r
+ public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }\r
+\r
+\r
+ /**\r
+ * SSL verify client.\r
+ */\r
+ protected String SSLVerifyClient = "none";\r
+ public String getSSLVerifyClient() { return SSLVerifyClient; }\r
+ public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }\r
+\r
+\r
+ /**\r
+ * SSL verify depth.\r
+ */\r
+ protected int SSLVerifyDepth = 10;\r
+ public int getSSLVerifyDepth() { return SSLVerifyDepth; }\r
+ public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }\r
+\r
+\r
+ // --------------------------------------------------------- Public Methods\r
+\r
+\r
+ /**\r
+ * Number of keepalive sockets.\r
+ */\r
+ public int getKeepAliveCount() {\r
+ if (pollers == null) {\r
+ return 0;\r
+ } else {\r
+ int keepAliveCount = 0;\r
+ for (int i = 0; i < pollers.length; i++) {\r
+ keepAliveCount += pollers[i].getKeepAliveCount();\r
+ }\r
+ return keepAliveCount;\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Number of sendfile sockets.\r
+ */\r
+ public int getSendfileCount() {\r
+ if (sendfiles == null) {\r
+ return 0;\r
+ } else {\r
+ int sendfileCount = 0;\r
+ for (int i = 0; i < sendfiles.length; i++) {\r
+ sendfileCount += sendfiles[i].getSendfileCount();\r
+ }\r
+ return sendfileCount;\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Return the amount of threads that are managed by the pool.\r
+ *\r
+ * @return the amount of threads that are managed by the pool\r
+ */\r
+ public int getCurrentThreadCount() {\r
+ return curThreads;\r
+ }\r
+\r
+\r
+ /**\r
+ * Return the amount of threads currently busy.\r
+ *\r
+ * @return the amount of threads currently busy\r
+ */\r
+ public int getCurrentThreadsBusy() {\r
+ return curThreadsBusy;\r
+ }\r
+\r
+\r
+ /**\r
+ * Return the state of the endpoint.\r
+ *\r
+ * @return true if the endpoint is running, false otherwise\r
+ */\r
+ public boolean isRunning() {\r
+ return running;\r
+ }\r
+\r
+\r
+ /**\r
+ * Return the state of the endpoint.\r
+ *\r
+ * @return true if the endpoint is paused, false otherwise\r
+ */\r
+ public boolean isPaused() {\r
+ return paused;\r
+ }\r
+\r
+\r
+ // ----------------------------------------------- Public Lifecycle Methods\r
+\r
+\r
+ /**\r
+ * Initialize the endpoint.\r
+ */\r
+ public void init()\r
+ throws Exception {\r
+\r
+ if (initialized)\r
+ return;\r
+\r
+ serverSock = ServerSocketChannel.open();\r
+ InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port));\r
+ serverSock.socket().bind(addr,100); //todo, set backlog value\r
+ serverSock.configureBlocking(true); //mimic APR behavior\r
+ // Sendfile usage on systems which don't support it cause major problems\r
+ if (useSendfile) {\r
+ log.warn(sm.getString("endpoint.sendfile.nosupport"));\r
+ useSendfile = false;\r
+ }\r
+\r
+ // Initialize thread count defaults for acceptor, poller and sendfile\r
+ if (acceptorThreadCount == 0) {\r
+ // FIXME: Doesn't seem to work that well with multiple accept threads\r
+ acceptorThreadCount = 1;\r
+ }\r
+ if (pollerThreadCount != 1) {\r
+ // limit to one poller, no need for others\r
+ pollerThreadCount = 1;\r
+ }\r
+ if (sendfileThreadCount != 0) {\r
+ sendfileThreadCount = 0;\r
+ }\r
+\r
+ // Initialize SSL if needed\r
+ if (!"off".equalsIgnoreCase(SSLEngine)) {\r
+ // Initialize SSL\r
+ // FIXME: one per VM call ?\r
+ if ("on".equalsIgnoreCase(SSLEngine)) {\r
+ SSL.initialize(null);\r
+ } else {\r
+ SSL.initialize(SSLEngine);\r
+ }\r
+ // SSL protocol\r
+ int value = SSL.SSL_PROTOCOL_ALL;\r
+ if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {\r
+ value = SSL.SSL_PROTOCOL_SSLV2;\r
+ } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {\r
+ value = SSL.SSL_PROTOCOL_SSLV3;\r
+ } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {\r
+ value = SSL.SSL_PROTOCOL_TLSV1;\r
+ } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {\r
+ value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;\r
+ }\r
+// // Create SSL Context\r
+// sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);\r
+// // List the ciphers that the client is permitted to negotiate\r
+// SSLContext.setCipherSuite(sslContext, SSLCipherSuite);\r
+// // Load Server key and certificate\r
+// SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);\r
+// // Set certificate chain file\r
+// SSLContext.setCertificateChainFile(sslContext, SSLCertificateChainFile, false);\r
+// // Support Client Certificates\r
+// SSLContext.setCACertificate(sslContext, SSLCACertificateFile, SSLCACertificatePath);\r
+// // Set revocation\r
+// SSLContext.setCARevocation(sslContext, SSLCARevocationFile, SSLCARevocationPath);\r
+// // Client certificate verification\r
+// value = SSL.SSL_CVERIFY_NONE;\r
+// if ("optional".equalsIgnoreCase(SSLVerifyClient)) {\r
+// value = SSL.SSL_CVERIFY_OPTIONAL;\r
+// } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {\r
+// value = SSL.SSL_CVERIFY_REQUIRE;\r
+// } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {\r
+// value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;\r
+// }\r
+// SSLContext.setVerify(sslContext, value, SSLVerifyDepth);\r
+ // For now, sendfile is not supported with SSL\r
+ useSendfile = false;\r
+ }\r
+\r
+ initialized = true;\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Start the APR endpoint, creating acceptor, poller and sendfile threads.\r
+ */\r
+ public void start()\r
+ throws Exception {\r
+ // Initialize socket if not done before\r
+ if (!initialized) {\r
+ init();\r
+ }\r
+ if (!running) {\r
+ running = true;\r
+ paused = false;\r
+\r
+ // Create worker collection\r
+ if (executor == null) {\r
+ workers = new WorkerStack(maxThreads);\r
+ }\r
+\r
+ // Start acceptor threads\r
+ for (int i = 0; i < acceptorThreadCount; i++) {\r
+ Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);\r
+ acceptorThread.setPriority(threadPriority);\r
+ acceptorThread.setDaemon(daemon);\r
+ acceptorThread.start();\r
+ }\r
+\r
+ // Start poller threads\r
+ pollers = new Poller[pollerThreadCount];\r
+ for (int i = 0; i < pollerThreadCount; i++) {\r
+ pollers[i] = new Poller(false);\r
+ pollers[i].init();\r
+ Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);\r
+ pollerThread.setPriority(threadPriority);\r
+ pollerThread.setDaemon(true);\r
+ pollerThread.start();\r
+ }\r
+\r
+ // Start sendfile threads\r
+ if (useSendfile) {\r
+ sendfiles = new Sendfile[sendfileThreadCount];\r
+ for (int i = 0; i < sendfileThreadCount; i++) {\r
+ sendfiles[i] = new Sendfile();\r
+ sendfiles[i].init();\r
+ Thread sendfileThread = new Thread(sendfiles[i], getName() + "-Sendfile-" + i);\r
+ sendfileThread.setPriority(threadPriority);\r
+ sendfileThread.setDaemon(true);\r
+ sendfileThread.start();\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Pause the endpoint, which will make it stop accepting new sockets.\r
+ */\r
+ public void pause() {\r
+ if (running && !paused) {\r
+ paused = true;\r
+ unlockAccept();\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Resume the endpoint, which will make it start accepting new sockets\r
+ * again.\r
+ */\r
+ public void resume() {\r
+ if (running) {\r
+ paused = false;\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Stop the endpoint. This will cause all processing threads to stop.\r
+ */\r
+ public void stop() {\r
+ if (running) {\r
+ running = false;\r
+ unlockAccept();\r
+ for (int i = 0; i < pollers.length; i++) {\r
+ pollers[i].destroy();\r
+ }\r
+ pollers = null;\r
+ if (useSendfile) {\r
+ for (int i = 0; i < sendfiles.length; i++) {\r
+ sendfiles[i].destroy();\r
+ }\r
+ sendfiles = null;\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Deallocate APR memory pools, and close server socket.\r
+ */\r
+ public void destroy() throws Exception {\r
+ if (running) {\r
+ stop();\r
+ }\r
+ // Close server socket\r
+ serverSock.socket().close();\r
+ serverSock.close();\r
+ serverSock = null;\r
+ sslContext = 0;\r
+ initialized = false;\r
+ }\r
+\r
+\r
+ // ------------------------------------------------------ Protected Methods\r
+\r
+\r
+ /**\r
+ * Get a sequence number used for thread naming.\r
+ */\r
+ protected int getSequence() {\r
+ return sequence++;\r
+ }\r
+\r
+\r
+ /**\r
+ * Unlock the server socket accept using a bugus connection.\r
+ */\r
+ protected void unlockAccept() {\r
+ java.net.Socket s = null;\r
+ try {\r
+ // Need to create a connection to unlock the accept();\r
+ if (address == null) {\r
+ s = new java.net.Socket("127.0.0.1", port);\r
+ } else {\r
+ s = new java.net.Socket(address, port);\r
+ // setting soLinger to a small value will help shutdown the\r
+ // connection quicker\r
+ s.setSoLinger(true, 0);\r
+ }\r
+ } catch(Exception e) {\r
+ if (log.isDebugEnabled()) {\r
+ log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);\r
+ }\r
+ } finally {\r
+ if (s != null) {\r
+ try {\r
+ s.close();\r
+ } catch (Exception e) {\r
+ // Ignore\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Process the specified connection.\r
+ */\r
+ protected boolean setSocketOptions(SocketChannel socket) {\r
+ // Process the connection\r
+ int step = 1;\r
+ try {\r
+ //disable blocking, APR style, we are gonna be polling it\r
+ socket.configureBlocking(false);\r
+\r
+ // 1: Set socket options: timeout, linger, etc\r
+ if (soLinger >= 0)\r
+ socket.socket().setSoLinger(true,soLinger);\r
+ if (tcpNoDelay)\r
+ socket.socket().setTcpNoDelay(true);\r
+ if (soTimeout > 0)\r
+ socket.socket().setSoTimeout(soTimeout);\r
+\r
+\r
+ // 2: SSL handshake\r
+ step = 2;\r
+ if (sslContext != 0) {\r
+// SSLSocket.attach(sslContext, socket);\r
+// if (SSLSocket.handshake(socket) != 0) {\r
+// if (log.isDebugEnabled()) {\r
+// log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());\r
+// }\r
+// return false;\r
+// }\r
+ }\r
+ \r
+ getPoller().register(socket);\r
+\r
+ } catch (Throwable t) {\r
+ if (log.isDebugEnabled()) {\r
+ if (step == 2) {\r
+ log.debug(sm.getString("endpoint.err.handshake"), t);\r
+ } else {\r
+ log.debug(sm.getString("endpoint.err.unexpected"), t);\r
+ }\r
+ }\r
+ // Tell to close the socket\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+\r
+\r
+ /**\r
+ * Create (or allocate) and return an available processor for use in\r
+ * processing a specific HTTP request, if possible. If the maximum\r
+ * allowed processors have already been created and are in use, return\r
+ * <code>null</code> instead.\r
+ */\r
+ protected Worker createWorkerThread() {\r
+\r
+ synchronized (workers) {\r
+ if (workers.size() > 0) {\r
+ curThreadsBusy++;\r
+ return (workers.pop());\r
+ }\r
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {\r
+ curThreadsBusy++;\r
+ return (newWorkerThread());\r
+ } else {\r
+ if (maxThreads < 0) {\r
+ curThreadsBusy++;\r
+ return (newWorkerThread());\r
+ } else {\r
+ return (null);\r
+ }\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Create and return a new processor suitable for processing HTTP\r
+ * requests and returning the corresponding responses.\r
+ */\r
+ protected Worker newWorkerThread() {\r
+\r
+ Worker workerThread = new Worker();\r
+ workerThread.start();\r
+ return (workerThread);\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Return a new worker thread, and block while to worker is available.\r
+ */\r
+ protected Worker getWorkerThread() {\r
+ // Allocate a new worker thread\r
+ Worker workerThread = createWorkerThread();\r
+ while (workerThread == null) {\r
+ try {\r
+ synchronized (workers) {\r
+ workers.wait();\r
+ }\r
+ } catch (InterruptedException e) {\r
+ // Ignore\r
+ }\r
+ workerThread = createWorkerThread();\r
+ }\r
+ return workerThread;\r
+ }\r
+\r
+\r
+ /**\r
+ * Recycle the specified Processor so that it can be used again.\r
+ *\r
+ * @param workerThread The processor to be recycled\r
+ */\r
+ protected void recycleWorkerThread(Worker workerThread) {\r
+ synchronized (workers) {\r
+ workers.push(workerThread);\r
+ curThreadsBusy--;\r
+ workers.notify();\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Allocate a new poller of the specified size.\r
+ */\r
+ protected long allocatePoller(int size, long pool, int timeout) {\r
+ try {\r
+ return Poll.create(size, pool, 0, timeout * 1000);\r
+ } catch (Error e) {\r
+ if (Status.APR_STATUS_IS_EINVAL(e.getError())) {\r
+ log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));\r
+ return 0;\r
+ } else {\r
+ log.error(sm.getString("endpoint.poll.initfail"), e);\r
+ return -1;\r
+ }\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Process given socket.\r
+ */\r
+ protected boolean processSocket(SocketChannel socket) {\r
+ try {\r
+ if (executor == null) {\r
+ getWorkerThread().assign(socket);\r
+ } else {\r
+ executor.execute(new SocketProcessor(socket));\r
+ }\r
+ } catch (Throwable t) {\r
+ // This means we got an OOM or similar creating a thread, or that\r
+ // the pool and its queue are full\r
+ log.error(sm.getString("endpoint.process.fail"), t);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+\r
+\r
+ /**\r
+ * Process given socket for an event.\r
+ */\r
+ protected boolean processSocket(SocketChannel socket, boolean error) {\r
+ try {\r
+ if (executor == null) {\r
+ getWorkerThread().assign(socket, error);\r
+ } else {\r
+ executor.execute(new SocketEventProcessor(socket, error));\r
+ }\r
+ } catch (Throwable t) {\r
+ // This means we got an OOM or similar creating a thread, or that\r
+ // the pool and its queue are full\r
+ log.error(sm.getString("endpoint.process.fail"), t);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+\r
+\r
+ // --------------------------------------------------- Acceptor Inner Class\r
+\r
+\r
+ /**\r
+ * Server socket acceptor thread.\r
+ */\r
+ protected class Acceptor implements Runnable {\r
+\r
+\r
+ /**\r
+ * The background thread that listens for incoming TCP/IP connections and\r
+ * hands them off to an appropriate processor.\r
+ */\r
+ public void run() {\r
+\r
+ // Loop until we receive a shutdown command\r
+ while (running) {\r
+\r
+ // Loop if endpoint is paused\r
+ while (paused) {\r
+ try {\r
+ Thread.sleep(1000);\r
+ } catch (InterruptedException e) {\r
+ // Ignore\r
+ }\r
+ }\r
+\r
+ try {\r
+ // Accept the next incoming connection from the server socket\r
+ SocketChannel socket = serverSock.accept();\r
+ // Hand this socket off to an appropriate processor\r
+ if(!setSocketOptions(socket))\r
+ {\r
+ // Close socket right away\r
+ socket.socket().close();\r
+ socket.close();\r
+ }\r
+ } catch (Throwable t) {\r
+ log.error(sm.getString("endpoint.accept.fail"), t);\r
+ }\r
+\r
+ // The processor will recycle itself when it finishes\r
+\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ // ----------------------------------------------------- Poller Inner Class\r
+\r
+\r
+ /**\r
+ * Poller class.\r
+ */\r
+ public class Poller implements Runnable {\r
+\r
+ protected Selector selector;\r
+ protected LinkedList<Runnable> events = new LinkedList<Runnable>();\r
+ protected boolean close = false;\r
+ protected boolean comet = true;\r
+\r
+ protected int keepAliveCount = 0;\r
+ public int getKeepAliveCount() { return keepAliveCount; }\r
+\r
+\r
+\r
+ public Poller(boolean comet) throws IOException {\r
+ this.comet = comet;\r
+ this.selector = Selector.open();\r
+ }\r
+ \r
+ public Selector getSelector() { return selector;}\r
+\r
+ /**\r
+ * Create the poller. With some versions of APR, the maximum poller size will\r
+ * be 62 (reocmpiling APR is necessary to remove this limitation).\r
+ */\r
+ protected void init() {\r
+ keepAliveCount = 0;\r
+ }\r
+\r
+ /**\r
+ * Destroy the poller.\r
+ */\r
+ protected void destroy() {\r
+ // Wait for polltime before doing anything, so that the poller threads\r
+ // exit, otherwise parallel descturction of sockets which are still\r
+ // in the poller can cause problems\r
+ try {\r
+ synchronized (this) {\r
+ this.wait(pollTime / 1000);\r
+ }\r
+ } catch (InterruptedException e) {\r
+ // Ignore\r
+ }\r
+ close = true;\r
+ }\r
+ \r
+ public void addEvent(Runnable event) {\r
+ synchronized (events) {\r
+ events.add(event);\r
+ }\r
+ selector.wakeup();\r
+ }\r
+\r
+ /**\r
+ * Add specified socket and associated pool to the poller. The socket will\r
+ * be added to a temporary array, and polled first after a maximum amount\r
+ * of time equal to pollTime (in most cases, latency will be much lower,\r
+ * however).\r
+ *\r
+ * @param socket to add to the poller\r
+ */\r
+ public void add(final SocketChannel socket) {\r
+ final SelectionKey key = socket.keyFor(selector);\r
+ KeyAttachment att = (KeyAttachment)key.attachment();\r
+ if ( att != null ) att.setWakeUp(false);\r
+ Runnable r = new Runnable() {\r
+ public void run() {\r
+ if ( key != null ) key.interestOps(SelectionKey.OP_READ);\r
+ }\r
+ };\r
+ addEvent(r);\r
+ }\r
+\r
+ public void events() {\r
+ synchronized (events) {\r
+ Runnable r = null;\r
+ while ( (events.size() > 0) && (r = events.removeFirst()) != null ) {\r
+ try {\r
+ r.run();\r
+ } catch ( Exception x ) {\r
+ log.error("",x);\r
+ }\r
+ }\r
+ events.clear();\r
+ }\r
+ }\r
+ \r
+ public void register(final SocketChannel socket)\r
+ {\r
+ SelectionKey key = socket.keyFor(selector);\r
+ Runnable r = new Runnable() {\r
+ public void run() {\r
+ try {\r
+ socket.register(selector, SelectionKey.OP_READ, new KeyAttachment());\r
+ } catch (Exception x) {\r
+ log.error("", x);\r
+ }\r
+ }\r
+ \r
+ };\r
+ synchronized (events) {\r
+ events.add(r);\r
+ }\r
+ selector.wakeup();\r
+ }\r
+ \r
+ public void cancelledKey(SelectionKey key) {\r
+ try {\r
+ KeyAttachment ka = (KeyAttachment) key.attachment();\r
+ key.cancel();\r
+ if (ka.getComet()) processSocket( (SocketChannel) key.channel(), true);\r
+ key.channel().close();\r
+ } catch (IOException e) {\r
+ if ( log.isDebugEnabled() ) log.debug("",e);\r
+ // Ignore\r
+ }\r
+ }\r
+ /**\r
+ * The background thread that listens for incoming TCP/IP connections and\r
+ * hands them off to an appropriate processor.\r
+ */\r
+ public void run() {\r
+\r
+ // Loop until we receive a shutdown command\r
+ while (running) {\r
+ // Loop if endpoint is paused\r
+ while (paused) {\r
+ try {\r
+ Thread.sleep(1000);\r
+ } catch (InterruptedException e) {\r
+ // Ignore\r
+ }\r
+ }\r
+\r
+ events();\r
+ // Time to terminate?\r
+ if (close) return;\r
+\r
+ int keyCount = 0;\r
+ try {\r
+ keyCount = selector.select(selectorTimeout);\r
+ } catch (IOException x) {\r
+ log.error("",x);\r
+ continue;\r
+ }\r
+ //timeout\r
+ Set keys = selector.keys();\r
+ long now = System.currentTimeMillis();\r
+ for (Iterator iter = keys.iterator(); iter.hasNext(); ) {\r
+ SelectionKey key = (SelectionKey) iter.next();\r
+ try {\r
+ if (key.interestOps() == SelectionKey.OP_READ) {\r
+ //only timeout sockets that we are waiting for a read from\r
+ KeyAttachment ka = (KeyAttachment) key.attachment();\r
+ long delta = now - ka.getLastAccess();\r
+ if (delta > (long) soTimeout) {\r
+ cancelledKey(key);\r
+ }\r
+ }\r
+ }catch ( CancelledKeyException ckx ) {\r
+ cancelledKey(key);\r
+ }\r
+ }\r
+ \r
+\r
+ if (keyCount == 0) continue;\r
+\r
+ Iterator iterator = selector.selectedKeys().iterator();\r
+ // Walk through the collection of ready keys and dispatch\r
+ // any active event.\r
+ while (iterator.hasNext()) {\r
+ SelectionKey sk = (SelectionKey) iterator.next();\r
+ iterator.remove();\r
+ KeyAttachment attachment = (KeyAttachment)sk.attachment();\r
+ try {\r
+ if(attachment == null) attachment = new KeyAttachment();\r
+ attachment.access();\r
+ sk.attach(attachment);\r
+\r
+ int readyOps = sk.readyOps();\r
+ sk.interestOps(sk.interestOps() & ~readyOps);\r
+ SocketChannel channel = (SocketChannel)sk.channel();\r
+ boolean read = sk.isReadable();\r
+ if (read) {\r
+ if ( attachment.getWakeUp() ) {\r
+ attachment.setWakeUp(false);\r
+ synchronized (attachment.getMutex()) {attachment.getMutex().notifyAll();}\r
+ } else if ( comet ) {\r
+ if (!processSocket(channel,false)) processSocket(channel,true);\r
+ } else {\r
+ boolean close = (!processSocket(channel));\r
+ if ( close ) {\r
+ channel.socket().close();\r
+ channel.close();\r
+ }\r
+ }\r
+ }\r
+ if (sk.isValid() && sk.isWritable()) {\r
+ }\r
+ } catch ( CancelledKeyException ckx ) {\r
+ if (attachment!=null && attachment.getComet()) processSocket( (SocketChannel) sk.channel(), true);\r
+ try {\r
+ sk.channel().close();\r
+ }catch ( Exception ignore){}\r
+ } catch (Throwable t) {\r
+ log.error("",t);\r
+ }\r
+ }//while\r
+\r
+ \r
+ }\r
+ synchronized (this) {\r
+ this.notifyAll();\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+ \r
+ public static class KeyAttachment {\r
+\r
+ public long getLastAccess() { return lastAccess; }\r
+ public void access() { access(System.currentTimeMillis()); }\r
+ public void access(long access) { lastAccess = access; }\r
+ public void setComet(boolean comet) { this.comet = comet; }\r
+ public boolean getComet() { return comet; }\r
+ public boolean getCurrentAccess() { return currentAccess; }\r
+ public void setCurrentAccess(boolean access) { currentAccess = access; }\r
+ public boolean getWakeUp() { return wakeUp; }\r
+ public void setWakeUp(boolean wakeUp) { this.wakeUp = wakeUp; }\r
+ public Object getMutex() {return mutex;}\r
+ protected Object mutex = new Object();\r
+ protected boolean wakeUp = false;\r
+ protected long lastAccess = System.currentTimeMillis();\r
+ protected boolean currentAccess = false;\r
+ protected boolean comet = false;\r
+\r
+ }\r
+\r
+\r
+\r
+ // ----------------------------------------------------- Worker Inner Class\r
+\r
+\r
+ /**\r
+ * Server processor class.\r
+ */\r
+ protected class Worker implements Runnable {\r
+\r
+\r
+ protected Thread thread = null;\r
+ protected boolean available = false;\r
+ protected SocketChannel socket = null;\r
+ protected boolean event = false;\r
+ protected boolean error = false;\r
+\r
+\r
+ /**\r
+ * Process an incoming TCP/IP connection on the specified socket. Any\r
+ * exception that occurs during processing must be logged and swallowed.\r
+ * <b>NOTE</b>: This method is called from our Connector's thread. We\r
+ * must assign it to our own thread so that multiple simultaneous\r
+ * requests can be handled.\r
+ *\r
+ * @param socket TCP socket to process\r
+ */\r
+ protected synchronized void assign(SocketChannel socket) {\r
+\r
+ // Wait for the Processor to get the previous Socket\r
+ while (available) {\r
+ try {\r
+ wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ // Store the newly available Socket and notify our thread\r
+ this.socket = socket;\r
+ event = false;\r
+ error = false;\r
+ available = true;\r
+ notifyAll();\r
+\r
+ }\r
+\r
+\r
+ protected synchronized void assign(SocketChannel socket, boolean error) {\r
+\r
+ // Wait for the Processor to get the previous Socket\r
+ while (available) {\r
+ try {\r
+ wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ // Store the newly available Socket and notify our thread\r
+ this.socket = socket;\r
+ event = true;\r
+ this.error = error;\r
+ available = true;\r
+ notifyAll();\r
+ }\r
+\r
+\r
+ /**\r
+ * Await a newly assigned Socket from our Connector, or <code>null</code>\r
+ * if we are supposed to shut down.\r
+ */\r
+ protected synchronized SocketChannel await() {\r
+\r
+ // Wait for the Connector to provide a new Socket\r
+ while (!available) {\r
+ try {\r
+ wait();\r
+ } catch (InterruptedException e) {\r
+ }\r
+ }\r
+\r
+ // Notify the Connector that we have received this Socket\r
+ SocketChannel socket = this.socket;\r
+ available = false;\r
+ notifyAll();\r
+\r
+ return (socket);\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * The background thread that listens for incoming TCP/IP connections and\r
+ * hands them off to an appropriate processor.\r
+ */\r
+ public void run() {\r
+\r
+ // Process requests until we receive a shutdown signal\r
+ while (running) {\r
+\r
+ // Wait for the next socket to be assigned\r
+ SocketChannel socket = await();\r
+ if (socket == null)\r
+ continue;\r
+\r
+ // Process the request from this socket\r
+ if ((event) && (handler.event(socket, error) == Handler.SocketState.CLOSED)) {\r
+ // Close socket and pool\r
+ try {\r
+ socket.socket().close();\r
+ socket.close();\r
+ }catch ( Exception x ) {\r
+ log.error("",x);\r
+ }\r
+ } else if ((!event) && (handler.process(socket) == Handler.SocketState.CLOSED)) {\r
+ // Close socket and pool\r
+ try {\r
+ socket.socket().close();\r
+ socket.close();\r
+ }catch ( Exception x ) {\r
+ log.error("",x);\r
+ }\r
+ }\r
+\r
+ // Finish up this request\r
+ recycleWorkerThread(this);\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Start the background processing thread.\r
+ */\r
+ public void start() {\r
+ thread = new Thread(this);\r
+ thread.setName(getName() + "-" + (++curThreads));\r
+ thread.setDaemon(true);\r
+ thread.start();\r
+ }\r
+\r
+\r
+ }\r
+\r
+\r
+ // ----------------------------------------------- SendfileData Inner Class\r
+\r
+\r
+ /**\r
+ * SendfileData class.\r
+ */\r
+ public static class SendfileData {\r
+ // File\r
+ public String fileName;\r
+ public long fd;\r
+ public long fdpool;\r
+ // Range information\r
+ public long start;\r
+ public long end;\r
+ // Socket and socket pool\r
+ public SocketChannel socket;\r
+ // Position\r
+ public long pos;\r
+ // KeepAlive flag\r
+ public boolean keepAlive;\r
+ }\r
+\r
+\r
+ // --------------------------------------------------- Sendfile Inner Class\r
+\r
+\r
+ /**\r
+ * Sendfile class.\r
+ */\r
+ public class Sendfile implements Runnable {\r
+\r
+ protected long sendfilePollset = 0;\r
+ protected long pool = 0;\r
+ protected long[] desc;\r
+ protected HashMap<Long, SendfileData> sendfileData;\r
+\r
+ protected int sendfileCount;\r
+ public int getSendfileCount() { return sendfileCount; }\r
+\r
+ protected ArrayList<SendfileData> addS;\r
+\r
+ /**\r
+ * Create the sendfile poller. With some versions of APR, the maximum poller size will\r
+ * be 62 (reocmpiling APR is necessary to remove this limitation).\r
+ */\r
+ protected void init() {\r
+// pool = Pool.create(serverSockPool);\r
+// int size = sendfileSize / sendfileThreadCount;\r
+// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
+// if (sendfilePollset == 0 && size > 1024) {\r
+// size = 1024;\r
+// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
+// }\r
+// if (sendfilePollset == 0) {\r
+// size = 62;\r
+// sendfilePollset = allocatePoller(size, pool, soTimeout);\r
+// }\r
+// desc = new long[size * 2];\r
+// sendfileData = new HashMap<Long, SendfileData>(size);\r
+// addS = new ArrayList<SendfileData>();\r
+ }\r
+\r
+ /**\r
+ * Destroy the poller.\r
+ */\r
+ protected void destroy() {\r
+// // Wait for polltime before doing anything, so that the poller threads\r
+// // exit, otherwise parallel descturction of sockets which are still\r
+// // in the poller can cause problems\r
+// try {\r
+// synchronized (this) {\r
+// this.wait(pollTime / 1000);\r
+// }\r
+// } catch (InterruptedException e) {\r
+// // Ignore\r
+// }\r
+// // Close any socket remaining in the add queue\r
+// for (int i = (addS.size() - 1); i >= 0; i--) {\r
+// SendfileData data = addS.get(i);\r
+// Socket.destroy(data.socket);\r
+// }\r
+// // Close all sockets still in the poller\r
+// int rv = Poll.pollset(sendfilePollset, desc);\r
+// if (rv > 0) {\r
+// for (int n = 0; n < rv; n++) {\r
+// Socket.destroy(desc[n*2+1]);\r
+// }\r
+// }\r
+// Pool.destroy(pool);\r
+// sendfileData.clear();\r
+ }\r
+\r
+ /**\r
+ * Add the sendfile data to the sendfile poller. Note that in most cases,\r
+ * the initial non blocking calls to sendfile will return right away, and\r
+ * will be handled asynchronously inside the kernel. As a result,\r
+ * the poller will never be used.\r
+ *\r
+ * @param data containing the reference to the data which should be snet\r
+ * @return true if all the data has been sent right away, and false\r
+ * otherwise\r
+ */\r
+ public boolean add(SendfileData data) {\r
+// // Initialize fd from data given\r
+// try {\r
+// data.fdpool = Socket.pool(data.socket);\r
+// data.fd = File.open\r
+// (data.fileName, File.APR_FOPEN_READ\r
+// | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,\r
+// 0, data.fdpool);\r
+// data.pos = data.start;\r
+// // Set the socket to nonblocking mode\r
+// Socket.timeoutSet(data.socket, 0);\r
+// while (true) {\r
+// long nw = Socket.sendfilen(data.socket, data.fd,\r
+// data.pos, data.end - data.pos, 0);\r
+// if (nw < 0) {\r
+// if (!(-nw == Status.EAGAIN)) {\r
+// Socket.destroy(data.socket);\r
+// data.socket = 0;\r
+// return false;\r
+// } else {\r
+// // Break the loop and add the socket to poller.\r
+// break;\r
+// }\r
+// } else {\r
+// data.pos = data.pos + nw;\r
+// if (data.pos >= data.end) {\r
+// // Entire file has been sent\r
+// Pool.destroy(data.fdpool);\r
+// // Set back socket to blocking mode\r
+// Socket.timeoutSet(data.socket, soTimeout * 1000);\r
+// return true;\r
+// }\r
+// }\r
+// }\r
+// } catch (Exception e) {\r
+// log.error(sm.getString("endpoint.sendfile.error"), e);\r
+// return false;\r
+// }\r
+// // Add socket to the list. Newly added sockets will wait\r
+// // at most for pollTime before being polled\r
+// synchronized (this) {\r
+// addS.add(data);\r
+// this.notify();\r
+// }\r
+ return false;\r
+ }\r
+\r
+ /**\r
+ * Remove socket from the poller.\r
+ *\r
+ * @param data the sendfile data which should be removed\r
+ */\r
+ protected void remove(SendfileData data) {\r
+// int rv = Poll.remove(sendfilePollset, data.socket);\r
+// if (rv == Status.APR_SUCCESS) {\r
+// sendfileCount--;\r
+// }\r
+// sendfileData.remove(data);\r
+ }\r
+\r
+ /**\r
+ * The background thread that listens for incoming TCP/IP connections and\r
+ * hands them off to an appropriate processor.\r
+ */\r
+ public void run() {\r
+\r
+// // Loop until we receive a shutdown command\r
+// while (running) {\r
+//\r
+// // Loop if endpoint is paused\r
+// while (paused) {\r
+// try {\r
+// Thread.sleep(1000);\r
+// } catch (InterruptedException e) {\r
+// // Ignore\r
+// }\r
+// }\r
+//\r
+// while (sendfileCount < 1 && addS.size() < 1) {\r
+// try {\r
+// synchronized (this) {\r
+// this.wait();\r
+// }\r
+// } catch (InterruptedException e) {\r
+// // Ignore\r
+// }\r
+// }\r
+//\r
+// try {\r
+// // Add socket to the poller\r
+// if (addS.size() > 0) {\r
+// synchronized (this) {\r
+// for (int i = (addS.size() - 1); i >= 0; i--) {\r
+// SendfileData data = addS.get(i);\r
+// int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);\r
+// if (rv == Status.APR_SUCCESS) {\r
+// sendfileData.put(new Long(data.socket), data);\r
+// sendfileCount++;\r
+// } else {\r
+// log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));\r
+// // Can't do anything: close the socket right away\r
+// Socket.destroy(data.socket);\r
+// }\r
+// }\r
+// addS.clear();\r
+// }\r
+// }\r
+// // Pool for the specified interval\r
+// int rv = Poll.poll(sendfilePollset, pollTime, desc, false);\r
+// if (rv > 0) {\r
+// for (int n = 0; n < rv; n++) {\r
+// // Get the sendfile state\r
+// SendfileData state =\r
+// sendfileData.get(new Long(desc[n*2+1]));\r
+// // Problem events\r
+// if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)\r
+// || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {\r
+// // Close socket and clear pool\r
+// remove(state);\r
+// // Destroy file descriptor pool, which should close the file\r
+// // Close the socket, as the reponse would be incomplete\r
+// Socket.destroy(state.socket);\r
+// continue;\r
+// }\r
+// // Write some data using sendfile\r
+// long nw = Socket.sendfilen(state.socket, state.fd,\r
+// state.pos,\r
+// state.end - state.pos, 0);\r
+// if (nw < 0) {\r
+// // Close socket and clear pool\r
+// remove(state);\r
+// // Close the socket, as the reponse would be incomplete\r
+// // This will close the file too.\r
+// Socket.destroy(state.socket);\r
+// continue;\r
+// }\r
+//\r
+// state.pos = state.pos + nw;\r
+// if (state.pos >= state.end) {\r
+// remove(state);\r
+// if (state.keepAlive) {\r
+// // Destroy file descriptor pool, which should close the file\r
+// Pool.destroy(state.fdpool);\r
+// Socket.timeoutSet(state.socket, soTimeout * 1000);\r
+// // If all done hand this socket off to a worker for\r
+// // processing of further requests\r
+// if (!processSocket(state.socket)) {\r
+// Socket.destroy(state.socket);\r
+// }\r
+// } else {\r
+// // Close the socket since this is\r
+// // the end of not keep-alive request.\r
+// Socket.destroy(state.socket);\r
+// }\r
+// }\r
+// }\r
+// } else if (rv < 0) {\r
+// int errn = -rv;\r
+// /* Any non timeup or interrupted error is critical */\r
+// if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {\r
+// if (errn > Status.APR_OS_START_USERERR) {\r
+// errn -= Status.APR_OS_START_USERERR;\r
+// }\r
+// log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));\r
+// // Handle poll critical failure\r
+// synchronized (this) {\r
+// destroy();\r
+// init();\r
+// }\r
+// continue;\r
+// }\r
+// }\r
+// /* TODO: See if we need to call the maintain for sendfile poller */\r
+// } catch (Throwable t) {\r
+// log.error(sm.getString("endpoint.poll.error"), t);\r
+// }\r
+// }\r
+//\r
+// synchronized (this) {\r
+// this.notifyAll();\r
+// }\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ // ------------------------------------------------ Handler Inner Interface\r
+\r
+\r
+ /**\r
+ * Bare bones interface used for socket processing. Per thread data is to be\r
+ * stored in the ThreadWithAttributes extra folders, or alternately in\r
+ * thread local fields.\r
+ */\r
+ public interface Handler {\r
+ public enum SocketState {\r
+ OPEN, CLOSED, LONG\r
+ }\r
+ public SocketState process(SocketChannel socket);\r
+ public SocketState event(SocketChannel socket, boolean error);\r
+ }\r
+\r
+\r
+ // ------------------------------------------------- WorkerStack Inner Class\r
+\r
+\r
+ public class WorkerStack {\r
+\r
+ protected Worker[] workers = null;\r
+ protected int end = 0;\r
+\r
+ public WorkerStack(int size) {\r
+ workers = new Worker[size];\r
+ }\r
+\r
+ /** \r
+ * Put the object into the queue.\r
+ * \r
+ * @param object the object to be appended to the queue (first element). \r
+ */\r
+ public void push(Worker worker) {\r
+ workers[end++] = worker;\r
+ }\r
+\r
+ /**\r
+ * Get the first object out of the queue. Return null if the queue\r
+ * is empty. \r
+ */\r
+ public Worker pop() {\r
+ if (end > 0) {\r
+ return workers[--end];\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Get the first object out of the queue, Return null if the queue\r
+ * is empty.\r
+ */\r
+ public Worker peek() {\r
+ return workers[end];\r
+ }\r
+\r
+ /**\r
+ * Is the queue empty?\r
+ */\r
+ public boolean isEmpty() {\r
+ return (end == 0);\r
+ }\r
+\r
+ /**\r
+ * How many elements are there in this queue?\r
+ */\r
+ public int size() {\r
+ return (end);\r
+ }\r
+ }\r
+\r
+\r
+ // ---------------------------------------------- SocketProcessor Inner Class\r
+\r
+\r
+ /**\r
+ * This class is the equivalent of the Worker, but will simply use in an\r
+ * external Executor thread pool.\r
+ */\r
+ protected class SocketProcessor implements Runnable {\r
+\r
+ protected SocketChannel socket = null;\r
+\r
+ public SocketProcessor(SocketChannel socket) {\r
+ this.socket = socket;\r
+ }\r
+\r
+ public void run() {\r
+\r
+ // Process the request from this socket\r
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {\r
+ // Close socket and pool\r
+ try {\r
+ socket.socket().close();\r
+ socket.close();\r
+ } catch ( Exception x ) {\r
+ log.error("",x);\r
+ }\r
+ socket = null;\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+ // --------------------------------------- SocketEventProcessor Inner Class\r
+\r
+\r
+ /**\r
+ * This class is the equivalent of the Worker, but will simply use in an\r
+ * external Executor thread pool.\r
+ */\r
+ protected class SocketEventProcessor implements Runnable {\r
+\r
+ protected SocketChannel socket = null;\r
+ protected boolean error = false; \r
+\r
+ public SocketEventProcessor(SocketChannel socket, boolean error) {\r
+ this.socket = socket;\r
+ this.error = error;\r
+ }\r
+\r
+ public void run() {\r
+\r
+ // Process the request from this socket\r
+ if (handler.event(socket, error) == Handler.SocketState.CLOSED) {\r
+ // Close socket and pool\r
+ try {\r
+ socket.socket().close();\r
+ socket.close();\r
+ } catch ( Exception x ) {\r
+ log.error("",x);\r
+ }\r
+ socket = null;\r
+ }\r
+\r
+ }\r
+\r
+ }\r
+\r
+\r
+}\r