return getReplicationThread();
}
- protected BioReplicationThread getReplicationThread() {
- BioReplicationThread result = new BioReplicationThread(this);
+ protected BioReplicationTask getReplicationThread() {
+ BioReplicationTask result = new BioReplicationTask(this);
result.setOptions(getWorkerThreadOptions());
result.setUseBufferPool(this.getUseBufferPool());
return result;
if ( log.isWarnEnabled() )
log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
}
- BioReplicationThread thread = (BioReplicationThread)getPool().getWorker();
+ BioReplicationTask thread = (BioReplicationTask)getPool().getWorker();
if ( thread == null ) continue; //should never happen
try {
socket = serverSocket.accept();
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.bio;
+import java.io.IOException;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
+import java.net.Socket;
+import java.io.InputStream;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import java.io.OutputStream;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ *
+ * @author Filip Hanik
+ *
+ * @version $Revision$, $Date$
+ */
+public class BioReplicationTask extends AbstractRxTask {
+
+
+ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationTask.class );
+
+ protected Socket socket;
+ protected ObjectReader reader;
+
+ public BioReplicationTask (ListenCallback callback) {
+ super(callback);
+ }
+
+ // loop forever waiting for work to do
+ public synchronized void run()
+ {
+ this.notify();
+ while (isDoRun()) {
+ try {
+ // sleep and release object lock
+ this.wait();
+ } catch (InterruptedException e) {
+ if(log.isInfoEnabled())
+ log.info("TCP worker thread interrupted in cluster",e);
+ // clear interrupt status
+ Thread.interrupted();
+ }
+ if ( socket == null ) continue;
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
+ }
+ // done, ready for more, return to pool
+ if ( getPool() != null ) getPool().returnWorker (this);
+ else setDoRun(false);
+ }
+ }
+
+
+ public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
+ this.socket = socket;
+ this.reader = reader;
+ this.notify(); // awaken the thread
+ }
+
+ protected void execute(ObjectReader reader) throws Exception{
+ int pkgcnt = reader.count();
+
+ if ( pkgcnt > 0 ) {
+ ChannelMessage[] msgs = reader.execute();
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ try {
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
+ }catch ( Exception x ) {
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
+ log.error("Error thrown from messageDataReceived.",x);
+ }
+ if ( getUseBufferPool() ) {
+ BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
+ }
+
+
+ }
+
+ /**
+ * The actual code which drains the channel associated with
+ * the given key. This method assumes the key has been
+ * modified prior to invocation to turn off selection
+ * interest in OP_READ. When this method completes it
+ * re-enables OP_READ and calls wakeup() on the selector
+ * so the selector will resume watching this channel.
+ */
+ protected void drainSocket () throws Exception {
+ InputStream in = socket.getInputStream();
+ // loop while data available, channel is non-blocking
+ byte[] buf = new byte[1024];
+ int length = in.read(buf);
+ while ( length >= 0 ) {
+ int count = reader.append(buf,0,length,true);
+ if ( count > 0 ) execute(reader);
+ length = in.read(buf);
+ }
+ }
+
+
+
+
+ /**
+ * send a reply-acknowledgement (6,2,3)
+ * @param key
+ * @param channel
+ */
+ protected void sendAck(byte[] command) {
+ try {
+ OutputStream out = socket.getOutputStream();
+ out.write(command);
+ out.flush();
+ if (log.isTraceEnabled()) {
+ log.trace("ACK sent to " + socket.getPort());
+ }
+ } catch ( java.io.IOException x ) {
+ log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+ }
+ }
+
+ public void close() {
+ setDoRun(false);
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
+ super.close();
+ }
+}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.bio;
-import java.io.IOException;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.AbstractRxTask;
-import java.net.Socket;
-import java.io.InputStream;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import java.io.OutputStream;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- *
- * @author Filip Hanik
- *
- * @version $Revision$, $Date$
- */
-public class BioReplicationThread extends AbstractRxTask {
-
-
- protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( BioReplicationThread.class );
-
- protected Socket socket;
- protected ObjectReader reader;
-
- public BioReplicationThread (ListenCallback callback) {
- super(callback);
- }
-
- // loop forever waiting for work to do
- public synchronized void run()
- {
- this.notify();
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled())
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if ( socket == null ) continue;
- try {
- drainSocket();
- } catch ( Exception x ) {
- log.error("Unable to service bio socket");
- }finally {
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- }
- // done, ready for more, return to pool
- if ( getPool() != null ) getPool().returnWorker (this);
- else setDoRun(false);
- }
- }
-
-
- public synchronized void serviceSocket(Socket socket, ObjectReader reader) {
- this.socket = socket;
- this.reader = reader;
- this.notify(); // awaken the thread
- }
-
- protected void execute(ObjectReader reader) throws Exception{
- int pkgcnt = reader.count();
-
- if ( pkgcnt > 0 ) {
- ChannelMessage[] msgs = reader.execute();
- for ( int i=0; i<msgs.length; i++ ) {
- /**
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronized request
- */
- if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
- try {
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND);
- }catch ( Exception x ) {
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND);
- log.error("Error thrown from messageDataReceived.",x);
- }
- if ( getUseBufferPool() ) {
- BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
- }
-
-
- }
-
- /**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- */
- protected void drainSocket () throws Exception {
- InputStream in = socket.getInputStream();
- // loop while data available, channel is non-blocking
- byte[] buf = new byte[1024];
- int length = in.read(buf);
- while ( length >= 0 ) {
- int count = reader.append(buf,0,length,true);
- if ( count > 0 ) execute(reader);
- length = in.read(buf);
- }
- }
-
-
-
-
- /**
- * send a reply-acknowledgement (6,2,3)
- * @param key
- * @param channel
- */
- protected void sendAck(byte[] command) {
- try {
- OutputStream out = socket.getOutputStream();
- out.write(command);
- out.flush();
- if (log.isTraceEnabled()) {
- log.trace("ACK sent to " + socket.getPort());
- }
- } catch ( java.io.IOException x ) {
- log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
- }
- }
-
- public void close() {
- setDoRun(false);
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- super.close();
- }
-}
}
public AbstractRxTask getWorkerThread() {
- NioReplicationThread thread = new NioReplicationThread(this,this);
+ NioReplicationTask thread = new NioReplicationTask(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
thread.setOptions(getWorkerThreadOptions());
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
- NioReplicationThread worker = (NioReplicationThread) getPool().getWorker();
+ NioReplicationTask worker = (NioReplicationTask) getPool().getWorker();
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.transport.nio;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.transport.Constants;
+import org.apache.catalina.tribes.transport.AbstractRxTask;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.io.ListenCallback;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.BufferPool;
+import java.nio.channels.CancelledKeyException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.util.Logs;
+
+/**
+ * A worker thread class which can drain channels and echo-back the input. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ *
+ * @author Filip Hanik
+ *
+ * @version $Revision$, $Date$
+ */
+public class NioReplicationTask extends AbstractRxTask {
+
+ private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class );
+
+ private ByteBuffer buffer = null;
+ private SelectionKey key;
+ private int rxBufSize;
+ private NioReceiver receiver;
+ public NioReplicationTask (ListenCallback callback, NioReceiver receiver)
+ {
+ super(callback);
+ this.receiver = receiver;
+ }
+
+ // loop forever waiting for work to do
+ public synchronized void run() {
+ this.notify();
+ if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+ buffer = ByteBuffer.allocateDirect(getRxBufSize());
+ }else {
+ buffer = ByteBuffer.allocate (getRxBufSize());
+ }
+ while (isDoRun()) {
+ try {
+ // sleep and release object lock
+ this.wait();
+ } catch (InterruptedException e) {
+ if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
+ // clear interrupt status
+ Thread.interrupted();
+ }
+ if (key == null) {
+ continue; // just in case
+ }
+ if ( log.isTraceEnabled() )
+ log.trace("Servicing key:"+key);
+
+ try {
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader == null ) {
+ if ( log.isTraceEnabled() )
+ log.trace("No object reader, cancelling:"+key);
+ cancelKey(key);
+ } else {
+ if ( log.isTraceEnabled() )
+ log.trace("Draining channel:"+key);
+
+ drainChannel(key, reader);
+ }
+ } catch (Exception e) {
+ //this is common, since the sockets on the other
+ //end expire after a certain time.
+ if ( e instanceof CancelledKeyException ) {
+ //do nothing
+ } else if ( e instanceof IOException ) {
+ //dont spew out stack traces for IO exceptions unless debug is enabled.
+ if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
+ else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
+ } else if ( log.isErrorEnabled() ) {
+ //this is a real error, log it.
+ log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+ }
+ cancelKey(key);
+ } finally {
+
+ }
+ key = null;
+ // done, ready for more, return to pool
+ getPool().returnWorker (this);
+ }
+ }
+
+ /**
+ * Called to initiate a unit of work by this worker thread
+ * on the provided SelectionKey object. This method is
+ * synchronized, as is the run() method, so only one key
+ * can be serviced at a given time.
+ * Before waking the worker thread, and before returning
+ * to the main selection loop, this key's interest set is
+ * updated to remove OP_READ. This will cause the selector
+ * to ignore read-readiness for this channel while the
+ * worker thread is servicing it.
+ */
+ public synchronized void serviceChannel (SelectionKey key) {
+ if ( log.isTraceEnabled() )
+ log.trace("About to service key:"+key);
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
+ this.key = key;
+ key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
+ key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+ this.notify(); // awaken the thread
+ }
+
+ /**
+ * The actual code which drains the channel associated with
+ * the given key. This method assumes the key has been
+ * modified prior to invocation to turn off selection
+ * interest in OP_READ. When this method completes it
+ * re-enables OP_READ and calls wakeup() on the selector
+ * so the selector will resume watching this channel.
+ */
+ protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
+ reader.setLastAccess(System.currentTimeMillis());
+ reader.access();
+ SocketChannel channel = (SocketChannel) key.channel();
+ int count;
+ buffer.clear(); // make buffer empty
+
+ // loop while data available, channel is non-blocking
+ while ((count = channel.read (buffer)) > 0) {
+ buffer.flip(); // make buffer readable
+ if ( buffer.hasArray() )
+ reader.append(buffer.array(),0,count,false);
+ else
+ reader.append(buffer,count,false);
+ buffer.clear(); // make buffer empty
+ //do we have at least one package?
+ if ( reader.hasPackage() ) break;
+ }
+
+ int pkgcnt = reader.count();
+
+ if (count < 0 && pkgcnt == 0 ) {
+ //end of stream, and no more packages to process
+ remoteEof(key);
+ return;
+ }
+
+ ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
+
+ registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
+
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ try {
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ try {
+ Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
+ }catch ( Throwable t ) {}
+ }
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
+ /**
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
+ */
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ }catch ( RemoteProcessException e ) {
+ if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ }catch ( Exception e ) {
+ log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ }
+ if ( getUseBufferPool() ) {
+ BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
+
+ if (count < 0) {
+ remoteEof(key);
+ return;
+ }
+ }
+
+ private void remoteEof(SelectionKey key) {
+ // close channel on EOF, invalidates the key
+ if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+ cancelKey(key);
+ }
+
+ protected void registerForRead(final SelectionKey key, ObjectReader reader) {
+ if ( log.isTraceEnabled() )
+ log.trace("Adding key for read event:"+key);
+ reader.finish();
+ //register our OP_READ interest
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ if (key.isValid()) {
+ // cycle the selector so this key is active again
+ key.selector().wakeup();
+ // resume interest in OP_READ, OP_WRITE
+ int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+ key.interestOps(resumeOps);
+ if ( log.isTraceEnabled() )
+ log.trace("Registering key for read:"+key);
+ }
+ } catch (CancelledKeyException ckx ) {
+ NioReceiver.cancelledKey(key);
+ if ( log.isTraceEnabled() )
+ log.trace("CKX Cancelling key:"+key);
+
+ } catch (Exception x) {
+ log.error("Error registering key for read:"+key,x);
+ }
+ }
+ };
+ receiver.addEvent(r);
+ }
+
+ private void cancelKey(final SelectionKey key) {
+ if ( log.isTraceEnabled() )
+ log.trace("Adding key for cancel event:"+key);
+
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader != null ) {
+ reader.setCancelled(true);
+ reader.finish();
+ }
+ Runnable cx = new Runnable() {
+ public void run() {
+ if ( log.isTraceEnabled() )
+ log.trace("Cancelling key:"+key);
+
+ NioReceiver.cancelledKey(key);
+ }
+ };
+ receiver.addEvent(cx);
+ }
+
+
+
+
+
+ /**
+ * send a reply-acknowledgement (6,2,3)
+ * @param key
+ * @param channel
+ */
+ protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
+
+ try {
+ ByteBuffer buf = ByteBuffer.wrap(command);
+ int total = 0;
+ while ( total < command.length ) {
+ total += channel.write(buf);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("ACK sent to " + channel.socket().getPort());
+ }
+ } catch ( java.io.IOException x ) {
+ log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+ }
+ }
+
+ public void setRxBufSize(int rxBufSize) {
+ this.rxBufSize = rxBufSize;
+ }
+
+ public int getRxBufSize() {
+ return rxBufSize;
+ }
+}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.transport.nio;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.transport.Constants;
-import org.apache.catalina.tribes.transport.AbstractRxTask;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.io.ListenCallback;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.BufferPool;
-import java.nio.channels.CancelledKeyException;
-import org.apache.catalina.tribes.UniqueId;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.util.Logs;
-
-/**
- * A worker thread class which can drain channels and echo-back the input. Each
- * instance is constructed with a reference to the owning thread pool object.
- * When started, the thread loops forever waiting to be awakened to service the
- * channel associated with a SelectionKey object. The worker is tasked by
- * calling its serviceChannel() method with a SelectionKey object. The
- * serviceChannel() method stores the key reference in the thread object then
- * calls notify() to wake it up. When the channel has been drained, the worker
- * thread returns itself to its parent pool.
- *
- * @author Filip Hanik
- *
- * @version $Revision$, $Date$
- */
-public class NioReplicationThread extends AbstractRxTask {
-
- private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationThread.class );
-
- private ByteBuffer buffer = null;
- private SelectionKey key;
- private int rxBufSize;
- private NioReceiver receiver;
- public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
- {
- super(callback);
- this.receiver = receiver;
- }
-
- // loop forever waiting for work to do
- public synchronized void run() {
- this.notify();
- if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
- buffer = ByteBuffer.allocateDirect(getRxBufSize());
- }else {
- buffer = ByteBuffer.allocate (getRxBufSize());
- }
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if (key == null) {
- continue; // just in case
- }
- if ( log.isTraceEnabled() )
- log.trace("Servicing key:"+key);
-
- try {
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader == null ) {
- if ( log.isTraceEnabled() )
- log.trace("No object reader, cancelling:"+key);
- cancelKey(key);
- } else {
- if ( log.isTraceEnabled() )
- log.trace("Draining channel:"+key);
-
- drainChannel(key, reader);
- }
- } catch (Exception e) {
- //this is common, since the sockets on the other
- //end expire after a certain time.
- if ( e instanceof CancelledKeyException ) {
- //do nothing
- } else if ( e instanceof IOException ) {
- //dont spew out stack traces for IO exceptions unless debug is enabled.
- if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
- else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
- } else if ( log.isErrorEnabled() ) {
- //this is a real error, log it.
- log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
- }
- cancelKey(key);
- } finally {
-
- }
- key = null;
- // done, ready for more, return to pool
- getPool().returnWorker (this);
- }
- }
-
- /**
- * Called to initiate a unit of work by this worker thread
- * on the provided SelectionKey object. This method is
- * synchronized, as is the run() method, so only one key
- * can be serviced at a given time.
- * Before waking the worker thread, and before returning
- * to the main selection loop, this key's interest set is
- * updated to remove OP_READ. This will cause the selector
- * to ignore read-readiness for this channel while the
- * worker thread is servicing it.
- */
- public synchronized void serviceChannel (SelectionKey key) {
- if ( log.isTraceEnabled() )
- log.trace("About to service key:"+key);
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
- this.key = key;
- key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
- key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
- this.notify(); // awaken the thread
- }
-
- /**
- * The actual code which drains the channel associated with
- * the given key. This method assumes the key has been
- * modified prior to invocation to turn off selection
- * interest in OP_READ. When this method completes it
- * re-enables OP_READ and calls wakeup() on the selector
- * so the selector will resume watching this channel.
- */
- protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception {
- reader.setLastAccess(System.currentTimeMillis());
- reader.access();
- SocketChannel channel = (SocketChannel) key.channel();
- int count;
- buffer.clear(); // make buffer empty
-
- // loop while data available, channel is non-blocking
- while ((count = channel.read (buffer)) > 0) {
- buffer.flip(); // make buffer readable
- if ( buffer.hasArray() )
- reader.append(buffer.array(),0,count,false);
- else
- reader.append(buffer,count,false);
- buffer.clear(); // make buffer empty
- //do we have at least one package?
- if ( reader.hasPackage() ) break;
- }
-
- int pkgcnt = reader.count();
-
- if (count < 0 && pkgcnt == 0 ) {
- //end of stream, and no more packages to process
- remoteEof(key);
- return;
- }
-
- ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
-
- registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks
-
- for ( int i=0; i<msgs.length; i++ ) {
- /**
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronized request
- */
- if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- try {
- if ( Logs.MESSAGES.isTraceEnabled() ) {
- try {
- Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
- }catch ( Throwable t ) {}
- }
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- }catch ( RemoteProcessException e ) {
- if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
- }catch ( Exception e ) {
- log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
- }
- if ( getUseBufferPool() ) {
- BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
-
- if (count < 0) {
- remoteEof(key);
- return;
- }
- }
-
- private void remoteEof(SelectionKey key) {
- // close channel on EOF, invalidates the key
- if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
- cancelKey(key);
- }
-
- protected void registerForRead(final SelectionKey key, ObjectReader reader) {
- if ( log.isTraceEnabled() )
- log.trace("Adding key for read event:"+key);
- reader.finish();
- //register our OP_READ interest
- Runnable r = new Runnable() {
- public void run() {
- try {
- if (key.isValid()) {
- // cycle the selector so this key is active again
- key.selector().wakeup();
- // resume interest in OP_READ, OP_WRITE
- int resumeOps = key.interestOps() | SelectionKey.OP_READ;
- key.interestOps(resumeOps);
- if ( log.isTraceEnabled() )
- log.trace("Registering key for read:"+key);
- }
- } catch (CancelledKeyException ckx ) {
- NioReceiver.cancelledKey(key);
- if ( log.isTraceEnabled() )
- log.trace("CKX Cancelling key:"+key);
-
- } catch (Exception x) {
- log.error("Error registering key for read:"+key,x);
- }
- }
- };
- receiver.addEvent(r);
- }
-
- private void cancelKey(final SelectionKey key) {
- if ( log.isTraceEnabled() )
- log.trace("Adding key for cancel event:"+key);
-
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader != null ) {
- reader.setCancelled(true);
- reader.finish();
- }
- Runnable cx = new Runnable() {
- public void run() {
- if ( log.isTraceEnabled() )
- log.trace("Cancelling key:"+key);
-
- NioReceiver.cancelledKey(key);
- }
- };
- receiver.addEvent(cx);
- }
-
-
-
-
-
- /**
- * send a reply-acknowledgement (6,2,3)
- * @param key
- * @param channel
- */
- protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
-
- try {
- ByteBuffer buf = ByteBuffer.wrap(command);
- int total = 0;
- while ( total < command.length ) {
- total += channel.write(buf);
- }
- if (log.isTraceEnabled()) {
- log.trace("ACK sent to " + channel.socket().getPort());
- }
- } catch ( java.io.IOException x ) {
- log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
- }
- }
-
- public void setRxBufSize(int rxBufSize) {
- this.rxBufSize = rxBufSize;
- }
-
- public int getRxBufSize() {
- return rxBufSize;
- }
-}