Add locks instead of synchronized statements to avoid issues between receiving and...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 17 May 2007 13:45:15 +0000 (13:45 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 17 May 2007 13:45:15 +0000 (13:45 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@538920 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java

index 34d8852..9bd1a75 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 
@@ -59,36 +60,49 @@ public class OrderInterceptor extends ChannelInterceptorBase {
     private long expire = 3000;
     private boolean forwardExpired = true;
     private int maxQueue = Integer.MAX_VALUE;
+    
+    ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true);
+    ReentrantReadWriteLock outLock= new ReentrantReadWriteLock(true);
 
-    public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         if ( !okToProcess(msg.getOptions()) ) {
             super.sendMessage(destination, msg, payload);
             return;
         }
-        for ( int i=0; i<destination.length; i++ ) {
-            int nr = incCounter(destination[i]);
-            //reduce byte copy
-            msg.getMessage().append(nr);
-            try {
-                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
-            }finally {
-                msg.getMessage().trim(4);
+        try {
+            outLock.writeLock().lock();
+            for ( int i=0; i<destination.length; i++ ) {
+                int nr = incCounter(destination[i]);
+                //reduce byte copy
+                msg.getMessage().append(nr);
+                try {
+                    getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+                }finally {
+                    msg.getMessage().trim(4);
+                }
             }
+        }finally {
+            outLock.writeLock().unlock();
         }
     }
 
-    public synchronized void messageReceived(ChannelMessage msg) {
+    public void messageReceived(ChannelMessage msg) {
         if ( !okToProcess(msg.getOptions()) ) {
             super.messageReceived(msg);
             return;
         }
-        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
-        msg.getMessage().trim(4);
-        MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
-        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
-    }
+        try {
+            inLock.writeLock().lock();
+            int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+            msg.getMessage().trim(4);
+            MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
+            if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
     
-    public void processLeftOvers(Member member, boolean force) {
+        }finally {
+            inLock.writeLock().unlock();
+        }
+    }
+    protected void processLeftOvers(Member member, boolean force) {
         MessageOrder tmp = (MessageOrder)incoming.get(member);
         if ( force ) {
             Counter cnt = getInCounter(member);
@@ -101,7 +115,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
      * @param order MessageOrder
      * @return boolean - true if a message expired and was processed
      */
-    public boolean processIncoming(MessageOrder order) {
+    protected boolean processIncoming(MessageOrder order) {
         boolean result = false;
         Member member = order.getMessage().getAddress();
         Counter cnt = getInCounter(member);
@@ -147,28 +161,50 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return result;
     }
     
-    public synchronized void memberAdded(Member member) {
+    public void memberAdded(Member member) {
+        //reset counters
+        try {
+            inLock.writeLock().lock();
+            getInCounter(member);
+        }finally {
+            inLock.writeLock().unlock();
+        }
+        try {
+            outLock.writeLock().lock();
+            getOutCounter(member);
+        }finally {
+            outLock.writeLock().unlock();
+        }
         //notify upwards
-        getInCounter(member);
-        getOutCounter(member);
         super.memberAdded(member);
     }
 
-    public synchronized void memberDisappeared(Member member) {
-        //notify upwards
-        outcounter.remove(member);
-        incounter.remove(member);
+    public void memberDisappeared(Member member) {
+        //reset counters
+        try {
+            inLock.writeLock().lock();
+            incounter.remove(member);
+        }finally {
+            inLock.writeLock().unlock();
+        }
+        try {
+            outLock.writeLock().lock();
+            outcounter.remove(member);
+        }finally {
+            outLock.writeLock().unlock();
+        }
         //clear the remaining queue
         processLeftOvers(member,true);
+        //notify upwards
         super.memberDisappeared(member);
     }
     
-    public int incCounter(Member mbr) { 
+    protected int incCounter(Member mbr) { 
         Counter cnt = getOutCounter(mbr);
         return cnt.inc();
     }
     
-    public Counter getInCounter(Member mbr) {
+    protected Counter getInCounter(Member mbr) {
         Counter cnt = (Counter)incounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -178,7 +214,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return cnt;
     }
 
-    public Counter getOutCounter(Member mbr) {
+    protected Counter getOutCounter(Member mbr) {
         Counter cnt = (Counter)outcounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -187,7 +223,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         return cnt;
     }
 
-    public static class Counter {
+    protected static class Counter {
         private AtomicInteger value = new AtomicInteger(0);
         
         public int getCounter() {
@@ -203,7 +239,7 @@ public class OrderInterceptor extends ChannelInterceptorBase {
         }
     }
     
-    public static class MessageOrder {
+    protected static class MessageOrder {
         private long received = System.currentTimeMillis();
         private MessageOrder next;
         private int msgNr;
index cd4f99d..3f4ff0e 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.ChannelMessage;\r
 import org.apache.catalina.tribes.group.InterceptorPayload;\r
 import org.apache.catalina.tribes.ChannelException;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
 \r
 public class TestOrderInterceptor extends TestCase {\r
 \r
@@ -76,8 +77,9 @@ public class TestOrderInterceptor extends TestCase {
     \r
     public void testOrder1() throws Exception {\r
         Member[] dest = channels[0].getMembers();\r
+        final AtomicInteger value = new AtomicInteger(0);\r
         for ( int i=0; i<100; i++ ) {\r
-            channels[0].send(dest,new Integer(i),0);\r
+            channels[0].send(dest,new Integer(value.getAndAdd(1)),0);\r
         }\r
         Thread.sleep(5000);\r
         for ( int i=0; i<test.length; i++ ) {\r
@@ -85,6 +87,40 @@ public class TestOrderInterceptor extends TestCase {
         }\r
     }\r
     \r
+    public void testOrder2() throws Exception {\r
+        final Member[] dest = channels[0].getMembers();\r
+        final AtomicInteger value = new AtomicInteger(0);\r
+        Runnable run = new Runnable() {\r
+            public void run() {\r
+                for (int i = 0; i < 100; i++) {\r
+                    try {\r
+                        synchronized (channels[0]) {\r
+                            channels[0].send(dest, new Integer(value.getAndAdd(1)), 0);\r
+                        }\r
+                    }catch ( Exception x ) {\r
+                        x.printStackTrace();\r
+                        assertEquals(true,false);\r
+                    }\r
+                }\r
+            }\r
+        };\r
+        Thread[] threads = new Thread[5];\r
+        for (int i=0;i<threads.length;i++) {\r
+            threads[i] = new Thread(run);\r
+        }\r
+        for (int i=0;i<threads.length;i++) {\r
+            threads[i].start();\r
+        }\r
+        for (int i=0;i<threads.length;i++) {\r
+            threads[i].join();\r
+        }\r
+        Thread.sleep(5000);\r
+        for ( int i=0; i<test.length; i++ ) {\r
+            super.assertEquals(false,test[i].fail);\r
+        }\r
+    }\r
+\r
+\r
     protected void tearDown() throws Exception {\r
         System.out.println("tearDown");\r
         super.tearDown();\r
@@ -112,7 +148,7 @@ public class TestOrderInterceptor extends TestCase {
             Integer i = (Integer)msg;\r
             if ( i.intValue() != cnt ) fail = true;\r
             else cnt++;\r
-            System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total);\r
+            System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);\r
 \r
         }\r
 \r