From: fhanik Date: Wed, 20 Feb 2008 17:12:27 +0000 (+0000) Subject: Add buffer sizes to the UDP sockets X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=af36faf48654cc1208673c5ad520ae06ef7e1c05;p=tomcat7.0 Add buffer sizes to the UDP sockets git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@629539 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java index fd410bde7..1c4341a52 100644 --- a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java +++ b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java @@ -421,12 +421,15 @@ public class McastServiceImpl if ( log.isDebugEnabled() ) log.debug("Invalid member mcast package.",ax); } catch ( Exception x ) { - if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); - else log.debug("Error receiving mcast package. Sleeping 500ms",x); - try { Thread.sleep(500); } catch ( Exception ignore ){} - if ( (++errorCounter)>=recoveryCounter ) { - errorCounter=0; - new RecoveryThread(McastServiceImpl.this); + if (x instanceof InterruptedException) interrupted(); + else { + if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); + else log.debug("Error receiving mcast package. Sleeping 500ms",x); + try { Thread.sleep(500); } catch ( Exception ignore ){} + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } } } diff --git a/java/org/apache/catalina/tribes/transport/AbstractSender.java b/java/org/apache/catalina/tribes/transport/AbstractSender.java index aa3a2d059..258dbdaee 100644 --- a/java/org/apache/catalina/tribes/transport/AbstractSender.java +++ b/java/org/apache/catalina/tribes/transport/AbstractSender.java @@ -38,6 +38,8 @@ public abstract class AbstractSender implements DataSender { private boolean connected = false; private int rxBufSize = 25188; private int txBufSize = 43800; + private int udpRxBufSize = 25188; + private int udpTxBufSize = 43800; private boolean directBuffer = false; private int keepAliveCount = -1; private int requestCount = 0; @@ -332,4 +334,24 @@ public abstract class AbstractSender implements DataSender { this.udpPort = udpPort; } + + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; + } + } \ No newline at end of file diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index 06d4217c3..50c689920 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -56,6 +56,9 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R private int securePort = -1; private int rxBufSize = 43800; private int txBufSize = 25188; + private int udpRxBufSize = 43800; + private int udpTxBufSize = 25188; + private boolean listen = false; private RxTaskPool pool; private boolean direct = true; @@ -520,4 +523,20 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, R this.udpPort = udpPort; } + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; + } + } \ No newline at end of file diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index de527633e..0c91f1f69 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -250,6 +250,11 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv setListen(true); if (selector!=null && datagramChannel!=null) { ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet + datagramChannel.socket().setSendBufferSize(getUdpTxBufSize()); + datagramChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); + datagramChannel.socket().setReuseAddress(getSoReuseAddress()); + datagramChannel.socket().setSoTimeout(getTimeout()); + datagramChannel.socket().setTrafficClass(getSoTrafficClass()); registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java index 6e3707027..8cb0dbee9 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java @@ -308,8 +308,12 @@ public class NioReplicationTask extends AbstractRxTask { int total = 0; if (channel instanceof DatagramChannel) { DatagramChannel dchannel = (DatagramChannel)channel; - while ( total < command.length ) { - total += dchannel.send(buf, udpaddr); + //were using a shared channel, it's not thread safe + //TODO check optimization, one channel per thread + synchronized (dchannel) { + while ( total < command.length ) { + total += dchannel.send(buf, udpaddr); + } } } else { while ( total < command.length ) { diff --git a/java/org/apache/catalina/tribes/transport/nio/NioSender.java b/java/org/apache/catalina/tribes/transport/nio/NioSender.java index b8e0479ef..3db44e41e 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioSender.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioSender.java @@ -149,8 +149,8 @@ public class NioSender extends AbstractSender implements DataSender{ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); } else if (dataChannel!=null) { - dataChannel.socket().setSendBufferSize(getTxBufSize()); - dataChannel.socket().setReceiveBufferSize(getRxBufSize()); + dataChannel.socket().setSendBufferSize(getUdpTxBufSize()); + dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); dataChannel.socket().setSoTimeout((int)getTimeout()); dataChannel.socket().setReuseAddress(getSoReuseAddress()); dataChannel.socket().setTrafficClass(getSoTrafficClass()); diff --git a/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java b/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java index c159d28a0..dc4f39a73 100644 --- a/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java +++ b/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java @@ -20,6 +20,8 @@ import junit.framework.TestCase; import java.io.Serializable; import java.util.Random; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelListener; @@ -33,6 +35,7 @@ import org.apache.catalina.tribes.transport.ReplicationTransmitter; import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.io.XByteBuffer; /** */ @@ -81,11 +84,18 @@ public class TestUdpPackages extends TestCase { channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); Thread.sleep(500); System.err.println("Finished Single package NO_ACK ["+listener1.count+"]"); - assertEquals("Checking success messages.",1,listener1.count); + assertEquals("Checking success messages.",1,listener1.count.get()); } public void testDataSendNO_ACK() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpRxBufSize(1024*1024*10); + rb2.setUdpRxBufSize(1024*1024*10); + rb1.setUdpTxBufSize(1024*1024*10); + rb2.setUdpTxBufSize(1024*1024*10); System.err.println("Starting NO_ACK"); Thread[] threads = new Thread[threadCount]; for (int x=0; x=0 && nr0 && d.data.length>=4) { + //populate number + d.hasNr = true; + XByteBuffer.toBytes(number,d.data, 0); + } return d; } + + public int getNumber() { + if (!hasNr) return -1; + return XByteBuffer.toInt(this.data, 0); + } public static boolean verify(Data d) { boolean result = (d.length == d.data.length); - for ( int i=0; result && (i The sending buffer size on the receiving sockets. Value is in bytes, the default value is 25188 bytes. + + The receive buffer size on the datagram socket. + Default value is 25188 bytes. + + + The send buffer size on the datagram socket. + Default value is 43800 bytes. + Boolean value for the socket SO_KEEPALIVE option. Possible values are true or false. diff --git a/webapps/docs/config/cluster-sender.xml b/webapps/docs/config/cluster-sender.xml index 5485b946e..fe82c98c9 100644 --- a/webapps/docs/config/cluster-sender.xml +++ b/webapps/docs/config/cluster-sender.xml @@ -90,6 +90,14 @@ The send buffer size on the socket. Default value is 43800 bytes. + + The receive buffer size on the datagram socket. + Default value is 25188 bytes. + + + The send buffer size on the datagram socket. + Default value is 43800 bytes. + Possible values are true or false. Set to true if you want the receiver to use direct bytebuffers when reading data