Name change in preparation of pluggable Executors and thread fairness
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 15 Dec 2006 00:30:53 +0000 (00:30 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 15 Dec 2006 00:30:53 +0000 (00:30 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@487409 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java [new file with mode: 0644]
java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java [deleted file]
java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java [new file with mode: 0644]
java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java [deleted file]

index 561d9f4..7bdec3a 100644 (file)
@@ -77,8 +77,8 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
         return getReplicationThread();
     }
     
-    protected BioReplicationThread getReplicationThread() {
-        BioReplicationThread result = new BioReplicationThread(this);
+    protected BioReplicationTask getReplicationThread() {
+        BioReplicationTask result = new BioReplicationTask(this);
         result.setOptions(getWorkerThreadOptions());
         result.setUseBufferPool(this.getUseBufferPool());
         return result;
@@ -129,7 +129,7 @@ public class BioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
                 if ( log.isWarnEnabled() )
                     log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
             }
-            BioReplicationThread thread = (BioReplicationThread)getPool().getWorker();
+            BioReplicationTask thread = (BioReplicationTask)getPool().getWorker();
             if ( thread == null ) continue; //should never happen
             try {
                 socket = serverSocket.accept();
diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java b/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java
new file mode 100644 (file)
index 0000000..9e45830
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
+import java.net.Socket;
+import java.io.InputStream;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import java.io.OutputStream;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision$, $Date$
+ */
+public class BioReplicationTask extends AbstractRxTask {
+
+
+    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationTask.class );
+    
+    protected Socket socket;
+    protected ObjectReader reader;
+    
+    public BioReplicationTask (ListenCallback callback) {
+        super(callback);
+    }
+
+    // loop forever waiting for work to do
+    public synchronized void run()
+    {
+        this.notify();
+        while (isDoRun()) {
+            try {
+                // sleep and release object lock
+                this.wait();
+            } catch (InterruptedException e) {
+                if(log.isInfoEnabled())
+                    log.info("TCP worker thread interrupted in cluster",e);
+                // clear interrupt status
+                Thread.interrupted();
+            }
+            if ( socket == null ) continue;
+            try {
+                drainSocket();
+            } catch ( Exception x ) {
+                log.error("Unable to service bio socket");
+            }finally {
+                try {socket.close();}catch ( Exception ignore){}
+                try {reader.close();}catch ( Exception ignore){}
+                reader = null;
+                socket = null;
+            }
+            // done, ready for more, return to pool
+            if ( getPool() != null ) getPool().returnWorker (this);
+            else setDoRun(false);
+        }
+    }
+
+    
+    public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
+        this.socket = socket;
+        this.reader = reader;
+        this.notify();         // awaken the thread
+    }
+    
+    protected void execute(ObjectReader reader) throws Exception{
+        int pkgcnt = reader.count();
+
+        if ( pkgcnt > 0 ) {
+            ChannelMessage[] msgs = reader.execute();
+            for ( int i=0; i<msgs.length; i++ ) {
+                /**
+                 * Use send ack here if you want to ack the request to the remote 
+                 * server before completing the request
+                 * This is considered an asynchronized request
+                 */
+                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                try {
+                    //process the message
+                    getCallback().messageDataReceived(msgs[i]);
+                    /**
+                     * Use send ack here if you want the request to complete on this
+                     * server before sending the ack to the remote server
+                     * This is considered a synchronized request
+                     */
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+                }catch  ( Exception x ) {
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+                    log.error("Error thrown from messageDataReceived.",x);
+                }
+                if ( getUseBufferPool() ) {
+                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                    msgs[i].setMessage(null);
+                }
+            }                       
+        }
+
+       
+    }
+
+    /**
+     * The actual code which drains the channel associated with
+     * the given key.  This method assumes the key has been
+     * modified prior to invocation to turn off selection
+     * interest in OP_READ.  When this method completes it
+     * re-enables OP_READ and calls wakeup() on the selector
+     * so the selector will resume watching this channel.
+     */
+    protected void drainSocket () throws Exception {
+        InputStream in = socket.getInputStream();
+        // loop while data available, channel is non-blocking
+        byte[] buf = new byte[1024];
+        int length = in.read(buf);
+        while ( length >= 0 ) {
+            int count = reader.append(buf,0,length,true);
+            if ( count > 0 ) execute(reader);
+            length = in.read(buf);
+        }
+    }
+
+
+
+
+    /**
+     * send a reply-acknowledgement (6,2,3)
+     * @param key
+     * @param channel
+     */
+    protected void sendAck(byte[] command) {
+        try {
+            OutputStream out = socket.getOutputStream();
+            out.write(command);
+            out.flush();
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + socket.getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+        }
+    }
+    
+    public void close() {
+        setDoRun(false);
+        try {socket.close();}catch ( Exception ignore){}
+        try {reader.close();}catch ( Exception ignore){}
+        reader = null;
+        socket = null;
+        super.close();
+    }
+}
diff --git a/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java b/java/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
deleted file mode 100644 (file)
index 38cf967..0000000
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio;
-import java.io.IOException;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.AbstractRxTask;
-import java.net.Socket;
-import java.io.InputStream;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import java.io.OutputStream;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- * 
- * @author Filip Hanik
- * 
- * @version $Revision$, $Date$
- */
-public class BioReplicationThread extends AbstractRxTask {
-
-
-    protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class );
-    
-    protected Socket socket;
-    protected ObjectReader reader;
-    
-    public BioReplicationThread (ListenCallback callback) {
-        super(callback);
-    }
-
-    // loop forever waiting for work to do
-    public synchronized void run()
-    {
-        this.notify();
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled())
-                    log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if ( socket == null ) continue;
-            try {
-                drainSocket();
-            } catch ( Exception x ) {
-                log.error("Unable to service bio socket");
-            }finally {
-                try {socket.close();}catch ( Exception ignore){}
-                try {reader.close();}catch ( Exception ignore){}
-                reader = null;
-                socket = null;
-            }
-            // done, ready for more, return to pool
-            if ( getPool() != null ) getPool().returnWorker (this);
-            else setDoRun(false);
-        }
-    }
-
-    
-    public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
-        this.socket = socket;
-        this.reader = reader;
-        this.notify();         // awaken the thread
-    }
-    
-    protected void execute(ObjectReader reader) throws Exception{
-        int pkgcnt = reader.count();
-
-        if ( pkgcnt > 0 ) {
-            ChannelMessage[] msgs = reader.execute();
-            for ( int i=0; i<msgs.length; i++ ) {
-                /**
-                 * Use send ack here if you want to ack the request to the remote 
-                 * server before completing the request
-                 * This is considered an asynchronized request
-                 */
-                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
-                try {
-                    //process the message
-                    getCallback().messageDataReceived(msgs[i]);
-                    /**
-                     * Use send ack here if you want the request to complete on this
-                     * server before sending the ack to the remote server
-                     * This is considered a synchronized request
-                     */
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
-                }catch  ( Exception x ) {
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
-                    log.error("Error thrown from messageDataReceived.",x);
-                }
-                if ( getUseBufferPool() ) {
-                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                    msgs[i].setMessage(null);
-                }
-            }                       
-        }
-
-       
-    }
-
-    /**
-     * The actual code which drains the channel associated with
-     * the given key.  This method assumes the key has been
-     * modified prior to invocation to turn off selection
-     * interest in OP_READ.  When this method completes it
-     * re-enables OP_READ and calls wakeup() on the selector
-     * so the selector will resume watching this channel.
-     */
-    protected void drainSocket () throws Exception {
-        InputStream in = socket.getInputStream();
-        // loop while data available, channel is non-blocking
-        byte[] buf = new byte[1024];
-        int length = in.read(buf);
-        while ( length >= 0 ) {
-            int count = reader.append(buf,0,length,true);
-            if ( count > 0 ) execute(reader);
-            length = in.read(buf);
-        }
-    }
-
-
-
-
-    /**
-     * send a reply-acknowledgement (6,2,3)
-     * @param key
-     * @param channel
-     */
-    protected void sendAck(byte[] command) {
-        try {
-            OutputStream out = socket.getOutputStream();
-            out.write(command);
-            out.flush();
-            if (log.isTraceEnabled()) {
-                log.trace("ACK sent to " + socket.getPort());
-            }
-        } catch ( java.io.IOException x ) {
-            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
-        }
-    }
-    
-    public void close() {
-        setDoRun(false);
-        try {socket.close();}catch ( Exception ignore){}
-        try {reader.close();}catch ( Exception ignore){}
-        reader = null;
-        socket = null;
-        super.close();
-    }
-}
index 0c54776..1c0b998 100644 (file)
@@ -110,7 +110,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
     }
     
     public AbstractRxTask getWorkerThread() {
-        NioReplicationThread thread = new NioReplicationThread(this,this);
+        NioReplicationTask thread = new NioReplicationTask(this,this);
         thread.setUseBufferPool(this.getUseBufferPool());
         thread.setRxBufSize(getRxBufSize());
         thread.setOptions(getWorkerThreadOptions());
@@ -365,7 +365,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
      *  will then de-register the channel on the next select call.
      */
     protected void readDataFromSocket(SelectionKey key) throws Exception {
-        NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
+        NioReplicationTask worker = (NioReplicationTask) getPool().getWorker();
         if (worker == null) {
             // No threads available, do nothing, the selection
             // loop will keep calling this method until a
diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
new file mode 100644 (file)
index 0000000..529b369
--- /dev/null
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+import java.nio.channels.CancelledKeyException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.util.Logs;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision$, $Date$
+ */
+public class NioReplicationTask extends AbstractRxTask {
+    
+    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class );
+    
+    private ByteBuffer buffer = null;
+    private SelectionKey key;
+    private int rxBufSize;
+    private NioReceiver receiver;
+    public NioReplicationTask (ListenCallback callback, NioReceiver receiver)
+    {
+        super(callback);
+        this.receiver = receiver;
+    }
+
+    // loop forever waiting for work to do
+    public synchronized void run() { 
+        this.notify();
+        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+            buffer = ByteBuffer.allocateDirect(getRxBufSize());
+        }else {
+            buffer = ByteBuffer.allocate (getRxBufSize());
+        }
+        while (isDoRun()) {
+            try {
+                // sleep and release object lock
+                this.wait();
+            } catch (InterruptedException e) {
+                if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
+                // clear interrupt status
+                Thread.interrupted();
+            }
+            if (key == null) {
+                continue;      // just in case
+            }
+            if ( log.isTraceEnabled() ) 
+                log.trace("Servicing key:"+key);
+
+            try {
+                ObjectReader reader = (ObjectReader)key.attachment();
+                if ( reader == null ) {
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("No object reader, cancelling:"+key);
+                    cancelKey(key);
+                } else {
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("Draining channel:"+key);
+
+                    drainChannel(key, reader);
+                }
+            } catch (Exception e) {
+                //this is common, since the sockets on the other
+                //end expire after a certain time.
+                if ( e instanceof CancelledKeyException ) {
+                    //do nothing
+                } else if ( e instanceof IOException ) {
+                    //dont spew out stack traces for IO exceptions unless debug is enabled.
+                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
+                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
+                } else if ( log.isErrorEnabled() ) {
+                    //this is a real error, log it.
+                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+                } 
+                cancelKey(key);
+            } finally {
+                
+            }
+            key = null;
+            // done, ready for more, return to pool
+            getPool().returnWorker (this);
+        }
+    }
+
+    /**
+     * Called to initiate a unit of work by this worker thread
+     * on the provided SelectionKey object.  This method is
+     * synchronized, as is the run() method, so only one key
+     * can be serviced at a given time.
+     * Before waking the worker thread, and before returning
+     * to the main selection loop, this key's interest set is
+     * updated to remove OP_READ.  This will cause the selector
+     * to ignore read-readiness for this channel while the
+     * worker thread is servicing it.
+     */
+    public synchronized void serviceChannel (SelectionKey key) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("About to service key:"+key);
+        ObjectReader reader = (ObjectReader)key.attachment();
+        if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
+        this.key = key;
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+        this.notify();         // awaken the thread
+    }
+
+    /**
+     * The actual code which drains the channel associated with
+     * the given key.  This method assumes the key has been
+     * modified prior to invocation to turn off selection
+     * interest in OP_READ.  When this method completes it
+     * re-enables OP_READ and calls wakeup() on the selector
+     * so the selector will resume watching this channel.
+     */
+    protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
+        reader.setLastAccess(System.currentTimeMillis());
+        reader.access();
+        SocketChannel channel = (SocketChannel) key.channel();
+        int count;
+        buffer.clear();                        // make buffer empty
+
+        // loop while data available, channel is non-blocking
+        while ((count = channel.read (buffer)) > 0) {
+            buffer.flip();             // make buffer readable
+            if ( buffer.hasArray() ) 
+                reader.append(buffer.array(),0,count,false);
+            else 
+                reader.append(buffer,count,false);
+            buffer.clear();            // make buffer empty
+            //do we have at least one package?
+            if ( reader.hasPackage() ) break;
+        }
+
+        int pkgcnt = reader.count();
+        
+        if (count < 0 && pkgcnt == 0 ) {
+            //end of stream, and no more packages to process
+            remoteEof(key);
+            return;
+        }
+        
+        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
+        
+        registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
+        
+        for ( int i=0; i<msgs.length; i++ ) {
+            /**
+             * Use send ack here if you want to ack the request to the remote 
+             * server before completing the request
+             * This is considered an asynchronized request
+             */
+            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+            try {
+                if ( Logs.MESSAGES.isTraceEnabled() ) {
+                    try {
+                        Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
+                    }catch ( Throwable t ) {}
+                }
+                //process the message
+                getCallback().messageDataReceived(msgs[i]);
+                /**
+                 * Use send ack here if you want the request to complete on this 
+                 * server before sending the ack to the remote server
+                 * This is considered a synchronized request
+                 */
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+            }catch ( RemoteProcessException e ) {
+                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+            }catch ( Exception e ) {
+                log.error("Processing of cluster message failed.",e);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+            }
+            if ( getUseBufferPool() ) {
+                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                msgs[i].setMessage(null);
+            }
+        }                        
+        
+        if (count < 0) {
+            remoteEof(key);
+            return;
+        }
+    }
+
+    private void remoteEof(SelectionKey key) {
+        // close channel on EOF, invalidates the key
+        if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+        cancelKey(key);
+    }
+
+    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("Adding key for read event:"+key);
+        reader.finish();
+        //register our OP_READ interest
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    if (key.isValid()) {
+                        // cycle the selector so this key is active again
+                        key.selector().wakeup();
+                        // resume interest in OP_READ, OP_WRITE
+                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+                        key.interestOps(resumeOps);
+                        if ( log.isTraceEnabled() ) 
+                            log.trace("Registering key for read:"+key);
+                    }
+                } catch (CancelledKeyException ckx ) {
+                    NioReceiver.cancelledKey(key);
+                    if ( log.isTraceEnabled() ) 
+                        log.trace("CKX Cancelling key:"+key);
+
+                } catch (Exception x) {
+                    log.error("Error registering key for read:"+key,x);
+                }
+            }
+        };
+        receiver.addEvent(r);
+    }
+
+    private void cancelKey(final SelectionKey key) {
+        if ( log.isTraceEnabled() ) 
+            log.trace("Adding key for cancel event:"+key);
+
+        ObjectReader reader = (ObjectReader)key.attachment();
+        if ( reader != null ) {
+            reader.setCancelled(true);
+            reader.finish();
+        }
+        Runnable cx = new Runnable() {
+            public void run() {
+                if ( log.isTraceEnabled() ) 
+                    log.trace("Cancelling key:"+key);
+
+                NioReceiver.cancelledKey(key);
+            }
+        };
+        receiver.addEvent(cx);
+    }
+    
+    
+
+
+
+    /**
+     * send a reply-acknowledgement (6,2,3)
+     * @param key
+     * @param channel
+     */
+    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
+        
+        try {
+            ByteBuffer buf = ByteBuffer.wrap(command);
+            int total = 0;
+            while ( total < command.length ) {
+                total += channel.write(buf);
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + channel.socket().getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+        }
+    }
+
+    public void setRxBufSize(int rxBufSize) {
+        this.rxBufSize = rxBufSize;
+    }
+
+    public int getRxBufSize() {
+        return rxBufSize;
+    }
+}
diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
deleted file mode 100644 (file)
index 6f3c75f..0000000
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.AbstractRxTask;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-import java.nio.channels.CancelledKeyException;
-import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.util.Logs;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- * 
- * @author Filip Hanik
- * 
- * @version $Revision$, $Date$
- */
-public class NioReplicationThread extends AbstractRxTask {
-    
-    private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
-    
-    private ByteBuffer buffer = null;
-    private SelectionKey key;
-    private int rxBufSize;
-    private NioReceiver receiver;
-    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
-    {
-        super(callback);
-        this.receiver = receiver;
-    }
-
-    // loop forever waiting for work to do
-    public synchronized void run() { 
-        this.notify();
-        if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
-            buffer = ByteBuffer.allocateDirect(getRxBufSize());
-        }else {
-            buffer = ByteBuffer.allocate (getRxBufSize());
-        }
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if (key == null) {
-                continue;      // just in case
-            }
-            if ( log.isTraceEnabled() ) 
-                log.trace("Servicing key:"+key);
-
-            try {
-                ObjectReader reader = (ObjectReader)key.attachment();
-                if ( reader == null ) {
-                    if ( log.isTraceEnabled() ) 
-                        log.trace("No object reader, cancelling:"+key);
-                    cancelKey(key);
-                } else {
-                    if ( log.isTraceEnabled() ) 
-                        log.trace("Draining channel:"+key);
-
-                    drainChannel(key, reader);
-                }
-            } catch (Exception e) {
-                //this is common, since the sockets on the other
-                //end expire after a certain time.
-                if ( e instanceof CancelledKeyException ) {
-                    //do nothing
-                } else if ( e instanceof IOException ) {
-                    //dont spew out stack traces for IO exceptions unless debug is enabled.
-                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
-                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
-                } else if ( log.isErrorEnabled() ) {
-                    //this is a real error, log it.
-                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
-                } 
-                cancelKey(key);
-            } finally {
-                
-            }
-            key = null;
-            // done, ready for more, return to pool
-            getPool().returnWorker (this);
-        }
-    }
-
-    /**
-     * Called to initiate a unit of work by this worker thread
-     * on the provided SelectionKey object.  This method is
-     * synchronized, as is the run() method, so only one key
-     * can be serviced at a given time.
-     * Before waking the worker thread, and before returning
-     * to the main selection loop, this key's interest set is
-     * updated to remove OP_READ.  This will cause the selector
-     * to ignore read-readiness for this channel while the
-     * worker thread is servicing it.
-     */
-    public synchronized void serviceChannel (SelectionKey key) {
-        if ( log.isTraceEnabled() ) 
-            log.trace("About to service key:"+key);
-        ObjectReader reader = (ObjectReader)key.attachment();
-        if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
-        this.key = key;
-        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
-        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
-        this.notify();         // awaken the thread
-    }
-
-    /**
-     * The actual code which drains the channel associated with
-     * the given key.  This method assumes the key has been
-     * modified prior to invocation to turn off selection
-     * interest in OP_READ.  When this method completes it
-     * re-enables OP_READ and calls wakeup() on the selector
-     * so the selector will resume watching this channel.
-     */
-    protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
-        reader.setLastAccess(System.currentTimeMillis());
-        reader.access();
-        SocketChannel channel = (SocketChannel) key.channel();
-        int count;
-        buffer.clear();                        // make buffer empty
-
-        // loop while data available, channel is non-blocking
-        while ((count = channel.read (buffer)) > 0) {
-            buffer.flip();             // make buffer readable
-            if ( buffer.hasArray() ) 
-                reader.append(buffer.array(),0,count,false);
-            else 
-                reader.append(buffer,count,false);
-            buffer.clear();            // make buffer empty
-            //do we have at least one package?
-            if ( reader.hasPackage() ) break;
-        }
-
-        int pkgcnt = reader.count();
-        
-        if (count < 0 && pkgcnt == 0 ) {
-            //end of stream, and no more packages to process
-            remoteEof(key);
-            return;
-        }
-        
-        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
-        
-        registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
-        
-        for ( int i=0; i<msgs.length; i++ ) {
-            /**
-             * Use send ack here if you want to ack the request to the remote 
-             * server before completing the request
-             * This is considered an asynchronized request
-             */
-            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-            try {
-                if ( Logs.MESSAGES.isTraceEnabled() ) {
-                    try {
-                        Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
-                    }catch ( Throwable t ) {}
-                }
-                //process the message
-                getCallback().messageDataReceived(msgs[i]);
-                /**
-                 * Use send ack here if you want the request to complete on this 
-                 * server before sending the ack to the remote server
-                 * This is considered a synchronized request
-                 */
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-            }catch ( RemoteProcessException e ) {
-                if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
-            }catch ( Exception e ) {
-                log.error("Processing of cluster message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
-            }
-            if ( getUseBufferPool() ) {
-                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                msgs[i].setMessage(null);
-            }
-        }                        
-        
-        if (count < 0) {
-            remoteEof(key);
-            return;
-        }
-    }
-
-    private void remoteEof(SelectionKey key) {
-        // close channel on EOF, invalidates the key
-        if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
-        cancelKey(key);
-    }
-
-    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
-        if ( log.isTraceEnabled() ) 
-            log.trace("Adding key for read event:"+key);
-        reader.finish();
-        //register our OP_READ interest
-        Runnable r = new Runnable() {
-            public void run() {
-                try {
-                    if (key.isValid()) {
-                        // cycle the selector so this key is active again
-                        key.selector().wakeup();
-                        // resume interest in OP_READ, OP_WRITE
-                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
-                        key.interestOps(resumeOps);
-                        if ( log.isTraceEnabled() ) 
-                            log.trace("Registering key for read:"+key);
-                    }
-                } catch (CancelledKeyException ckx ) {
-                    NioReceiver.cancelledKey(key);
-                    if ( log.isTraceEnabled() ) 
-                        log.trace("CKX Cancelling key:"+key);
-
-                } catch (Exception x) {
-                    log.error("Error registering key for read:"+key,x);
-                }
-            }
-        };
-        receiver.addEvent(r);
-    }
-
-    private void cancelKey(final SelectionKey key) {
-        if ( log.isTraceEnabled() ) 
-            log.trace("Adding key for cancel event:"+key);
-
-        ObjectReader reader = (ObjectReader)key.attachment();
-        if ( reader != null ) {
-            reader.setCancelled(true);
-            reader.finish();
-        }
-        Runnable cx = new Runnable() {
-            public void run() {
-                if ( log.isTraceEnabled() ) 
-                    log.trace("Cancelling key:"+key);
-
-                NioReceiver.cancelledKey(key);
-            }
-        };
-        receiver.addEvent(cx);
-    }
-    
-    
-
-
-
-    /**
-     * send a reply-acknowledgement (6,2,3)
-     * @param key
-     * @param channel
-     */
-    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
-        
-        try {
-            ByteBuffer buf = ByteBuffer.wrap(command);
-            int total = 0;
-            while ( total < command.length ) {
-                total += channel.write(buf);
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("ACK sent to " + channel.socket().getPort());
-            }
-        } catch ( java.io.IOException x ) {
-            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
-        }
-    }
-
-    public void setRxBufSize(int rxBufSize) {
-        this.rxBufSize = rxBufSize;
-    }
-
-    public int getRxBufSize() {
-        return rxBufSize;
-    }
-}