import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* NIO tailored thread pool, providing the following services:
protected ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() {
+ protected AtomicInteger size = new AtomicInteger(0);
+ protected AtomicInteger bytes = new AtomicInteger(0);
public boolean offer(NioChannel socket) {
Poller pol = socket.getPoller();
Selector sel = pol!=null?pol.getSelector():null;
KeyAttachment att = key!=null?(KeyAttachment)key.attachment():null;
if ( att!=null ) att.reset();
if ( key!=null ) key.attach(null);
+ boolean offer = socketProperties.getBufferPool()==-1?true:size.get()<socketProperties.getBufferPool();
+ offer = offer && (socketProperties.getBufferPoolSize()==-1?true:(bytes.get()+socket.getBufferSize())<socketProperties.getBufferPoolSize());
//avoid over growing our cache or add after we have stopped
- if ( running && (!paused) && (size() < socketProperties.getDirectBufferPool()) ) return super.offer(socket);
+ if ( running && (!paused) && (offer) ) {
+ boolean result = super.offer(socket);
+ if ( result ) {
+ size.incrementAndGet();
+ bytes.addAndGet(socket.getBufferSize());
+ }
+ return result;
+ }
else return false;
}
public NioChannel poll() {
- return super.poll();
+ NioChannel result = super.poll();
+ if ( result != null ) {
+ size.decrementAndGet();
+ bytes.addAndGet(-result.getBufferSize());
+ }
+ return result;
+ }
+
+ public void clear() {
+ super.clear();
+ size.set(0);
}
};
}
- // ----------------------------------------------------- Poller Inner Class
-
+ // ----------------------------------------------------- Poller Inner Classes
/**
+ *
+ * PollerEvent, cacheable object for poller events to avoid GC
+ */
+ public class PollerEvent implements Runnable {
+ protected NioChannel socket;
+ protected int interestOps;
+ public PollerEvent(NioChannel ch, int intOps) {
+ reset(ch, intOps);
+ }
+
+ public void reset(NioChannel ch, int intOps) {
+ socket = ch;
+ interestOps = intOps;
+ }
+
+ public void reset() {
+ reset(null, 0);
+ }
+
+ public void run() {
+ final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ final KeyAttachment att = (KeyAttachment) key.attachment();
+ try {
+ if (key != null) {
+ key.interestOps(interestOps);
+ att.interestOps(interestOps);
+ }
+ }
+ catch (CancelledKeyException ckx) {
+ try {
+ if (key != null && key.attachment() != null) {
+ KeyAttachment ka = (KeyAttachment) key.attachment();
+ ka.setError(true); //set to collect this socket immediately
+ }
+ try {
+ socket.close();
+ }
+ catch (Exception ignore) {}
+ if (socket.isOpen())
+ socket.close(true);
+ }
+ catch (Exception ignore) {}
+ }
+ }
+ }
+ /**
* Poller class.
*/
public class Poller implements Runnable {
protected Selector selector;
- protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+ protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
+ protected ConcurrentLinkedQueue<PollerEvent> eventCache = new ConcurrentLinkedQueue<PollerEvent>();
protected boolean close = false;
protected long nextExpiration = 0;//optimize expiration handling
* @param socket to add to the poller
*/
public void add(final NioChannel socket) {
- final SelectionKey key = socket.getIOChannel().keyFor(selector);
- final KeyAttachment att = (KeyAttachment)key.attachment();
- Runnable r = new Runnable() {
- public void run() {
- try {
- if (key != null) {
- key.interestOps(SelectionKey.OP_READ);
- att.interestOps(SelectionKey.OP_READ);
- }
- }catch ( CancelledKeyException ckx ) {
- try {
- if ( key != null && key.attachment() != null ) {
- KeyAttachment ka = (KeyAttachment)key.attachment();
- ka.setError(true); //set to collect this socket immediately
- }
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- } catch ( Exception ignore ) {}
- }
- }
- };
+ add(socket,SelectionKey.OP_READ);
+ }
+
+ public void add(final NioChannel socket, final int interestOps) {
+ PollerEvent r = this.eventCache.poll();
+ if ( r==null) r = new PollerEvent(socket,interestOps);
addEvent(r);
}
-
+
public boolean events() {
boolean result = false;
//synchronized (events) {
while ( (r = (Runnable)events.poll()) != null ) {
try {
r.run();
+ if ( r instanceof PollerEvent ) {
+ ((PollerEvent)r).reset();
+ eventCache.offer((PollerEvent)r);
+ }
} catch ( Exception x ) {
log.error("",x);
}
Runnable r = new Runnable() {
public void run() {
try {
-
socket.getIOChannel().register(selector, SelectionKey.OP_READ, ka);
} catch (Exception x) {
log.error("", x);
final SelectionKey fk = key;
final int intops = handshake;
final KeyAttachment ka = (KeyAttachment)fk.attachment();
- //register for handshake ops
- Runnable r = new Runnable() {
- public void run() {
- try {
- fk.interestOps(intops);
- ka.interestOps(intops);
- } catch (CancelledKeyException ckx) {
- try {
- if ( fk != null && fk.attachment() != null ) {
- ka.setError(true); //set to collect this socket immediately
- try {ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
- try {ka.getChannel().close();}catch(Exception ignore){}
- }
- } catch (Exception ignore) {}
- }
-
- }
- };
- ka.getPoller().addEvent(r);
+ ka.getPoller().add(socket,intops);
}
}
} finally {
import java.net.SocketException;
public class SocketProperties {
+ /**
+ * Enable/disable direct buffers for the network buffers
+ * Default value is enabled
+ */
protected boolean directBuffer = true;
+ /**
+ * Socket receive buffer size in bytes (SO_RCVBUF)
+ * Default value is 25188
+ */
protected int rxBufSize = 25188;
+ /**
+ * Socket send buffer size in bytes (SO_SNDBUF)
+ * Default value is 43800
+ */
protected int txBufSize = 43800;
- protected int directBufferPool = 500;
+ /**
+ * NioChannel pool size for the endpoint,
+ * this value is how many channels
+ * -1 means unlimited cached, 0 means no cache
+ * Default value is 500
+ */
+ protected int bufferPool = 500;
+
+
+ /**
+ * Buffer pool size in bytes to be cached
+ * -1 means unlimited, 0 means no cache
+ * Default value is 100MB (1024*1024*100 bytes)
+ */
+ protected int bufferPoolSize = 1024*1024*100;
+
+ /**
+ * TCP_NO_DELAY option, default is false
+ */
protected boolean tcpNoDelay = false;
+ /**
+ * SO_KEEPALIVE option, default is false
+ */
protected boolean soKeepAlive = false;
+ /**
+ * OOBINLINE option, default is true
+ */
protected boolean ooBInline = true;
+ /**
+ * SO_REUSEADDR option, default is true
+ */
protected boolean soReuseAddress = true;
+ /**
+ * SO_LINGER option, default is true, paired with the <code>soLingerTime</code> value
+ */
protected boolean soLingerOn = true;
+ /**
+ * SO_LINGER option, default is 25 seconds.
+ */
protected int soLingerTime = 25;
+ /**
+ * SO_TIMEOUT option, default is 5000 milliseconds
+ */
protected int soTimeout = 5000;
+ /**
+ * Traffic class option, value between 0 and 255
+ * IPTOS_LOWCOST (0x02)
+ * IPTOS_RELIABILITY (0x04)
+ * IPTOS_THROUGHPUT (0x08)
+ * IPTOS_LOWDELAY (0x10)
+ * Default value is 0x04 | 0x08 | 0x010
+ */
protected int soTrafficClass = 0x04 | 0x08 | 0x010;
+ /**
+ * Performance preferences according to
+ * http://java.sun.com/j2se/1.5.0/docs/api/java/net/Socket.html#setPerformancePreferences(int,%20int,%20int)
+ * Default value is 1
+ */
protected int performanceConnectionTime = 1;
+ /**
+ * Performance preferences according to
+ * http://java.sun.com/j2se/1.5.0/docs/api/java/net/Socket.html#setPerformancePreferences(int,%20int,%20int)
+ * Default value is 0
+ */
protected int performanceLatency = 0;
+ /**
+ * Performance preferences according to
+ * http://java.sun.com/j2se/1.5.0/docs/api/java/net/Socket.html#setPerformancePreferences(int,%20int,%20int)
+ * Default value is 1
+ */
protected int performanceBandwidth = 1;
return txBufSize;
}
+ public int getBufferPool() {
+ return bufferPool;
+ }
+
+ public int getBufferPoolSize() {
+ return bufferPoolSize;
+ }
+
public int getDirectBufferPool() {
- return directBufferPool;
+ return bufferPool;
}
public void setPerformanceConnectionTime(int performanceConnectionTime) {
this.soLingerOn = soLingerOn;
}
+ public void setBufferPool(int bufferPool) {
+ this.bufferPool = bufferPool;
+ }
+
+ public void setBufferPoolSize(int bufferPoolSize) {
+ this.bufferPoolSize = bufferPoolSize;
+ }
+
public void setDirectBufferPool(int directBufferPool) {
- this.directBufferPool = directBufferPool;
+ this.bufferPool = directBufferPool;
}
}
\ No newline at end of file