tint.setInterval(500);\r
ThroughputInterceptor tint2 = new ThroughputInterceptor();\r
tint2.setInterval(500);\r
- channel1.addInterceptor(tint);\r
+ //channel1.addInterceptor(tint);\r
channel2.addInterceptor(tint2);\r
listener1 = new Listener();\r
ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
}\r
\r
public void testDataSendASYNCM() throws Exception {\r
- System.err.println("Starting ASYNC MULTI THREAD");\r
- Thread[] threads = new Thread[threadCount];\r
- for (int x=0; x<threads.length; x++ ) {\r
- threads[x] = new Thread() {\r
- public void run() {\r
- try {\r
- long start = System.currentTimeMillis();\r
- for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);\r
- System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
- }catch ( Exception x ) {\r
- x.printStackTrace();\r
- return;\r
- } finally {\r
- threadCounter++;\r
+ final AtomicInteger counter = new AtomicInteger(0);\r
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
+ rb1.setUdpRxBufSize(1024*1024*10);\r
+ rb2.setUdpRxBufSize(1024*1024*10);\r
+ rb1.setUdpTxBufSize(1024*1024*10);\r
+ rb2.setUdpTxBufSize(1024*1024*10);\r
+ System.err.println("Starting NO_ACK");\r
+ Thread[] threads = new Thread[threadCount];\r
+ for (int x=0; x<threads.length; x++ ) {\r
+ threads[x] = new Thread() {\r
+ public void run() {\r
+ try {\r
+ long start = System.currentTimeMillis();\r
+ for (int i = 0; i < msgCount; i++) {\r
+ int cnt = counter.getAndAdd(1);\r
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP|Channel.SEND_OPTIONS_ASYNCHRONOUS);\r
+ //Thread.currentThread().sleep(10);\r
}\r
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
+ }catch ( Exception x ) {\r
+ x.printStackTrace();\r
+ return;\r
+ } finally {\r
+ threadCounter++;\r
}\r
- };\r
- }\r
- for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
- for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
- //sleep for 50 sec, let the other messages in\r
- long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
- System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]");\r
- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
+ }\r
+ };\r
+ } \r
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
+ //sleep for 50 sec, let the other messages in\r
+ long start = System.currentTimeMillis();\r
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");\r
+ System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);\r
+ System.out.print("Missing messages:");\r
+ printMissingMsgs(listener1.nrs,counter.get());\r
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
}\r
public void testDataSendASYNC() throws Exception {\r
System.err.println("Starting ASYNC");\r