import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
/**
* NIO tailored thread pool, providing the following services:
protected Executor executor = null;
public void setExecutor(Executor executor) { this.executor = executor; }
public Executor getExecutor() { return executor; }
-
+
+ protected boolean useExecutor = true;
+ public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
+ public boolean getUseExecutor() { return useExecutor;}
/**
* Maximum amount of worker threads.
/**
- * Start the APR endpoint, creating acceptor, poller threads.
+ * Start the NIO endpoint, creating acceptor, poller threads.
*/
public void start()
throws Exception {
paused = false;
// Create worker collection
- if (executor == null) {
- //workers = new WorkerStack(maxThreads);
- executor = new ThreadPoolExecutor(getMaxThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
+ if (getUseExecutor()) {
+ if ( executor == null ) {
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
+ executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
+ }
+ } else {
+ workers = new WorkerStack(maxThreads);
}
// Start acceptor threads
eventCache.clear();
keyCache.clear();
nioChannels.clear();
+ if ( executor!=null ) {
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+ tpe.shutdown();
+ TaskQueue queue = (TaskQueue)tpe.getQueue();
+ queue.setParent(null);
+ executor = null;
+ }
}
*/
protected boolean isWorkerAvailable() {
if ( executor != null ) {
+// ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+// TaskQueue queue = (TaskQueue)tpe.getQueue();
return true;
} else {
if (workers.size() > 0) {
workers.notify();
}
}
-
-
- protected boolean processSocket(SocketChannel socket) {
- try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketOptionsProcessor(socket));
- }
- } catch (Throwable t) {
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
/**
* Process given socket.
*/
if (executor == null) {
getWorkerThread().assign(socket);
} else {
- executor.execute(new SocketProcessor(socket));
+ executor.execute(new SocketProcessor(socket,null));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
if (executor == null) {
getWorkerThread().assign(socket, status);
} else {
- executor.execute(new SocketEventProcessor(socket, status));
+ executor.execute(new SocketProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey) iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
- if ( processKey(sk, attachment) ) {
- iterator.remove(); //only remove it if the key was processed.
- }
+ iterator.remove();
+ processKey(sk, attachment);
}//while
//process timeouts
thread = new Thread(this);
thread.setName(getName() + "-" + (++curThreads));
thread.setDaemon(true);
+ thread.setPriority(getThreadPriority());
thread.start();
}
protected class SocketProcessor implements Runnable {
protected NioChannel socket = null;
+ protected SocketStatus status = null;
- public SocketProcessor(NioChannel socket) {
+ public SocketProcessor(NioChannel socket, SocketStatus status) {
this.socket = socket;
+ this.status = status;
}
-
+
public void run() {
// Process the request from this socket
- if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ boolean closed = (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) :
+ (handler.event(socket,status)==Handler.SocketState.CLOSED);
+ if (closed) {
// Close socket and pool
try {
try {socket.close();}catch (Exception ignore){}
log.error("",x);
}
socket = null;
+ status = null;
}
}
}
+ // ---------------------------------------------- TaskQueue Inner Class
+ public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
+ ThreadPoolExecutor parent = null;
+
+ public TaskQueue() {
+ super();
+ }
-
- // --------------------------------------- SocketEventProcessor Inner Class
-
-
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketEventProcessor implements Runnable {
-
- protected NioChannel socket = null;
- protected SocketStatus status = null;
-
- public SocketEventProcessor(NioChannel socket, SocketStatus status) {
- this.socket = socket;
- this.status = status;
+ public TaskQueue(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ public TaskQueue(Collection<? extends Runnable> c) {
+ super(c);
}
- public void run() {
+
+ public void setParent(ThreadPoolExecutor tp) {
+ parent = tp;
+ }
+
+ public boolean offer(Runnable o) {
+ if ( parent != null && parent.getPoolSize()<parent.getMaximumPoolSize() ) return false;
+ else return super.offer(o);
+ }
+ }
- // Process the request from this socket
- if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- try {
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- } catch ( Exception x ) {
- log.error("",x);
- }
- socket = null;
- }
+ // ---------------------------------------------- ThreadFactory Inner Class
+ class TaskThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+ TaskThreadFactory(String namePrefix) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ this.namePrefix = namePrefix;
}
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
+ t.setDaemon(true);
+ t.setPriority(getThreadPriority());
+ return t;
+ }
}
-
-
}