connecting = false;
setRequestCount(0);
setConnectTime(System.currentTimeMillis());
- socketChannel.socket().setSendBufferSize(getTxBufSize());
- socketChannel.socket().setReceiveBufferSize(getRxBufSize());
- socketChannel.socket().setSoTimeout((int)getTimeout());
- socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
- socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
- socketChannel.socket().setKeepAlive(getSoKeepAlive());
- socketChannel.socket().setReuseAddress(getSoReuseAddress());
- socketChannel.socket().setOOBInline(getOoBInline());
- socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
- socketChannel.socket().setTrafficClass(getSoTrafficClass());
+ if (socketChannel!=null) {
+ socketChannel.socket().setSendBufferSize(getTxBufSize());
+ socketChannel.socket().setReceiveBufferSize(getRxBufSize());
+ socketChannel.socket().setSoTimeout((int)getTimeout());
+ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
+ socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
+ socketChannel.socket().setKeepAlive(getSoKeepAlive());
+ socketChannel.socket().setReuseAddress(getSoReuseAddress());
+ socketChannel.socket().setOOBInline(getOoBInline());
+ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+ socketChannel.socket().setTrafficClass(getSoTrafficClass());
+ } else if (dataChannel!=null) {
+ dataChannel.socket().setSendBufferSize(getTxBufSize());
+ dataChannel.socket().setReceiveBufferSize(getRxBufSize());
+ dataChannel.socket().setSoTimeout((int)getTimeout());
+ dataChannel.socket().setReuseAddress(getSoReuseAddress());
+ dataChannel.socket().setTrafficClass(getSoTrafficClass());
+ }
}
dataChannel = DatagramChannel.open();
dataChannel.configureBlocking(false);
dataChannel.connect(daddr);
+ completeConnect();
+ dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this);
+
} else {
InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress.");
--- /dev/null
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements. See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License. You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.apache.catalina.tribes.test.channel;\r
+\r
+import junit.framework.TestCase;\r
+import java.io.Serializable;\r
+import java.util.Random;\r
+import java.util.Arrays;\r
+\r
+import org.apache.catalina.tribes.Channel;\r
+import org.apache.catalina.tribes.ChannelListener;\r
+import org.apache.catalina.tribes.ChannelReceiver;\r
+import org.apache.catalina.tribes.Member;\r
+import org.apache.catalina.tribes.group.GroupChannel;\r
+import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;\r
+import org.apache.catalina.tribes.transport.AbstractSender;\r
+import org.apache.catalina.tribes.transport.ReceiverBase;\r
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;\r
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;\r
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;\r
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;\r
+\r
+/**\r
+ */\r
+public class TestUdpPackages extends TestCase {\r
+ int msgCount = 500;\r
+ int threadCount = 20;\r
+ GroupChannel channel1;\r
+ GroupChannel channel2;\r
+ Listener listener1;\r
+ int threadCounter = 0;\r
+ protected void setUp() throws Exception {\r
+ super.setUp();\r
+ channel1 = new GroupChannel();\r
+ channel1.addInterceptor(new MessageDispatch15Interceptor());\r
+ channel2 = new GroupChannel();\r
+ channel2.addInterceptor(new MessageDispatch15Interceptor());\r
+ ThroughputInterceptor tint = new ThroughputInterceptor();\r
+ tint.setInterval(500);\r
+ ThroughputInterceptor tint2 = new ThroughputInterceptor();\r
+ tint2.setInterval(500);\r
+ channel1.addInterceptor(tint);\r
+ channel2.addInterceptor(tint2);\r
+ listener1 = new Listener();\r
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
+ rb1.setUdpPort(50000);\r
+ rb2.setUdpPort(50000);\r
+ channel2.addChannelListener(listener1);\r
+ channel1.start(GroupChannel.DEFAULT);\r
+ channel2.start(GroupChannel.DEFAULT);\r
+ }\r
+\r
+ protected void tearDown() throws Exception {\r
+ super.tearDown();\r
+ channel1.stop(GroupChannel.DEFAULT);\r
+ channel2.stop(GroupChannel.DEFAULT);\r
+ }\r
+\r
+ public void testSingleDataSendNO_ACK() throws Exception {\r
+ AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport();\r
+ AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport();\r
+ s1.setTimeout(Long.MAX_VALUE); //for debugging\r
+ s2.setTimeout(Long.MAX_VALUE); //for debugging\r
+ \r
+ System.err.println("Starting Single package NO_ACK");\r
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);\r
+ Thread.sleep(500);\r
+ System.err.println("Finished Single package NO_ACK ["+listener1.count+"]");\r
+ assertEquals("Checking success messages.",1,listener1.count);\r
+ }\r
+\r
+ \r
+ public void testDataSendNO_ACK() throws Exception {\r
+ System.err.println("Starting NO_ACK");\r
+ Thread[] threads = new Thread[threadCount];\r
+ for (int x=0; x<threads.length; x++ ) {\r
+ threads[x] = new Thread() {\r
+ public void run() {\r
+ try {\r
+ long start = System.currentTimeMillis();\r
+ for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);\r
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
+ }catch ( Exception x ) {\r
+ x.printStackTrace();\r
+ return;\r
+ } finally {\r
+ threadCounter++;\r
+ }\r
+ }\r
+ };\r
+ } \r
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
+ //sleep for 50 sec, let the other messages in\r
+ long start = System.currentTimeMillis();\r
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);\r
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");\r
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);\r
+ }\r
+\r
+ public void testDataSendASYNCM() throws Exception {\r
+ System.err.println("Starting ASYNC MULTI THREAD");\r
+ Thread[] threads = new Thread[threadCount];\r
+ for (int x=0; x<threads.length; x++ ) {\r
+ threads[x] = new Thread() {\r
+ public void run() {\r
+ try {\r
+ long start = System.currentTimeMillis();\r
+ for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);\r
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
+ }catch ( Exception x ) {\r
+ x.printStackTrace();\r
+ return;\r
+ } finally {\r
+ threadCounter++;\r
+ }\r
+ }\r
+ };\r
+ }\r
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
+ //sleep for 50 sec, let the other messages in\r
+ long start = System.currentTimeMillis();\r
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);\r
+ System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]");\r
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);\r
+ }\r
+ public void testDataSendASYNC() throws Exception {\r
+ System.err.println("Starting ASYNC");\r
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);\r
+ //sleep for 50 sec, let the other messages in\r
+ long start = System.currentTimeMillis();\r
+ while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);\r
+ System.err.println("Finished ASYNC");\r
+ assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ }\r
+\r
+ public void testDataSendACK() throws Exception {\r
+ System.err.println("Starting ACK");\r
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
+ Thread.sleep(250);\r
+ System.err.println("Finished ACK");\r
+ assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ }\r
+\r
+ public void testDataSendSYNCACK() throws Exception {\r
+ System.err.println("Starting SYNC_ACK");\r
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
+ Thread.sleep(250);\r
+ System.err.println("Finished SYNC_ACK");\r
+ assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ }\r
+\r
+ public static class Listener implements ChannelListener {\r
+ long count = 0;\r
+ public boolean accept(Serializable s, Member m) {\r
+ return (s instanceof Data);\r
+ }\r
+\r
+ public void messageReceived(Serializable s, Member m) {\r
+ Data d = (Data)s;\r
+ if ( !Data.verify(d) ) {\r
+ System.err.println("ERROR");\r
+ } else {\r
+ count++;\r
+ if ((count %1000) ==0 ) {\r
+ System.err.println("SUCCESS:"+count);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ public static class Data implements Serializable {\r
+ public int length;\r
+ public byte[] data;\r
+ public byte key;\r
+ public static Random r = new Random(System.currentTimeMillis());\r
+ public static Data createRandomData() {\r
+ return createRandomData(ChannelReceiver.MAX_UDP_SIZE);\r
+ }\r
+ public static Data createRandomData(int size) {\r
+ int i = r.nextInt();\r
+ i = ( i % 127 );\r
+ int length = Math.abs(r.nextInt() % size);\r
+ Data d = new Data();\r
+ d.length = length;\r
+ d.key = (byte)i;\r
+ d.data = new byte[length];\r
+ Arrays.fill(d.data,d.key);\r
+ return d;\r
+ }\r
+\r
+ public static boolean verify(Data d) {\r
+ boolean result = (d.length == d.data.length);\r
+ for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;\r
+ return result;\r
+ }\r
+ }\r
+\r
+\r
+\r
+}\r