From 67d206dbd0fe3c084cf916c7ee77d4074c9a268d Mon Sep 17 00:00:00 2001 From: fhanik Date: Tue, 19 Feb 2008 20:45:54 +0000 Subject: [PATCH] more UDP impl git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@629223 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/catalina/tribes/ChannelReceiver.java | 2 + .../tribes/membership/McastServiceImpl.java | 5 +- .../catalina/tribes/transport/ReceiverBase.java | 1 - .../catalina/tribes/transport/nio/NioReceiver.java | 2 +- .../tribes/transport/nio/NioReplicationTask.java | 10 +- .../catalina/tribes/transport/nio/NioSender.java | 31 ++- .../tribes/test/channel/TestDataIntegrity.java | 26 +-- .../tribes/test/channel/TestUdpPackages.java | 217 +++++++++++++++++++++ 8 files changed, 266 insertions(+), 28 deletions(-) create mode 100644 test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java diff --git a/java/org/apache/catalina/tribes/ChannelReceiver.java b/java/org/apache/catalina/tribes/ChannelReceiver.java index f52c039e5..32c53e497 100644 --- a/java/org/apache/catalina/tribes/ChannelReceiver.java +++ b/java/org/apache/catalina/tribes/ChannelReceiver.java @@ -27,6 +27,8 @@ package org.apache.catalina.tribes; * @version $Revision$, $Date$ */ public interface ChannelReceiver extends Heartbeat { + public static final int MAX_UDP_SIZE = 65535; + /** * Start listening for incoming messages on the host/port * @throws java.io.IOException diff --git a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java index c17affa1c..fd410bde7 100644 --- a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java +++ b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java @@ -335,7 +335,10 @@ public class McastServiceImpl } }; } //end if - if ( t != null ) t.start(); + if ( t != null ) { + t.setDaemon(true); + t.start(); + } } } catch (SocketTimeoutException x ) { //do nothing, this is normal, we don't want to block forever diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index 3bc54a26d..06d4217c3 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -46,7 +46,6 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R public static final int OPTION_DIRECT_BUFFER = 0x0004; - protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReceiverBase.class); private MessageListener listener; diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index b9ef3ba14..de527633e 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -249,7 +249,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv setListen(true); if (selector!=null && datagramChannel!=null) { - ObjectReader oreader = new ObjectReader(65535); //max size for a datagram packet + ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java index 4fea62b0d..6e3707027 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -33,6 +33,8 @@ import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.BufferPool; import java.nio.channels.CancelledKeyException; + +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.RemoteProcessException; import org.apache.catalina.tribes.util.Logs; @@ -68,10 +70,14 @@ public class NioReplicationTask extends AbstractRxTask { // loop forever waiting for work to do public synchronized void run() { if ( buffer == null ) { + int size = getRxBufSize(); + if (key.channel() instanceof DatagramChannel) { + size = ChannelReceiver.MAX_UDP_SIZE; + } if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { - buffer = ByteBuffer.allocateDirect(getRxBufSize()); + buffer = ByteBuffer.allocateDirect(size); } else { - buffer = ByteBuffer.allocate(getRxBufSize()); + buffer = ByteBuffer.allocate(size); } } else { buffer.clear(); diff --git a/java/org/apache/catalina/tribes/transport/nio/NioSender.java b/java/org/apache/catalina/tribes/transport/nio/NioSender.java index fe21cbf45..b8e0479ef 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioSender.java @@ -137,16 +137,24 @@ public class NioSender extends AbstractSender implements DataSender{ connecting = false; setRequestCount(0); setConnectTime(System.currentTimeMillis()); - socketChannel.socket().setSendBufferSize(getTxBufSize()); - socketChannel.socket().setReceiveBufferSize(getRxBufSize()); - socketChannel.socket().setSoTimeout((int)getTimeout()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0); - socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); - socketChannel.socket().setKeepAlive(getSoKeepAlive()); - socketChannel.socket().setReuseAddress(getSoReuseAddress()); - socketChannel.socket().setOOBInline(getOoBInline()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); - socketChannel.socket().setTrafficClass(getSoTrafficClass()); + if (socketChannel!=null) { + socketChannel.socket().setSendBufferSize(getTxBufSize()); + socketChannel.socket().setReceiveBufferSize(getRxBufSize()); + socketChannel.socket().setSoTimeout((int)getTimeout()); + socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0); + socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); + socketChannel.socket().setKeepAlive(getSoKeepAlive()); + socketChannel.socket().setReuseAddress(getSoReuseAddress()); + socketChannel.socket().setOOBInline(getOoBInline()); + socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + socketChannel.socket().setTrafficClass(getSoTrafficClass()); + } else if (dataChannel!=null) { + dataChannel.socket().setSendBufferSize(getTxBufSize()); + dataChannel.socket().setReceiveBufferSize(getRxBufSize()); + dataChannel.socket().setSoTimeout((int)getTimeout()); + dataChannel.socket().setReuseAddress(getSoReuseAddress()); + dataChannel.socket().setTrafficClass(getSoTrafficClass()); + } } @@ -224,6 +232,9 @@ public class NioSender extends AbstractSender implements DataSender{ dataChannel = DatagramChannel.open(); dataChannel.configureBlocking(false); dataChannel.connect(daddr); + completeConnect(); + dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this); + } else { InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); diff --git a/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java index 07b77bccc..2dfcd6e78 100644 --- a/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java +++ b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java @@ -28,12 +28,12 @@ import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; /** - *

Title:

- * - *

Description:

- * + *

Title:

+ * + *

Description:

+ * *

Company:

- * + * * @author not attributable * @version 1.0 */ @@ -61,7 +61,7 @@ public class TestDataIntegrity extends TestCase { channel1.stop(GroupChannel.DEFAULT); channel2.stop(GroupChannel.DEFAULT); } - + public void testDataSendNO_ACK() throws Exception { System.err.println("Starting NO_ACK"); Thread[] threads = new Thread[threadCount]; @@ -89,7 +89,7 @@ public class TestDataIntegrity extends TestCase { System.err.println("Finished NO_ACK ["+listener1.count+"]"); assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); } - + public void testDataSendASYNCM() throws Exception { System.err.println("Starting ASYNC MULTI THREAD"); Thread[] threads = new Thread[threadCount]; @@ -113,7 +113,7 @@ public class TestDataIntegrity extends TestCase { for (int x=0; x