defer the deserialization of the message to an async thread to be able to handle...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 9 Jan 2009 23:21:08 +0000 (23:21 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 9 Jan 2009 23:21:08 +0000 (23:21 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@733187 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/membership/McastService.java
java/org/apache/catalina/tribes/membership/McastServiceImpl.java

index 99ea04c..cdb28f5 100644 (file)
@@ -503,7 +503,6 @@ public class McastService implements MembershipService,MembershipListener,Messag
     public boolean accept(ChannelMessage msg) {
         return true;
     }
-    
     public void broadcast(ChannelMessage message) throws ChannelException {
         if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ )
             throw new ChannelException("Multicast send is not started or enabled.");
index 707d67f..9cc378f 100644 (file)
@@ -346,19 +346,7 @@ public class McastServiceImpl
                 if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
                     memberDataReceived(data);
                 } else {
-                    XByteBuffer buffer = new XByteBuffer(data,true);
-                    if (buffer.countPackages(true)>0) {
-                        int count = buffer.countPackages();
-                        ChannelData[] pkgs = new ChannelData[count];
-                        for (int i=0; i<count; i++) {
-                            try {
-                                pkgs[i] = buffer.extractPackage(true);
-                            }catch (IllegalStateException ise) {
-                                log.debug("Unable to decode message.",ise);
-                            }
-                        }
-                        memberBroadcastsReceived(pkgs);
-                    }
+                    memberBroadcastsReceived(data);
                 }
                 
             }
@@ -407,28 +395,42 @@ public class McastServiceImpl
         }
     }
     
-    private void memberBroadcastsReceived(final ChannelData[] data) {
+    private void memberBroadcastsReceived(final byte[] b) {
         if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
-        Runnable t = new Runnable() {
-            public void run() {
-                String name = Thread.currentThread().getName();
+        XByteBuffer buffer = new XByteBuffer(b,true);
+        if (buffer.countPackages(true)>0) {
+            int count = buffer.countPackages();
+            final ChannelData[] data = new ChannelData[count];
+            for (int i=0; i<count; i++) {
                 try {
-                    Thread.currentThread().setName("Membership-MemberAdded.");
-                    for (int i=0; i<data.length; i++ ) {
-                        try {
-                            if (data[i]!=null) {
-                                msgservice.messageReceived(data[i]);
+                    data[i] = buffer.extractPackage(true);
+                }catch (IllegalStateException ise) {
+                    log.debug("Unable to decode message.",ise);
+                }catch (IOException x) {
+                    log.debug("Unable to decode message.",x);
+                }
+            }
+            Runnable t = new Runnable() {
+                public void run() {
+                    String name = Thread.currentThread().getName();
+                    try {
+                        Thread.currentThread().setName("Membership-MemberAdded.");
+                        for (int i=0; i<data.length; i++ ) {
+                            try {
+                                if (data[i]!=null && !member.equals(data[i].getAddress())) {
+                                    msgservice.messageReceived(data[i]);
+                                }
+                            }catch (Throwable t) {
+                                log.error("Unable to receive broadcast message.",t);
                             }
-                        }catch (Throwable t) {
-                            log.error("Unable to receive broadcast message.",t);
                         }
+                    }finally {
+                        Thread.currentThread().setName(name);
                     }
-                }finally {
-                    Thread.currentThread().setName(name);
                 }
-            }
-        };
-        executor.execute(t);
+            };
+            executor.execute(t);
+        }
     }
 
     protected final Object expiredMutex = new Object();
@@ -469,6 +471,7 @@ public class McastServiceImpl
     }
     
     private final Object sendLock = new Object();
+
     public void send(boolean checkexpired, DatagramPacket packet) throws IOException{
         checkexpired = (checkexpired && (packet==null));
         //ignore if we haven't started the sender