From: fhanik Date: Fri, 15 Dec 2006 00:30:53 +0000 (+0000) Subject: Name change in preparation of pluggable Executors and thread fairness X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=a18be69fb902726de21020eb9a1f26542a92522b;p=tomcat7.0 Name change in preparation of pluggable Executors and thread fairness git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@487409 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java index 561d9f414..7bdec3a1e 100644 --- a/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java @@ -77,8 +77,8 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv return getReplicationThread(); } - protected BioReplicationThread getReplicationThread() { - BioReplicationThread result = new BioReplicationThread(this); + protected BioReplicationTask getReplicationThread() { + BioReplicationTask result = new BioReplicationTask(this); result.setOptions(getWorkerThreadOptions()); result.setUseBufferPool(this.getUseBufferPool()); return result; @@ -129,7 +129,7 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv if ( log.isWarnEnabled() ) log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up."); } - BioReplicationThread thread = (BioReplicationThread)getPool().getWorker(); + BioReplicationTask thread = (BioReplicationTask)getPool().getWorker(); if ( thread == null ) continue; //should never happen try { socket = serverSocket.accept(); diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java b/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java new file mode 100644 index 000000000..9e4583031 --- /dev/null +++ b/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport.bio; +import java.io.IOException; + +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.transport.Constants; +import org.apache.catalina.tribes.transport.AbstractRxTask; +import java.net.Socket; +import java.io.InputStream; +import org.apache.catalina.tribes.transport.ReceiverBase; +import java.io.OutputStream; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.BufferPool; + +/** + * A worker thread class which can drain channels and echo-back the input. Each + * instance is constructed with a reference to the owning thread pool object. + * When started, the thread loops forever waiting to be awakened to service the + * channel associated with a SelectionKey object. The worker is tasked by + * calling its serviceChannel() method with a SelectionKey object. The + * serviceChannel() method stores the key reference in the thread object then + * calls notify() to wake it up. When the channel has been drained, the worker + * thread returns itself to its parent pool. + * + * @author Filip Hanik + * + * @version $Revision$, $Date$ + */ +public class BioReplicationTask extends AbstractRxTask { + + + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationTask.class ); + + protected Socket socket; + protected ObjectReader reader; + + public BioReplicationTask (ListenCallback callback) { + super(callback); + } + + // loop forever waiting for work to do + public synchronized void run() + { + this.notify(); + while (isDoRun()) { + try { + // sleep and release object lock + this.wait(); + } catch (InterruptedException e) { + if(log.isInfoEnabled()) + log.info("TCP worker thread interrupted in cluster",e); + // clear interrupt status + Thread.interrupted(); + } + if ( socket == null ) continue; + try { + drainSocket(); + } catch ( Exception x ) { + log.error("Unable to service bio socket"); + }finally { + try {socket.close();}catch ( Exception ignore){} + try {reader.close();}catch ( Exception ignore){} + reader = null; + socket = null; + } + // done, ready for more, return to pool + if ( getPool() != null ) getPool().returnWorker (this); + else setDoRun(false); + } + } + + + public synchronized void serviceSocket(Socket socket, ObjectReader reader) { + this.socket = socket; + this.reader = reader; + this.notify(); // awaken the thread + } + + protected void execute(ObjectReader reader) throws Exception{ + int pkgcnt = reader.count(); + + if ( pkgcnt > 0 ) { + ChannelMessage[] msgs = reader.execute(); + for ( int i=0; i= 0 ) { + int count = reader.append(buf,0,length,true); + if ( count > 0 ) execute(reader); + length = in.read(buf); + } + } + + + + + /** + * send a reply-acknowledgement (6,2,3) + * @param key + * @param channel + */ + protected void sendAck(byte[] command) { + try { + OutputStream out = socket.getOutputStream(); + out.write(command); + out.flush(); + if (log.isTraceEnabled()) { + log.trace("ACK sent to " + socket.getPort()); + } + } catch ( java.io.IOException x ) { + log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); + } + } + + public void close() { + setDoRun(false); + try {socket.close();}catch ( Exception ignore){} + try {reader.close();}catch ( Exception ignore){} + reader = null; + socket = null; + super.close(); + } +} diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java b/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java deleted file mode 100644 index 38cf96784..000000000 --- a/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport.bio; -import java.io.IOException; - -import org.apache.catalina.tribes.io.ObjectReader; -import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.AbstractRxTask; -import java.net.Socket; -import java.io.InputStream; -import org.apache.catalina.tribes.transport.ReceiverBase; -import java.io.OutputStream; -import org.apache.catalina.tribes.io.ListenCallback; -import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.io.ChannelData; -import org.apache.catalina.tribes.io.BufferPool; - -/** - * A worker thread class which can drain channels and echo-back the input. Each - * instance is constructed with a reference to the owning thread pool object. - * When started, the thread loops forever waiting to be awakened to service the - * channel associated with a SelectionKey object. The worker is tasked by - * calling its serviceChannel() method with a SelectionKey object. The - * serviceChannel() method stores the key reference in the thread object then - * calls notify() to wake it up. When the channel has been drained, the worker - * thread returns itself to its parent pool. - * - * @author Filip Hanik - * - * @version $Revision$, $Date$ - */ -public class BioReplicationThread extends AbstractRxTask { - - - protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class ); - - protected Socket socket; - protected ObjectReader reader; - - public BioReplicationThread (ListenCallback callback) { - super(callback); - } - - // loop forever waiting for work to do - public synchronized void run() - { - this.notify(); - while (isDoRun()) { - try { - // sleep and release object lock - this.wait(); - } catch (InterruptedException e) { - if(log.isInfoEnabled()) - log.info("TCP worker thread interrupted in cluster",e); - // clear interrupt status - Thread.interrupted(); - } - if ( socket == null ) continue; - try { - drainSocket(); - } catch ( Exception x ) { - log.error("Unable to service bio socket"); - }finally { - try {socket.close();}catch ( Exception ignore){} - try {reader.close();}catch ( Exception ignore){} - reader = null; - socket = null; - } - // done, ready for more, return to pool - if ( getPool() != null ) getPool().returnWorker (this); - else setDoRun(false); - } - } - - - public synchronized void serviceSocket(Socket socket, ObjectReader reader) { - this.socket = socket; - this.reader = reader; - this.notify(); // awaken the thread - } - - protected void execute(ObjectReader reader) throws Exception{ - int pkgcnt = reader.count(); - - if ( pkgcnt > 0 ) { - ChannelMessage[] msgs = reader.execute(); - for ( int i=0; i= 0 ) { - int count = reader.append(buf,0,length,true); - if ( count > 0 ) execute(reader); - length = in.read(buf); - } - } - - - - - /** - * send a reply-acknowledgement (6,2,3) - * @param key - * @param channel - */ - protected void sendAck(byte[] command) { - try { - OutputStream out = socket.getOutputStream(); - out.write(command); - out.flush(); - if (log.isTraceEnabled()) { - log.trace("ACK sent to " + socket.getPort()); - } - } catch ( java.io.IOException x ) { - log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); - } - } - - public void close() { - setDoRun(false); - try {socket.close();}catch ( Exception ignore){} - try {reader.close();}catch ( Exception ignore){} - reader = null; - socket = null; - super.close(); - } -} diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 0c547764b..1c0b99803 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -110,7 +110,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } public AbstractRxTask getWorkerThread() { - NioReplicationThread thread = new NioReplicationThread(this,this); + NioReplicationTask thread = new NioReplicationTask(this,this); thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); thread.setOptions(getWorkerThreadOptions()); @@ -365,7 +365,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv * will then de-register the channel on the next select call. */ protected void readDataFromSocket(SelectionKey key) throws Exception { - NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); + NioReplicationTask worker = (NioReplicationTask) getPool().getWorker(); if (worker == null) { // No threads available, do nothing, the selection // loop will keep calling this method until a diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java new file mode 100644 index 000000000..529b369cb --- /dev/null +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport.nio; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.transport.Constants; +import org.apache.catalina.tribes.transport.AbstractRxTask; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.BufferPool; +import java.nio.channels.CancelledKeyException; +import org.apache.catalina.tribes.UniqueId; +import org.apache.catalina.tribes.RemoteProcessException; +import org.apache.catalina.tribes.util.Logs; + +/** + * A worker thread class which can drain channels and echo-back the input. Each + * instance is constructed with a reference to the owning thread pool object. + * When started, the thread loops forever waiting to be awakened to service the + * channel associated with a SelectionKey object. The worker is tasked by + * calling its serviceChannel() method with a SelectionKey object. The + * serviceChannel() method stores the key reference in the thread object then + * calls notify() to wake it up. When the channel has been drained, the worker + * thread returns itself to its parent pool. + * + * @author Filip Hanik + * + * @version $Revision$, $Date$ + */ +public class NioReplicationTask extends AbstractRxTask { + + private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class ); + + private ByteBuffer buffer = null; + private SelectionKey key; + private int rxBufSize; + private NioReceiver receiver; + public NioReplicationTask (ListenCallback callback, NioReceiver receiver) + { + super(callback); + this.receiver = receiver; + } + + // loop forever waiting for work to do + public synchronized void run() { + this.notify(); + if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { + buffer = ByteBuffer.allocateDirect(getRxBufSize()); + }else { + buffer = ByteBuffer.allocate (getRxBufSize()); + } + while (isDoRun()) { + try { + // sleep and release object lock + this.wait(); + } catch (InterruptedException e) { + if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e); + // clear interrupt status + Thread.interrupted(); + } + if (key == null) { + continue; // just in case + } + if ( log.isTraceEnabled() ) + log.trace("Servicing key:"+key); + + try { + ObjectReader reader = (ObjectReader)key.attachment(); + if ( reader == null ) { + if ( log.isTraceEnabled() ) + log.trace("No object reader, cancelling:"+key); + cancelKey(key); + } else { + if ( log.isTraceEnabled() ) + log.trace("Draining channel:"+key); + + drainChannel(key, reader); + } + } catch (Exception e) { + //this is common, since the sockets on the other + //end expire after a certain time. + if ( e instanceof CancelledKeyException ) { + //do nothing + } else if ( e instanceof IOException ) { + //dont spew out stack traces for IO exceptions unless debug is enabled. + if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); + else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); + } else if ( log.isErrorEnabled() ) { + //this is a real error, log it. + log.error("Exception caught in TcpReplicationThread.drainChannel.",e); + } + cancelKey(key); + } finally { + + } + key = null; + // done, ready for more, return to pool + getPool().returnWorker (this); + } + } + + /** + * Called to initiate a unit of work by this worker thread + * on the provided SelectionKey object. This method is + * synchronized, as is the run() method, so only one key + * can be serviced at a given time. + * Before waking the worker thread, and before returning + * to the main selection loop, this key's interest set is + * updated to remove OP_READ. This will cause the selector + * to ignore read-readiness for this channel while the + * worker thread is servicing it. + */ + public synchronized void serviceChannel (SelectionKey key) { + if ( log.isTraceEnabled() ) + log.trace("About to service key:"+key); + ObjectReader reader = (ObjectReader)key.attachment(); + if ( reader != null ) reader.setLastAccess(System.currentTimeMillis()); + this.key = key; + key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); + key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); + this.notify(); // awaken the thread + } + + /** + * The actual code which drains the channel associated with + * the given key. This method assumes the key has been + * modified prior to invocation to turn off selection + * interest in OP_READ. When this method completes it + * re-enables OP_READ and calls wakeup() on the selector + * so the selector will resume watching this channel. + */ + protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { + reader.setLastAccess(System.currentTimeMillis()); + reader.access(); + SocketChannel channel = (SocketChannel) key.channel(); + int count; + buffer.clear(); // make buffer empty + + // loop while data available, channel is non-blocking + while ((count = channel.read (buffer)) > 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + //do we have at least one package? + if ( reader.hasPackage() ) break; + } + + int pkgcnt = reader.count(); + + if (count < 0 && pkgcnt == 0 ) { + //end of stream, and no more packages to process + remoteEof(key); + return; + } + + ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); + + registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks + + for ( int i=0; i 0) { - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); - else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - //do we have at least one package? - if ( reader.hasPackage() ) break; - } - - int pkgcnt = reader.count(); - - if (count < 0 && pkgcnt == 0 ) { - //end of stream, and no more packages to process - remoteEof(key); - return; - } - - ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); - - registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks - - for ( int i=0; i