From eb5b739f39d27e630d8aa75357886ba0b53de70f Mon Sep 17 00:00:00 2001 From: fhanik Date: Mon, 18 Feb 2008 22:07:09 +0000 Subject: [PATCH] Starting to add in UDP support, still need to rethink how the sender is going to work git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@628881 13f79535-47bb-0310-9956-ffa450edef68 --- java/org/apache/catalina/tribes/Channel.java | 65 ++++++++++-------- .../apache/catalina/tribes/io/ObjectReader.java | 27 ++++---- .../catalina/tribes/transport/AbstractSender.java | 44 +++++++++--- .../catalina/tribes/transport/nio/NioReceiver.java | 52 +++++++++++---- .../tribes/transport/nio/NioReplicationTask.java | 78 ++++++++++++---------- .../catalina/tribes/transport/nio/NioSender.java | 50 +++++++------- .../tribes/test/channel/ChannelStartStop.java | 24 +++++-- 7 files changed, 210 insertions(+), 130 deletions(-) diff --git a/java/org/apache/catalina/tribes/Channel.java b/java/org/apache/catalina/tribes/Channel.java index 73760d2bf..617089e90 100644 --- a/java/org/apache/catalina/tribes/Channel.java +++ b/java/org/apache/catalina/tribes/Channel.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,20 +56,20 @@ import java.io.Serializable; * | * Coordinator (implements MessageListener,MembershipListener,ChannelInterceptor) * -------------------- - * / | \ + * / | \ * / | \ * / | \ * / | \ * / | \ * MembershipService ChannelSender ChannelReceiver [IO layer] * - * + * * For example usage @see org.apache.catalina.tribes.group.GroupChannel * @author Filip Hanik * @version $Revision$, $Date$ */ public interface Channel { - + /** * Start and stop sequences can be controlled by these constants * This allows you to start separate components of the channel
@@ -119,7 +119,7 @@ public interface Channel { * @see #stop(int) */ public static final int MBR_TX_SEQ = 8; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors @@ -127,7 +127,7 @@ public interface Channel { * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will * be performed.
- * + * * @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ @@ -150,27 +150,27 @@ public interface Channel { * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
* However, there are five default flags that every channel implementation must implement
- * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and + * SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and * processed by the recipient
* If no ack is received, the message is not considered successful
* @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
* However, there are five default flags that every channel implementation must implement
- * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and + * SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and * processed by the recipient
* If no ack is received, the message is not considered successful
* @see #send(Member[], Serializable , int) * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008; - + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors @@ -181,7 +181,14 @@ public interface Channel { * @see #send(Member[], Serializable, int, ErrorHandler) */ public static final int SEND_OPTIONS_SECURE = 0x0010; - + + /** + * Send options. When a message is sent with this flag on + * the system sends the message using UDP instead of TCP + * @see #send(Member[], Serializable , int) + * @see #send(Member[], Serializable, int, ErrorHandler) + */ + public static final int SEND_OPTIONS_UDP = 0x0020; /** * Send options, when a message is sent, it can have an option flag @@ -196,13 +203,13 @@ public interface Channel { */ public static final int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK; - + /** * Adds an interceptor to the channel message chain. * @param interceptor ChannelInterceptor */ public void addInterceptor(ChannelInterceptor interceptor); - + /** * Starts up the channel. This can be called multiple times for individual services to start * The svc parameter can be the logical or value of any constants @@ -212,7 +219,7 @@ public interface Channel { * MBR_TX_SEQ - starts the membership broadcaster
* SND_TX_SEQ - starts the replication transmitter
* SND_RX_SEQ - starts the replication receiver
- * Note: In order for the membership broadcaster to + * Note: In order for the membership broadcaster to * transmit the correct information, it has to be started after the replication receiver. * @throws ChannelException if a startup error occurs or the service is already started or an error occurs. */ @@ -229,14 +236,14 @@ public interface Channel { * SND_RX_SEQ - stops the replication receiver
* @throws ChannelException if a startup error occurs or the service is already stopped or an error occurs. */ - public void stop(int svc) throws ChannelException; - + public void stop(int svc) throws ChannelException; + /** * Send a message to one or more members in the cluster * @param destination Member[] - the destinations, can not be null or zero length, the reason for that * is that a membership change can occur and at that time the application is uncertain what group the message * actually got sent to. - * @param msg Serializable - the message to send, has to be serializable, or a ByteMessage to + * @param msg Serializable - the message to send, has to be serializable, or a ByteMessage to * send a pure byte array * @param options int - sender options, see class documentation for each interceptor that is configured in order to trigger interceptors * @return a unique Id that identifies the message that is sent @@ -257,10 +264,10 @@ public interface Channel { * @exception ChannelException - if a serialization error happens. */ public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException; - + /** * Sends a heart beat through the interceptor stacks - * Use this method to alert interceptors and other components to + * Use this method to alert interceptors and other components to * clean up garbage, timed out messages etc.
* If you application has a background thread, then you can save one thread, * by configuring your channel to not use an internal heartbeat thread @@ -268,14 +275,14 @@ public interface Channel { * @see #setHeartbeat(boolean) */ public void heartbeat(); - + /** * Enables or disables internal heartbeat. * @param enable boolean - default value is implementation specific * @see #heartbeat() */ public void setHeartbeat(boolean enable); - + /** * Add a membership listener, will get notified when a new member joins, leaves or crashes *
If the membership listener implements the Heartbeat interface @@ -284,7 +291,7 @@ public interface Channel { * @see MembershipListener */ public void addMembershipListener(MembershipListener listener); - + /** * Add a channel listener, this is a callback object when messages are received *
If the channel listener implements the Heartbeat interface @@ -307,7 +314,7 @@ public interface Channel { * @see ChannelListener */ public void removeChannelListener(ChannelListener listener); - + /** * Returns true if there are any members in the group, * this call is the same as getMembers().length>0 @@ -317,7 +324,7 @@ public interface Channel { /** * Get all current group members - * @return all members or empty array, never null + * @return all members or empty array, never null */ public Member[] getMembers() ; @@ -329,10 +336,10 @@ public interface Channel { * @return Member */ public Member getLocalMember(boolean incAlive); - + /** - * Returns the member from the membership service with complete and - * recent data. Some implementations might serialize and send + * Returns the member from the membership service with complete and + * recent data. Some implementations might serialize and send * membership information along with a message, and instead of sending * complete membership details, only send the primary identifier for the member * but not the payload or other information. When such message is received @@ -343,5 +350,5 @@ public interface Channel { */ public Member getMember(Member mbr); - + } diff --git a/java/org/apache/catalina/tribes/io/ObjectReader.java b/java/org/apache/catalina/tribes/io/ObjectReader.java index bb6560700..136eb6094 100644 --- a/java/org/apache/catalina/tribes/io/ObjectReader.java +++ b/java/org/apache/catalina/tribes/io/ObjectReader.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -40,12 +40,15 @@ public class ObjectReader { protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ObjectReader.class); private XByteBuffer buffer; - + protected long lastAccess = System.currentTimeMillis(); - + protected boolean accessed = false; private boolean cancelled; + public ObjectReader(int packetSize) { + this.buffer = new XByteBuffer(packetSize, true); + } /** * Creates an ObjectReader for a TCP NIO socket channel * @param channel - the channel to be read. @@ -53,7 +56,7 @@ public class ObjectReader { public ObjectReader(SocketChannel channel) { this(channel.socket()); } - + /** * Creates an ObjectReader for a TCP socket * @param socket Socket @@ -67,23 +70,23 @@ public class ObjectReader { this.buffer = new XByteBuffer(43800,true); } } - + public synchronized void access() { this.accessed = true; this.lastAccess = System.currentTimeMillis(); } - + public synchronized void finish() { this.accessed = false; this.lastAccess = System.currentTimeMillis(); } - + public boolean isAccessed() { return this.accessed; } /** - * Append new bytes to buffer. + * Append new bytes to buffer. * @see XByteBuffer#countPackages() * @param data new transfer buffer * @param off offset @@ -125,11 +128,11 @@ public class ObjectReader { } return result; } - + public int bufferSize() { return buffer.getLength(); } - + public boolean hasPackage() { return buffer.countPackages(true)>0; @@ -141,7 +144,7 @@ public class ObjectReader { public int count() { return buffer.countPackages(); } - + public void close() { this.buffer = null; } diff --git a/java/org/apache/catalina/tribes/transport/AbstractSender.java b/java/org/apache/catalina/tribes/transport/AbstractSender.java index d5fe7f10a..c7193e181 100644 --- a/java/org/apache/catalina/tribes/transport/AbstractSender.java +++ b/java/org/apache/catalina/tribes/transport/AbstractSender.java @@ -34,7 +34,7 @@ import org.apache.catalina.tribes.Member; * @version 1.0 */ public abstract class AbstractSender implements DataSender { - + private boolean connected = false; private int rxBufSize = 25188; private int txBufSize = 43800; @@ -57,7 +57,9 @@ public abstract class AbstractSender implements DataSender { private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private boolean throwOnFailedAck = true; - + private boolean udpBased = false; + private int udpPort = -1; + /** * transfers sender properties from one sender to another * @param from AbstractSender @@ -82,13 +84,15 @@ public abstract class AbstractSender implements DataSender { to.soLingerTime = from.soLingerTime; to.soTrafficClass = from.soTrafficClass; to.throwOnFailedAck = from.throwOnFailedAck; - } + to.udpBased = from.udpBased; + to.udpPort = from.udpPort; + } + - public AbstractSender() { - + } - + /** * connect * @@ -117,11 +121,11 @@ public abstract class AbstractSender implements DataSender { if ( disconnect ) disconnect(); return disconnect; } - + protected void setConnected(boolean connected){ this.connected = connected; } - + public boolean isConnected() { return connected; } @@ -170,7 +174,7 @@ public abstract class AbstractSender implements DataSender { public int getMaxRetryAttempts() { return maxRetryAttempts; } - + public void setDirect(boolean direct) { setDirectBuffer(direct); } @@ -182,7 +186,7 @@ public abstract class AbstractSender implements DataSender { public boolean getDirect() { return getDirectBuffer(); } - + public boolean getDirectBuffer() { return this.directBuffer; } @@ -306,4 +310,24 @@ public abstract class AbstractSender implements DataSender { this.address = address; } + + public boolean isUdpBased() { + return udpBased; + } + + + public void setUdpBased(boolean udpBased) { + this.udpBased = udpBased; + } + + + public int getUdpPort() { + return udpPort; + } + + + public void setUdpPort(int udpPort) { + this.udpPort = udpPort; + } + } \ No newline at end of file diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 322d9796e..6db55d7bd 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -18,7 +18,9 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -59,6 +61,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv private Selector selector = null; private ServerSocketChannel serverChannel = null; + private DatagramChannel datagramChannel = null; protected LinkedList events = new LinkedList(); // private Object interestOpsMutex = new Object(); @@ -110,7 +113,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv else throw new IOException(x.getMessage()); } } - + public AbstractRxTask createRxTask() { NioReplicationTask thread = new NioReplicationTask(this,this); thread.setUseBufferPool(this.getUseBufferPool()); @@ -118,9 +121,9 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv thread.setOptions(getWorkerThreadOptions()); return thread; } - - - + + + protected void bind() throws IOException { // allocate an unbound server socket channel serverChannel = ServerSocketChannel.open(); @@ -135,9 +138,22 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); - + + //set up the datagram channel + if (this.getUdpPort()>0) { + datagramChannel = DatagramChannel.open(); + datagramChannel.configureBlocking(false); + //bind to the address to avoid security checks + InetSocketAddress daddr = new InetSocketAddress(getBind(),getUdpPort()); + //TODO should we auto increment the UDP port to avoid collisions? + //we could auto increment with the offset from the tcp listen port + datagramChannel.connect(daddr); + } + + + } - + public void addEvent(Runnable event) { if ( selector != null ) { synchronized (events) { @@ -163,18 +179,18 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv events.clear(); } } - + public static void cancelledKey(SelectionKey key) { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader != null ) { reader.setCancelled(true); reader.finish(); } - key.cancel(); + key.cancel(); key.attach(null); try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } - + } protected long lastCheck = System.currentTimeMillis(); protected void socketTimeouts() { @@ -202,7 +218,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > (long) getTimeout() && (!ka.isAccessed())) { - if (log.isWarnEnabled()) + if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); //key.interestOps(SelectionKey.OP_READ); @@ -230,8 +246,12 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv log.warn("ServerSocketChannel already started"); return; } - + setListen(true); + if (selector!=null && datagramChannel!=null) { + ObjectReader oreader = new ObjectReader(1024*65); + registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); + } while (doListen() && selector != null) { // this may block for a long time, upon return the @@ -302,10 +322,18 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } serverChannel.close(); + if (datagramChannel!=null) { + try { + datagramChannel.close(); + }catch (Exception iox) { + if (log.isDebugEnabled()) log.debug("Unable to close datagram channel.",iox); + } + datagramChannel=null; + } closeSelector(); } - + /** * Close Selector. diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java index 6839d0606..f61fa9d8f 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,9 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; @@ -42,15 +45,15 @@ import org.apache.catalina.tribes.util.Logs; * serviceChannel() method stores the key reference in the thread object then * calls notify() to wake it up. When the channel has been drained, the worker * thread returns itself to its parent pool. - * + * * @author Filip Hanik - * + * * @version $Revision$, $Date$ */ public class NioReplicationTask extends AbstractRxTask { - + private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( NioReplicationTask.class ); - + private ByteBuffer buffer = null; private SelectionKey key; private int rxBufSize; @@ -62,7 +65,7 @@ public class NioReplicationTask extends AbstractRxTask { } // loop forever waiting for work to do - public synchronized void run() { + public synchronized void run() { if ( buffer == null ) { if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); @@ -75,17 +78,17 @@ public class NioReplicationTask extends AbstractRxTask { if (key == null) { return; // just in case } - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Servicing key:"+key); try { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader == null ) { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("No object reader, cancelling:"+key); cancelKey(key); } else { - if ( log.isTraceEnabled() ) + if ( log.isTraceEnabled() ) log.trace("Draining channel:"+key); drainChannel(key, reader); @@ -102,7 +105,7 @@ public class NioReplicationTask extends AbstractRxTask { } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); - } + } cancelKey(key); } finally { @@ -143,16 +146,16 @@ public class NioReplicationTask extends AbstractRxTask { protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { reader.setLastAccess(System.currentTimeMillis()); reader.access(); - SocketChannel channel = (SocketChannel) key.channel(); + ReadableByteChannel channel = (ReadableByteChannel) key.channel(); int count; buffer.clear(); // make buffer empty // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) + if ( buffer.hasArray() ) reader.append(buffer.array(),0,count,false); - else + else reader.append(buffer,count,false); buffer.clear(); // make buffer empty //do we have at least one package? @@ -160,24 +163,24 @@ public class NioReplicationTask extends AbstractRxTask { } int pkgcnt = reader.count(); - + if (count < 0 && pkgcnt == 0 ) { //end of stream, and no more packages to process remoteEof(key); return; } - + ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); - + registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks - + for ( int i=0; i connect() -> CONNECTED @@ -42,7 +43,7 @@ import java.net.*; * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE * - TRANSFER_COMPLETE -> CONNECTED - * + * * @author Filip Hanik * @version 1.0 */ @@ -50,10 +51,11 @@ public class NioSender extends AbstractSender implements DataSender{ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioSender.class); - - - protected Selector selector; - protected SocketChannel socketChannel; + + + protected Selector selector; + protected SocketChannel socketChannel = null; + protected DatagramChannel dataChannel = null; /* * STATE VARIABLES * @@ -64,14 +66,14 @@ public class NioSender extends AbstractSender implements DataSender{ protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; protected boolean complete; - + protected boolean connecting = false; - + public NioSender() { super(); - + } - + /** * State machine to send data * @param key SelectionKey @@ -89,7 +91,7 @@ public class NioSender extends AbstractSender implements DataSender{ completeConnect(); if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); return false; - } else { + } else { //wait for the connection to finish key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); return false; @@ -146,8 +148,8 @@ public class NioSender extends AbstractSender implements DataSender{ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); } - - + + protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done @@ -171,7 +173,7 @@ public class NioSender extends AbstractSender implements DataSender{ } } - + protected boolean write(SelectionKey key) throws IOException { if ( (!isConnected()) || (this.socketChannel==null)) { throw new IOException("NioSender is not connected, this should not occur."); @@ -215,7 +217,7 @@ public class NioSender extends AbstractSender implements DataSender{ } else { writebuf.clear(); } - + InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); socketChannel = SocketChannel.open(); @@ -227,7 +229,7 @@ public class NioSender extends AbstractSender implements DataSender{ socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); } } - + /** * disconnect @@ -257,7 +259,7 @@ public class NioSender extends AbstractSender implements DataSender{ } } - + public void reset() { if ( isConnected() && readbuf == null) { readbuf = getReadBuffer(); @@ -273,10 +275,10 @@ public class NioSender extends AbstractSender implements DataSender{ setConnectTime(-1); } - private ByteBuffer getReadBuffer() { + private ByteBuffer getReadBuffer() { return getBuffer(getRxBufSize()); } - + private ByteBuffer getWriteBuffer() { return getBuffer(getTxBufSize()); } @@ -284,7 +286,7 @@ public class NioSender extends AbstractSender implements DataSender{ private ByteBuffer getBuffer(int size) { return (getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size)); } - + /** * sendMessage * @@ -312,9 +314,9 @@ public class NioSender extends AbstractSender implements DataSender{ if (isConnected()) { socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } - } + } } - + public byte[] getMessage() { return current; } diff --git a/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java index ee59032a0..cb5c9faec 100644 --- a/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java +++ b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ */ package org.apache.catalina.tribes.test.channel; +import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.group.GroupChannel; import junit.framework.TestCase; import org.apache.catalina.tribes.transport.ReceiverBase; @@ -25,6 +26,7 @@ import org.apache.catalina.tribes.transport.ReceiverBase; */ public class ChannelStartStop extends TestCase { GroupChannel channel = null; + int udpPort = 45543; protected void setUp() throws Exception { super.setUp(); channel = new GroupChannel(); @@ -34,7 +36,7 @@ public class ChannelStartStop extends TestCase { super.tearDown(); try {channel.stop(channel.DEFAULT);}catch (Exception ignore){} } - + public void testDoubleFullStart() throws Exception { int count = 0; try { @@ -52,11 +54,11 @@ public class ChannelStartStop extends TestCase { public void testScrap() throws Exception { System.out.println(channel.getChannelReceiver().getClass()); ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1); - } + } public void testDoublePartialStart() throws Exception { - //try to double start the RX + //try to double start the RX int count = 0; try { channel.start(channel.SND_RX_SEQ); @@ -82,7 +84,7 @@ public class ChannelStartStop extends TestCase { } catch ( Exception x){/*expected*/} assertEquals(count,1); channel.stop(channel.DEFAULT); - + count = 0; try { channel.start(channel.SND_RX_SEQ); @@ -107,7 +109,7 @@ public class ChannelStartStop extends TestCase { assertEquals(count,1); channel.stop(channel.DEFAULT); } - + public void testFalseOption() throws Exception { int flag = 0xFFF0;//should get ignored by the underlying components int count = 0; @@ -123,4 +125,12 @@ public class ChannelStartStop extends TestCase { channel.stop(channel.DEFAULT); } + public void testUdpReceiverStart() throws Exception { + ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver(); + rb.setUdpPort(udpPort); + channel.start(Channel.DEFAULT); + Thread.sleep(1000); + channel.stop(Channel.DEFAULT); + } + } -- 2.11.0