* @author Filip Hanik
* @version $Revision$ $Date$
*/
-public abstract class AbstractRxTask extends Thread
+public abstract class AbstractRxTask implements Runnable
{
public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
this.callback = callback;
}
- public void setPool(RxTaskPool pool) {
+ public void setTaskPool(RxTaskPool pool) {
this.pool = pool;
}
this.doRun = doRun;
}
- public RxTaskPool getPool() {
+ public RxTaskPool getTaskPool() {
return pool;
}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
private long tcpSelectorTimeout = 5000;
//how many times to search for an available socket
private int autoBind = 100;
- private int maxThreads = 6;
+ private int maxThreads = 15;
private int minThreads = 6;
+ private int maxTasks = 100;
+ private int minTasks = 10;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
private boolean ooBInline = true;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private int timeout = 3000; //3 seconds
private boolean useBufferPool = true;
+
+ private Executor executor;
public ReceiverBase() {
}
+ public void start() throws IOException {
+ if ( executor == null ) {
+ executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
+ }
+ }
+
+ public void stop() {
+ if ( executor instanceof ExecutorService ) ((ExecutorService)executor).shutdown();
+ }
+
/**
* getMessageListener
*
return listener;
}
- public RxTaskPool getPool() {
+ public RxTaskPool getTaskPool() {
return pool;
}
return securePort;
}
+ public int getMinTasks() {
+ return minTasks;
+ }
+
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public boolean isListening() {
+ return listen;
+ }
+
/**
* @deprecated use setSelectorTimeout
* @param selTimeout long
this.securePort = securePort;
}
+ public void setMinTasks(int minTasks) {
+ this.minTasks = minTasks;
+ }
+
+ public void setMaxTasks(int maxTasks) {
+ this.maxTasks = maxTasks;
+ }
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
public void heartbeat() {
//empty operation
}
+
}
\ No newline at end of file
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
/**
* @author not attributable
boolean running = true;
private static int counter = 1;
- private int maxThreads;
- private int minThreads;
+ private int maxTasks;
+ private int minTasks;
private TaskCreator creator = null;
}
- public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception {
+ public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception {
// fill up the pool with worker threads
- this.maxThreads = maxThreads;
- this.minThreads = minThreads;
+ this.maxTasks = maxTasks;
+ this.minTasks = minTasks;
this.creator = creator;
- //for (int i = 0; i < minThreads; i++) {
- for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
- AbstractRxTask thread = creator.getWorkerThread();
- setupThread(thread);
- idle.add (thread);
- }
}
- protected void setupThread(AbstractRxTask thread) {
- synchronized (thread) {
- thread.setPool(this);
- thread.setName(thread.getClass().getName() + "[" + inc() + "]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.start();
- try {thread.wait(500); }catch ( InterruptedException x ) {}
+ protected void configureTask(AbstractRxTask task) {
+ synchronized (task) {
+ task.setTaskPool(this);
+// task.setName(task.getClass().getName() + "[" + inc() + "]");
+// task.setDaemon(true);
+// task.setPriority(Thread.MAX_PRIORITY);
+// task.start();
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
- public AbstractRxTask getWorker()
+ public AbstractRxTask getRxTask()
{
AbstractRxTask worker = null;
synchronized (mutex) {
//this means that there are no available workers
worker = null;
}
- } else if ( used.size() < this.maxThreads && creator != null) {
- worker = creator.getWorkerThread();
- setupThread(worker);
+ } else if ( used.size() < this.maxTasks && creator != null) {
+ worker = creator.createRxTask();
+ configureTask(worker);
} else {
try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
}
synchronized (mutex) {
used.remove(worker);
//if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
- if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+ if ( idle.size() < maxTasks && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
}
public int getMaxThreads() {
- return maxThreads;
+ return maxTasks;
}
public int getMinThreads() {
- return minThreads;
+ return minTasks;
}
public void stop() {
}
}
- public void setMaxThreads(int maxThreads) {
- this.maxThreads = maxThreads;
+ public void setMaxTasks(int maxThreads) {
+ this.maxTasks = maxThreads;
}
- public void setMinThreads(int minThreads) {
- this.minThreads = minThreads;
+ public void setMinTasks(int minThreads) {
+ this.minTasks = minThreads;
}
- public TaskCreator getThreadCreator() {
+ public TaskCreator getTaskCreator() {
return this.creator;
}
- public static interface TaskCreator {
- public AbstractRxTask getWorkerThread();
+ public static interface TaskCreator {
+ public AbstractRxTask createRxTask();
}
}
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public void start() throws IOException {
+ super.start();
try {
setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
}
}
- public AbstractRxTask getWorkerThread() {
+ public AbstractRxTask createRxTask() {
return getReplicationThread();
}
try {
this.serverSocket.close();
}catch ( Exception x ) {}
+ super.stop();
}
while ( doListen() ) {
Socket socket = null;
- if ( getPool().available() < 1 ) {
+ if ( getTaskPool().available() < 1 ) {
if ( log.isWarnEnabled() )
log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
}
- BioReplicationTask thread = (BioReplicationTask)getPool().getWorker();
- if ( thread == null ) continue; //should never happen
+ BioReplicationTask task = (BioReplicationTask)getTaskPool().getRxTask();
+ if ( task == null ) continue; //should never happen
try {
socket = serverSocket.accept();
}catch ( Exception x ) {
if ( doListen() ) throw x;
}
if ( !doListen() ) {
- thread.setDoRun(false);
- thread.serviceSocket(null,null);
+ task.setDoRun(false);
+ task.serviceSocket(null,null);
+ getExecutor().execute(task);
break; //regular shutdown
}
if ( socket == null ) continue;
socket.setTrafficClass(getSoTrafficClass());
socket.setSoTimeout(getTimeout());
ObjectReader reader = new ObjectReader(socket);
- thread.serviceSocket(socket,reader);
+ task.serviceSocket(socket,reader);
}//while
}
// loop forever waiting for work to do
public synchronized void run()
{
- this.notify();
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled())
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if ( socket == null ) continue;
- try {
- drainSocket();
- } catch ( Exception x ) {
- log.error("Unable to service bio socket");
- }finally {
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- }
- // done, ready for more, return to pool
- if ( getPool() != null ) getPool().returnWorker (this);
- else setDoRun(false);
+ if ( socket == null ) return;
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
}
+ // done, ready for more, return to pool
+ if ( getTaskPool() != null ) getTaskPool().returnWorker (this);
}
public void stop() {
this.stopListening();
+ super.stop();
}
/**
* @see org.apache.catalina.tribes.ClusterReceiver#start()
*/
public void start() throws IOException {
+ super.start();
try {
-// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
}
}
- public AbstractRxTask getWorkerThread() {
+ public AbstractRxTask createRxTask() {
NioReplicationTask thread = new NioReplicationTask(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
events.add(event);
}
if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
- selector.wakeup();
+ if ( isListening() && selector!=null ) selector.wakeup();
}
}
protected void socketTimeouts() {
//timeout
- Set keys = selector.keys();
+ Selector tmpsel = selector;
+ Set keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
+ if ( keys == null ) return;
long now = System.currentTimeMillis();
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = (SelectionKey) iter.next();
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
- NioReplicationTask worker = (NioReplicationTask) getPool().getWorker();
- if (worker == null) {
- // No threads available, do nothing, the selection
+ NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
+ if (task == null) {
+ // No threads/tasks available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available, the thread pool itself has a waiting mechanism
// so we will not wait here.
- if (log.isDebugEnabled())
- log.debug("No TcpReplicationThread available");
+ if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
- worker.serviceChannel(key);
+ //add task to thread pool
+ task.serviceChannel(key);
+ getExecutor().execute(task);
}
}
// loop forever waiting for work to do
public synchronized void run() {
- this.notify();
if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
}else {
buffer = ByteBuffer.allocate (getRxBufSize());
}
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if (key == null) {
- continue; // just in case
- }
- if ( log.isTraceEnabled() )
- log.trace("Servicing key:"+key);
- try {
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader == null ) {
- if ( log.isTraceEnabled() )
- log.trace("No object reader, cancelling:"+key);
- cancelKey(key);
- } else {
- if ( log.isTraceEnabled() )
- log.trace("Draining channel:"+key);
+ if (key == null) {
+ return; // just in case
+ }
+ if ( log.isTraceEnabled() )
+ log.trace("Servicing key:"+key);
- drainChannel(key, reader);
- }
- } catch (Exception e) {
- //this is common, since the sockets on the other
- //end expire after a certain time.
- if ( e instanceof CancelledKeyException ) {
- //do nothing
- } else if ( e instanceof IOException ) {
- //dont spew out stack traces for IO exceptions unless debug is enabled.
- if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
- else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
- } else if ( log.isErrorEnabled() ) {
- //this is a real error, log it.
- log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
- }
+ try {
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader == null ) {
+ if ( log.isTraceEnabled() )
+ log.trace("No object reader, cancelling:"+key);
cancelKey(key);
- } finally {
-
+ } else {
+ if ( log.isTraceEnabled() )
+ log.trace("Draining channel:"+key);
+
+ drainChannel(key, reader);
}
- key = null;
- // done, ready for more, return to pool
- getPool().returnWorker (this);
+ } catch (Exception e) {
+ //this is common, since the sockets on the other
+ //end expire after a certain time.
+ if ( e instanceof CancelledKeyException ) {
+ //do nothing
+ } else if ( e instanceof IOException ) {
+ //dont spew out stack traces for IO exceptions unless debug is enabled.
+ if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
+ else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
+ } else if ( log.isErrorEnabled() ) {
+ //this is a real error, log it.
+ log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+ }
+ cancelKey(key);
+ } finally {
+
}
+ key = null;
+ // done, ready for more, return to pool
+ getTaskPool().returnWorker (this);
}
/**
* worker thread is servicing it.
*/
public synchronized void serviceChannel (SelectionKey key) {
- if ( log.isTraceEnabled() )
- log.trace("About to service key:"+key);
+ if ( log.isTraceEnabled() ) log.trace("About to service key:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
this.key = key;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
- this.notify(); // awaken the thread
}
/**