From: fhanik Date: Fri, 15 Dec 2006 00:26:22 +0000 (+0000) Subject: refactor, name change, in preparation of having a pluggable Executor and more thread... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=18a1ecfb7c6d8cf241bb214345e012f46a8971d9;p=tomcat7.0 refactor, name change, in preparation of having a pluggable Executor and more thread fairness on receiving data git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@487408 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/transport/AbstractRxTask.java b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java new file mode 100644 index 000000000..66b7de437 --- /dev/null +++ b/java/org/apache/catalina/tribes/transport/AbstractRxTask.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport; + +import org.apache.catalina.tribes.io.ListenCallback; + + + + +/** + * @author Filip Hanik + * @version $Revision$ $Date$ + */ +public abstract class AbstractRxTask extends Thread +{ + + public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; + + private ListenCallback callback; + private RxTaskPool pool; + private boolean doRun = true; + private int options; + protected boolean useBufferPool = true; + + public AbstractRxTask(ListenCallback callback) { + this.callback = callback; + } + + public void setPool(RxTaskPool pool) { + this.pool = pool; + } + + public void setOptions(int options) { + this.options = options; + } + + public void setCallback(ListenCallback callback) { + this.callback = callback; + } + + public void setDoRun(boolean doRun) { + this.doRun = doRun; + } + + public RxTaskPool getPool() { + return pool; + } + + public int getOptions() { + return options; + } + + public ListenCallback getCallback() { + return callback; + } + + public boolean isDoRun() { + return doRun; + } + + public void close() + { + doRun = false; + notify(); + } + + public void setUseBufferPool(boolean usebufpool) { + useBufferPool = usebufpool; + } + + public boolean getUseBufferPool() { + return useBufferPool; + } +} diff --git a/java/org/apache/catalina/tribes/transport/RxTaskPool.java b/java/org/apache/catalina/tribes/transport/RxTaskPool.java index 43ac09b5d..ff2b465d3 100644 --- a/java/org/apache/catalina/tribes/transport/RxTaskPool.java +++ b/java/org/apache/catalina/tribes/transport/RxTaskPool.java @@ -57,13 +57,13 @@ public class RxTaskPool this.creator = creator; //for (int i = 0; i < minThreads; i++) { for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem - WorkerThread thread = creator.getWorkerThread(); + AbstractRxTask thread = creator.getWorkerThread(); setupThread(thread); idle.add (thread); } } - protected void setupThread(WorkerThread thread) { + protected void setupThread(AbstractRxTask thread) { synchronized (thread) { thread.setPool(this); thread.setName(thread.getClass().getName() + "[" + inc() + "]"); @@ -77,14 +77,14 @@ public class RxTaskPool /** * Find an idle worker thread, if any. Could return null. */ - public WorkerThread getWorker() + public AbstractRxTask getWorker() { - WorkerThread worker = null; + AbstractRxTask worker = null; synchronized (mutex) { while ( worker == null && running ) { if (idle.size() > 0) { try { - worker = (WorkerThread) idle.remove(0); + worker = (AbstractRxTask) idle.remove(0); } catch (java.util.NoSuchElementException x) { //this means that there are no available workers worker = null; @@ -109,7 +109,7 @@ public class RxTaskPool * Called by the worker thread to return itself to the * idle pool. */ - public void returnWorker (WorkerThread worker) { + public void returnWorker (AbstractRxTask worker) { if ( running ) { synchronized (mutex) { used.remove(worker); @@ -140,7 +140,7 @@ public class RxTaskPool synchronized (mutex) { Iterator i = idle.iterator(); while ( i.hasNext() ) { - WorkerThread worker = (WorkerThread)i.next(); + AbstractRxTask worker = (AbstractRxTask)i.next(); returnWorker(worker); i.remove(); } @@ -160,6 +160,6 @@ public class RxTaskPool } public static interface TaskCreator { - public WorkerThread getWorkerThread(); + public AbstractRxTask getWorkerThread(); } } diff --git a/java/org/apache/catalina/tribes/transport/WorkerThread.java b/java/org/apache/catalina/tribes/transport/WorkerThread.java deleted file mode 100644 index 88afd87a8..000000000 --- a/java/org/apache/catalina/tribes/transport/WorkerThread.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport; - -import org.apache.catalina.tribes.io.ListenCallback; - - - - -/** - * @author Filip Hanik - * @version $Revision$ $Date$ - */ -public abstract class WorkerThread extends Thread -{ - - public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; - - private ListenCallback callback; - private RxTaskPool pool; - private boolean doRun = true; - private int options; - protected boolean useBufferPool = true; - - public WorkerThread(ListenCallback callback) { - this.callback = callback; - } - - public void setPool(RxTaskPool pool) { - this.pool = pool; - } - - public void setOptions(int options) { - this.options = options; - } - - public void setCallback(ListenCallback callback) { - this.callback = callback; - } - - public void setDoRun(boolean doRun) { - this.doRun = doRun; - } - - public RxTaskPool getPool() { - return pool; - } - - public int getOptions() { - return options; - } - - public ListenCallback getCallback() { - return callback; - } - - public boolean isDoRun() { - return doRun; - } - - public void close() - { - doRun = false; - notify(); - } - - public void setUseBufferPool(boolean usebufpool) { - useBufferPool = usebufpool; - } - - public boolean getUseBufferPool() { - return useBufferPool; - } -} diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java index 6fc75e61a..561d9f414 100644 --- a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java @@ -24,7 +24,7 @@ import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.catalina.tribes.transport.RxTaskPool; -import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.transport.AbstractRxTask; /** *

Title:

@@ -73,7 +73,7 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } } - public WorkerThread getWorkerThread() { + public AbstractRxTask getWorkerThread() { return getReplicationThread(); } diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java b/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java index 61ae79353..38cf96784 100644 --- a/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java +++ b/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java @@ -20,7 +20,7 @@ import java.io.IOException; import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.transport.AbstractRxTask; import java.net.Socket; import java.io.InputStream; import org.apache.catalina.tribes.transport.ReceiverBase; @@ -44,7 +44,7 @@ import org.apache.catalina.tribes.io.BufferPool; * * @version $Revision$, $Date$ */ -public class BioReplicationThread extends WorkerThread { +public class BioReplicationThread extends AbstractRxTask { protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class ); diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 0b0a02243..0c547764b 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -32,7 +32,7 @@ import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.Constants; import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.catalina.tribes.transport.RxTaskPool; -import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.transport.AbstractRxTask; import org.apache.catalina.tribes.util.StringManager; import java.util.LinkedList; import java.util.Set; @@ -109,7 +109,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } } - public WorkerThread getWorkerThread() { + public AbstractRxTask getWorkerThread() { NioReplicationThread thread = new NioReplicationThread(this,this); thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java index 7dac62b6c..6f3c75f00 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java @@ -23,7 +23,7 @@ import java.nio.channels.SocketChannel; import org.apache.catalina.tribes.io.ObjectReader; import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.transport.AbstractRxTask; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ChannelData; @@ -47,7 +47,7 @@ import org.apache.catalina.tribes.util.Logs; * * @version $Revision$, $Date$ */ -public class NioReplicationThread extends WorkerThread { +public class NioReplicationThread extends AbstractRxTask { private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );