Added in a unit test for ordering messages
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 17 May 2007 13:19:59 +0000 (13:19 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 17 May 2007 13:19:59 +0000 (13:19 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@538908 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java [new file with mode: 0644]

index e651b96..34d8852 100644 (file)
@@ -24,6 +24,7 @@ import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 
@@ -59,7 +60,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
     private boolean forwardExpired = true;
     private int maxQueue = Integer.MAX_VALUE;
 
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+    public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         if ( !okToProcess(msg.getOptions()) ) {
             super.sendMessage(destination, msg, payload);
             return;
@@ -76,7 +77,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         }
     }
 
-    public void messageReceived(ChannelMessage msg) {
+    public synchronized void messageReceived(ChannelMessage msg) {
         if ( !okToProcess(msg.getOptions()) ) {
             super.messageReceived(msg);
             return;
@@ -87,7 +88,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
     }
     
-    public synchronized void processLeftOvers(Member member, boolean force) {
+    public void processLeftOvers(Member member, boolean force) {
         MessageOrder tmp = (MessageOrder)incoming.get(member);
         if ( force ) {
             Counter cnt = getInCounter(member);
@@ -100,7 +101,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
      * @param order MessageOrder
      * @return boolean - true if a message expired and was processed
      */
-    public synchronized boolean processIncoming(MessageOrder order) {
+    public boolean processIncoming(MessageOrder order) {
         boolean result = false;
         Member member = order.getMessage().getAddress();
         Counter cnt = getInCounter(member);
@@ -130,7 +131,8 @@ public class OrderInterceptor extends ChannelInterceptorBase {
                 //reset the head
                 if ( tmp == head ) head = tmp.next;
                 cnt.setCounter(tmp.getMsgNr()+1);
-                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
+                if ( getForwardExpired() ) 
+                    super.messageReceived(tmp.getMessage());
                 tmp.setMessage(null);
                 tmp = tmp.next;
                 if ( prev != null ) prev.next = tmp;  
@@ -145,14 +147,14 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return result;
     }
     
-    public void memberAdded(Member member) {
+    public synchronized void memberAdded(Member member) {
         //notify upwards
         getInCounter(member);
         getOutCounter(member);
         super.memberAdded(member);
     }
 
-    public void memberDisappeared(Member member) {
+    public synchronized void memberDisappeared(Member member) {
         //notify upwards
         outcounter.remove(member);
         incounter.remove(member);
@@ -166,7 +168,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return cnt.inc();
     }
     
-    public synchronized Counter getInCounter(Member mbr) {
+    public Counter getInCounter(Member mbr) {
         Counter cnt = (Counter)incounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -176,7 +178,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return cnt;
     }
 
-    public synchronized Counter getOutCounter(Member mbr) {
+    public Counter getOutCounter(Member mbr) {
         Counter cnt = (Counter)outcounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -186,18 +188,18 @@ public class OrderInterceptor extends ChannelInterceptorBase {
     }
 
     public static class Counter {
-        private int value = 0;
+        private AtomicInteger value = new AtomicInteger(0);
         
         public int getCounter() {
-            return value;
+            return value.get();
         }
         
-        public synchronized void setCounter(int counter) {
-            this.value = counter;
+        public void setCounter(int counter) {
+            this.value.set(counter);
         }
         
-        public synchronized int inc() {
-            return ++value;
+        public int inc() {
+            return value.addAndGet(1);
         }
     }
     
diff --git a/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java b/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
new file mode 100644 (file)
index 0000000..cd4f99d
--- /dev/null
@@ -0,0 +1,149 @@
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements.  See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License.  You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.apache.catalina.tribes.test.interceptors;\r
+\r
+import org.apache.catalina.tribes.Channel;\r
+import org.apache.catalina.tribes.Member;\r
+import org.apache.catalina.tribes.group.GroupChannel;\r
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;\r
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;\r
+import junit.framework.TestCase;\r
+import junit.framework.TestResult;\r
+import junit.framework.TestSuite;\r
+import org.apache.catalina.tribes.ChannelListener;\r
+import java.io.Serializable;\r
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;\r
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;\r
+import org.apache.catalina.tribes.ChannelMessage;\r
+import org.apache.catalina.tribes.group.InterceptorPayload;\r
+import org.apache.catalina.tribes.ChannelException;\r
+\r
+public class TestOrderInterceptor extends TestCase {\r
+\r
+    GroupChannel[] channels = null;\r
+    OrderInterceptor[] orderitcs = null;\r
+    MangleOrderInterceptor[] mangleitcs = null;\r
+    TestListener[] test = null;\r
+    int channelCount = 2;\r
+    Thread[] threads = null;\r
+    protected void setUp() throws Exception {\r
+        System.out.println("Setup");\r
+        super.setUp();\r
+        channels = new GroupChannel[channelCount];\r
+        orderitcs = new OrderInterceptor[channelCount];\r
+        mangleitcs = new MangleOrderInterceptor[channelCount];\r
+        test = new TestListener[channelCount];\r
+        threads = new Thread[channelCount];\r
+        for ( int i=0; i<channelCount; i++ ) {\r
+            channels[i] = new GroupChannel();\r
+            orderitcs[i] = new OrderInterceptor();\r
+            mangleitcs[i] = new MangleOrderInterceptor();\r
+            orderitcs[i].setExpire(Long.MAX_VALUE);\r
+            channels[i].addInterceptor(orderitcs[i]);\r
+            channels[i].addInterceptor(mangleitcs[i]);\r
+            test[i] = new TestListener(i);\r
+            channels[i].addChannelListener(test[i]);\r
+            final int j = i;\r
+            threads[i] = new Thread() {\r
+                public void run() {\r
+                    try {\r
+                        channels[j].start(Channel.DEFAULT);\r
+                        Thread.sleep(50);\r
+                    } catch (Exception x) {\r
+                        x.printStackTrace();\r
+                    }\r
+                }\r
+            };\r
+        }\r
+        for ( int i=0; i<channelCount; i++ ) threads[i].start();\r
+        for ( int i=0; i<channelCount; i++ ) threads[i].join();\r
+        Thread.sleep(1000);\r
+    }\r
+    \r
+    public void testOrder1() throws Exception {\r
+        Member[] dest = channels[0].getMembers();\r
+        for ( int i=0; i<100; i++ ) {\r
+            channels[0].send(dest,new Integer(i),0);\r
+        }\r
+        Thread.sleep(5000);\r
+        for ( int i=0; i<test.length; i++ ) {\r
+            super.assertEquals(false,test[i].fail);\r
+        }\r
+    }\r
+    \r
+    protected void tearDown() throws Exception {\r
+        System.out.println("tearDown");\r
+        super.tearDown();\r
+        for ( int i=0; i<channelCount; i++ ) {\r
+            channels[i].stop(Channel.DEFAULT);\r
+        }\r
+    }\r
+    \r
+    public static void main(String[] args) throws Exception {\r
+        TestSuite suite = new TestSuite();\r
+        suite.addTestSuite(TestOrderInterceptor.class);\r
+        suite.run(new TestResult());\r
+    }\r
+    \r
+    public static class TestListener implements ChannelListener {\r
+        int id = -1;\r
+        public TestListener(int id) {\r
+            this.id = id;\r
+        }\r
+        int cnt = 0;\r
+        int total = 0;\r
+        boolean fail = false;\r
+        public synchronized void messageReceived(Serializable msg, Member sender) {\r
+            total++;\r
+            Integer i = (Integer)msg;\r
+            if ( i.intValue() != cnt ) fail = true;\r
+            else cnt++;\r
+            System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total);\r
+\r
+        }\r
+\r
+        public boolean accept(Serializable msg, Member sender) {\r
+            return (msg instanceof Integer);\r
+        }\r
+    }\r
+    \r
+    public static class MangleOrderInterceptor extends ChannelInterceptorBase {\r
+        int cnt = 1;\r
+        ChannelMessage hold = null;\r
+        Member[] dest = null;\r
+        public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {\r
+            if ( hold == null ) {\r
+                //System.out.println("Skipping message:"+msg);\r
+                hold = (ChannelMessage)msg.deepclone();\r
+                dest = new Member[destination.length];\r
+                System.arraycopy(destination,0,dest,0,dest.length);\r
+            } else {\r
+                //System.out.println("Sending message:"+msg);\r
+                super.sendMessage(destination,msg,payload);\r
+                //System.out.println("Sending message:"+hold);\r
+                super.sendMessage(dest,hold,null);\r
+                hold = null;\r
+                dest = null;\r
+            }\r
+        }\r
+    }\r
+    \r
+    \r
+    \r
+    \r
+\r
+}\r