import java.io.Serializable;\r
import java.util.Random;\r
import java.util.Arrays;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import java.util.concurrent.atomic.AtomicLong;\r
\r
import org.apache.catalina.tribes.Channel;\r
import org.apache.catalina.tribes.ChannelListener;\r
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;\r
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;\r
import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;\r
+import org.apache.catalina.tribes.io.XByteBuffer;\r
\r
/**\r
*/\r
channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);\r
Thread.sleep(500);\r
System.err.println("Finished Single package NO_ACK ["+listener1.count+"]");\r
- assertEquals("Checking success messages.",1,listener1.count);\r
+ assertEquals("Checking success messages.",1,listener1.count.get());\r
}\r
\r
\r
public void testDataSendNO_ACK() throws Exception {\r
+ final AtomicInteger counter = new AtomicInteger(0);\r
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
+ rb1.setUdpRxBufSize(1024*1024*10);\r
+ rb2.setUdpRxBufSize(1024*1024*10);\r
+ rb1.setUdpTxBufSize(1024*1024*10);\r
+ rb2.setUdpTxBufSize(1024*1024*10);\r
System.err.println("Starting NO_ACK");\r
Thread[] threads = new Thread[threadCount];\r
for (int x=0; x<threads.length; x++ ) {\r
public void run() {\r
try {\r
long start = System.currentTimeMillis();\r
- for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);\r
+ for (int i = 0; i < msgCount; i++) {\r
+ int cnt = counter.addAndGet(1);\r
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP);\r
+ Thread.currentThread().sleep(10);\r
+ }\r
System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
}catch ( Exception x ) {\r
x.printStackTrace();\r
for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
//sleep for 50 sec, let the other messages in\r
long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);\r
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
System.err.println("Finished NO_ACK ["+listener1.count+"]");\r
- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);\r
+ System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);\r
+ System.out.print("Missing messages:");\r
+ printMissingMsgs(listener1.nrs,counter.get());\r
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
+ }\r
+ \r
+ public static void printMissingMsgs(int[] msgs, int maxIdx) {\r
+ for (int i=0; i<maxIdx && i<msgs.length; i++) {\r
+ if (msgs[i]==0) System.out.print(i+", ");\r
+ }\r
+ System.out.println();\r
}\r
\r
public void testDataSendASYNCM() throws Exception {\r
for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
//sleep for 50 sec, let the other messages in\r
long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);\r
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]");\r
- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);\r
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
}\r
public void testDataSendASYNC() throws Exception {\r
System.err.println("Starting ASYNC");\r
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);\r
//sleep for 50 sec, let the other messages in\r
long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);\r
+ while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500);\r
System.err.println("Finished ASYNC");\r
- assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
}\r
\r
public void testDataSendACK() throws Exception {\r
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
Thread.sleep(250);\r
System.err.println("Finished ACK");\r
- assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
}\r
\r
public void testDataSendSYNCACK() throws Exception {\r
for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
Thread.sleep(250);\r
System.err.println("Finished SYNC_ACK");\r
- assertEquals("Checking success messages.",msgCount,listener1.count);\r
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
}\r
\r
public static class Listener implements ChannelListener {\r
- long count = 0;\r
+ AtomicLong count = new AtomicLong(0);\r
+ int maxIdx = -1;\r
+ int[] nrs = new int[1000000];\r
+ public Listener() {\r
+ Arrays.fill(nrs, 0);\r
+ }\r
public boolean accept(Serializable s, Member m) {\r
return (s instanceof Data);\r
}\r
\r
public void messageReceived(Serializable s, Member m) {\r
- Data d = (Data)s;\r
- if ( !Data.verify(d) ) {\r
- System.err.println("ERROR");\r
- } else {\r
- count++;\r
- if ((count %1000) ==0 ) {\r
- System.err.println("SUCCESS:"+count);\r
+ try {\r
+ Data d = (Data)s;\r
+ if ( !Data.verify(d) ) {\r
+ System.err.println("ERROR - Unable to verify data package");\r
+ } else {\r
+ long c = count.addAndGet(1);\r
+ if ((c%1000) ==0 ) {\r
+ System.err.println("SUCCESS:"+c);\r
+ }\r
+ int nr = d.getNumber();\r
+ if (nr>=0 && nr<nrs.length) {\r
+ maxIdx = Math.max(maxIdx, nr);\r
+ nrs[nr] = 1;\r
+ }\r
}\r
+ }catch (Exception x ) {\r
+ x.printStackTrace();\r
}\r
}\r
}\r
public int length;\r
public byte[] data;\r
public byte key;\r
+ public boolean hasNr = false;\r
public static Random r = new Random(System.currentTimeMillis());\r
public static Data createRandomData() {\r
return createRandomData(ChannelReceiver.MAX_UDP_SIZE);\r
}\r
public static Data createRandomData(int size) {\r
+ return createRandomData(size,-1);\r
+ }\r
+ \r
+ public static Data createRandomData(int size, int number) {\r
int i = r.nextInt();\r
i = ( i % 127 );\r
int length = Math.abs(r.nextInt() % size);\r
+ if (length<100) length += 100;\r
Data d = new Data();\r
d.length = length;\r
d.key = (byte)i;\r
d.data = new byte[length];\r
Arrays.fill(d.data,d.key);\r
+ if (number>0 && d.data.length>=4) {\r
+ //populate number\r
+ d.hasNr = true;\r
+ XByteBuffer.toBytes(number,d.data, 0);\r
+ }\r
return d;\r
}\r
+ \r
+ public int getNumber() {\r
+ if (!hasNr) return -1;\r
+ return XByteBuffer.toInt(this.data, 0);\r
+ }\r
\r
public static boolean verify(Data d) {\r
boolean result = (d.length == d.data.length);\r
- for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;\r
+ for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;\r
return result;\r
}\r
}\r