--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport;
+
+import org.apache.catalina.tribes.io.ListenCallback;
+
+
+
+
+/**
+ * @author Filip Hanik
+ * @version $Revision$ $Date$
+ */
+public abstract class AbstractRxTask extends Thread
+{
+
+ public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
+
+ private ListenCallback callback;
+ private RxTaskPool pool;
+ private boolean doRun = true;
+ private int options;
+ protected boolean useBufferPool = true;
+
+ public AbstractRxTask(ListenCallback callback) {
+ this.callback = callback;
+ }
+
+ public void setPool(RxTaskPool pool) {
+ this.pool = pool;
+ }
+
+ public void setOptions(int options) {
+ this.options = options;
+ }
+
+ public void setCallback(ListenCallback callback) {
+ this.callback = callback;
+ }
+
+ public void setDoRun(boolean doRun) {
+ this.doRun = doRun;
+ }
+
+ public RxTaskPool getPool() {
+ return pool;
+ }
+
+ public int getOptions() {
+ return options;
+ }
+
+ public ListenCallback getCallback() {
+ return callback;
+ }
+
+ public boolean isDoRun() {
+ return doRun;
+ }
+
+ public void close()
+ {
+ doRun = false;
+ notify();
+ }
+
+ public void setUseBufferPool(boolean usebufpool) {
+ useBufferPool = usebufpool;
+ }
+
+ public boolean getUseBufferPool() {
+ return useBufferPool;
+ }
+}
this.creator = creator;
//for (int i = 0; i < minThreads; i++) {
for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
- WorkerThread thread = creator.getWorkerThread();
+ AbstractRxTask thread = creator.getWorkerThread();
setupThread(thread);
idle.add (thread);
}
}
- protected void setupThread(WorkerThread thread) {
+ protected void setupThread(AbstractRxTask thread) {
synchronized (thread) {
thread.setPool(this);
thread.setName(thread.getClass().getName() + "[" + inc() + "]");
/**
* Find an idle worker thread, if any. Could return null.
*/
- public WorkerThread getWorker()
+ public AbstractRxTask getWorker()
{
- WorkerThread worker = null;
+ AbstractRxTask worker = null;
synchronized (mutex) {
while ( worker == null && running ) {
if (idle.size() > 0) {
try {
- worker = (WorkerThread) idle.remove(0);
+ worker = (AbstractRxTask) idle.remove(0);
} catch (java.util.NoSuchElementException x) {
//this means that there are no available workers
worker = null;
* Called by the worker thread to return itself to the
* idle pool.
*/
- public void returnWorker (WorkerThread worker) {
+ public void returnWorker (AbstractRxTask worker) {
if ( running ) {
synchronized (mutex) {
used.remove(worker);
synchronized (mutex) {
Iterator i = idle.iterator();
while ( i.hasNext() ) {
- WorkerThread worker = (WorkerThread)i.next();
+ AbstractRxTask worker = (AbstractRxTask)i.next();
returnWorker(worker);
i.remove();
}
}
public static interface TaskCreator {
- public WorkerThread getWorkerThread();
+ public AbstractRxTask getWorkerThread();
}
}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport;
-
-import org.apache.catalina.tribes.io.ListenCallback;
-
-
-
-
-/**
- * @author Filip Hanik
- * @version $Revision$ $Date$
- */
-public abstract class WorkerThread extends Thread
-{
-
- public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
-
- private ListenCallback callback;
- private RxTaskPool pool;
- private boolean doRun = true;
- private int options;
- protected boolean useBufferPool = true;
-
- public WorkerThread(ListenCallback callback) {
- this.callback = callback;
- }
-
- public void setPool(RxTaskPool pool) {
- this.pool = pool;
- }
-
- public void setOptions(int options) {
- this.options = options;
- }
-
- public void setCallback(ListenCallback callback) {
- this.callback = callback;
- }
-
- public void setDoRun(boolean doRun) {
- this.doRun = doRun;
- }
-
- public RxTaskPool getPool() {
- return pool;
- }
-
- public int getOptions() {
- return options;
- }
-
- public ListenCallback getCallback() {
- return callback;
- }
-
- public boolean isDoRun() {
- return doRun;
- }
-
- public void close()
- {
- doRun = false;
- notify();
- }
-
- public void setUseBufferPool(boolean usebufpool) {
- useBufferPool = usebufpool;
- }
-
- public boolean getUseBufferPool() {
- return useBufferPool;
- }
-}
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.RxTaskPool;
-import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
/**
* <p>Title: </p>
}
}
- public WorkerThread getWorkerThread() {
+ public AbstractRxTask getWorkerThread() {
return getReplicationThread();
}
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
import java.net.Socket;
import java.io.InputStream;
import org.apache.catalina.tribes.transport.ReceiverBase;
*
* @version $Revision$, $Date$
*/
-public class BioReplicationThread extends WorkerThread {
+public class BioReplicationThread extends AbstractRxTask {
protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class );
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.RxTaskPool;
-import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.util.StringManager;
import java.util.LinkedList;
import java.util.Set;
}
}
- public WorkerThread getWorkerThread() {
+ public AbstractRxTask getWorkerThread() {
NioReplicationThread thread = new NioReplicationThread(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.WorkerThread;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ChannelData;
*
* @version $Revision$, $Date$
*/
-public class NioReplicationThread extends WorkerThread {
+public class NioReplicationThread extends AbstractRxTask {
private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );