/**
* Poller thread count.
*/
- protected int pollerThreadCount = 1;
+ protected int pollerThreadCount = Runtime.getRuntime().availableProcessors();
public void setPollerThreadCount(int pollerThreadCount) { this.pollerThreadCount = pollerThreadCount; }
public int getPollerThreadCount() { return pollerThreadCount; }
/**
* The socket poller.
*/
- protected Poller poller = null;
+ protected Poller[] pollers = null;
+ protected AtomicInteger pollerRotater = new AtomicInteger(0);
+ /**
+ * Return an available poller in true round robin fashion
+ * @return
+ */
public Poller getPoller0() {
- return poller;
+ int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
+ return pollers[idx];
}
/**
* Number of keepalive sockets.
*/
public int getKeepAliveCount() {
- if (poller == null) {
+ if (pollers == null) {
return 0;
} else {
- return poller.selector.keys().size();
+ int sum = 0;
+ for (int i=0; i<pollers.length; i++) {
+ sum += pollers[i].selector.keys().size();
+ }
+ return sum;
}
}
workers = new WorkerStack(maxThreads);
}
- // Start poller thread
- poller = new Poller();
- Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
- pollerThread.setPriority(threadPriority);
- pollerThread.setDaemon(true);
- pollerThread.start();
+ // Start poller threads
+ pollers = new Poller[getPollerThreadCount()];
+ for (int i=0; i<pollers.length; i++) {
+ pollers[i] = new Poller();
+ Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
+ }
// Start acceptor threads
for (int i = 0; i < acceptorThreadCount; i++) {
if (running) {
running = false;
unlockAccept();
- poller.destroy();
- poller = null;
+ for (int i=0; pollers!=null && i<pollers.length; i++) {
+ if (pollers[i]==null) continue;
+ pollers[i].destroy();
+ pollers[i] = null;
+ }
}
eventCache.clear();
keyCache.clear();