Added a TCP ping for membership, to be used with static memberships and with the...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 13 Apr 2007 23:26:07 +0000 (23:26 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 13 Apr 2007 23:26:07 +0000 (23:26 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@528702 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java [new file with mode: 0644]

index 59c1fea..dd6270c 100644 (file)
@@ -180,48 +180,15 @@ public class TcpFailureDetector extends ChannelInterceptorBase {
     }
     
     public void heartbeat() {
+        checkMembers(false);
+    }
+    public void checkMembers(boolean checkAll) {
+        
         try {
             if (membership == null) setupMembership();
             synchronized (membership) {
-                //update all alive times
-                Member[] members = super.getMembers();
-                for (int i = 0; members != null && i < members.length; i++) {
-                    if (membership.memberAlive( (MemberImpl) members[i])) {
-                        //we don't have this one in our membership, check to see if he/she is alive
-                        if (memberAlive(members[i])) {
-                            log.warn("Member added, even though we werent notified:" + members[i]);
-                            super.memberAdded(members[i]);
-                        } else {
-                            membership.removeMember( (MemberImpl) members[i]);
-                        } //end if
-                    } //end if
-                } //for
-
-                //check suspect members if they are still alive,
-                //if not, simply issue the memberDisappeared message
-                MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
-                for (int i = 0; i < keys.length; i++) {
-                    MemberImpl m = (MemberImpl) keys[i];
-                    if (membership.getMember(m) != null && (!memberAlive(m))) {
-                        membership.removeMember(m);
-                        super.memberDisappeared(m);
-                        removeSuspects.remove(m);
-                        log.info("Suspect member, confirmed dead.["+m+"]");
-                    } //end if
-                }
-
-                //check add suspects members if they are alive now,
-                //if they are, simply issue the memberAdded message
-                keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
-                for (int i = 0; i < keys.length; i++) {
-                    MemberImpl m = (MemberImpl) keys[i];
-                    if ( membership.getMember(m) == null && (memberAlive(m))) {
-                        membership.memberAlive(m);
-                        super.memberAdded(m);
-                        addSuspects.remove(m);
-                        log.info("Suspect member, confirmed alive.["+m+"]");
-                    } //end if
-                }
+                if ( !checkAll ) performBasicCheck();
+                else performForcedCheck();
             }
         }catch ( Exception x ) {
             log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
@@ -230,6 +197,66 @@ public class TcpFailureDetector extends ChannelInterceptorBase {
         }
     }
     
+    protected void performForcedCheck() {
+        //update all alive times
+        Member[] members = super.getMembers();
+        for (int i = 0; members != null && i < members.length; i++) {
+            if (memberAlive(members[i])) {
+                if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]);
+                addSuspects.remove(members[i]);
+            } else {
+                if (membership.getMember(members[i])!=null) {
+                    membership.removeMember((MemberImpl)members[i]);
+                    removeSuspects.remove(members[i]);
+                    super.memberDisappeared((MemberImpl)members[i]);
+                }
+            } //end if
+        } //for
+
+    }
+
+    protected void performBasicCheck() {
+        //update all alive times
+        Member[] members = super.getMembers();
+        for (int i = 0; members != null && i < members.length; i++) {
+            if (membership.memberAlive( (MemberImpl) members[i])) {
+                //we don't have this one in our membership, check to see if he/she is alive
+                if (memberAlive(members[i])) {
+                    log.warn("Member added, even though we werent notified:" + members[i]);
+                    super.memberAdded(members[i]);
+                } else {
+                    membership.removeMember( (MemberImpl) members[i]);
+                } //end if
+            } //end if
+        } //for
+
+        //check suspect members if they are still alive,
+        //if not, simply issue the memberDisappeared message
+        MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+        for (int i = 0; i < keys.length; i++) {
+            MemberImpl m = (MemberImpl) keys[i];
+            if (membership.getMember(m) != null && (!memberAlive(m))) {
+                membership.removeMember(m);
+                super.memberDisappeared(m);
+                removeSuspects.remove(m);
+                log.info("Suspect member, confirmed dead.["+m+"]");
+            } //end if
+        }
+
+        //check add suspects members if they are alive now,
+        //if they are, simply issue the memberAdded message
+        keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+        for (int i = 0; i < keys.length; i++) {
+            MemberImpl m = (MemberImpl) keys[i];
+            if ( membership.getMember(m) == null && (memberAlive(m))) {
+                membership.memberAlive(m);
+                super.memberAdded(m);
+                addSuspects.remove(m);
+                log.info("Suspect member, confirmed alive.["+m+"]");
+            } //end if
+        }
+    }
+    
     protected synchronized void setupMembership() {
         if ( membership == null ) {
             membership = new Membership((MemberImpl)super.getLocalMember(true));
diff --git a/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java b/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java
new file mode 100644 (file)
index 0000000..73ba07e
--- /dev/null
@@ -0,0 +1,179 @@
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements.  See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License.  You may obtain a copy of the License at\r
+ * \r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+\r
+package org.apache.catalina.tribes.group.interceptors;\r
+\r
+import java.lang.ref.WeakReference;\r
+import java.util.Arrays;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+\r
+import org.apache.catalina.tribes.ChannelException;\r
+import org.apache.catalina.tribes.ChannelInterceptor;\r
+import org.apache.catalina.tribes.ChannelMessage;\r
+import org.apache.catalina.tribes.Member;\r
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;\r
+import org.apache.catalina.tribes.io.ChannelData;\r
+\r
+/**\r
+ * \r
+ * Sends a ping to all members.\r
+ * Configure this interceptor with the TcpFailureDetector below it,\r
+ * and the TcpFailureDetector will act as the membership guide.\r
+ * @author Filip Hanik\r
+ * @version 1.0\r
+ */\r
+\r
+public class TcpPingInterceptor extends ChannelInterceptorBase {\r
+    \r
+    protected static org.apache.juli.logging.Log log = \r
+        org.apache.juli.logging.LogFactory.getLog(TcpPingInterceptor.class);\r
+    \r
+    protected static byte[] TCP_PING_DATA = new byte[] {\r
+        79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,\r
+        125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,\r
+        55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,\r
+        85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43};  \r
+\r
+    protected long interval = 1000; //1 second\r
+\r
+    protected boolean useThread = false;\r
+    protected boolean staticOnly = false;\r
+    protected boolean running = true;\r
+    protected PingThread thread = null;\r
+    protected static AtomicInteger cnt = new AtomicInteger(0);\r
+    \r
+    WeakReference<TcpFailureDetector> failureDetector = null;\r
+    WeakReference<StaticMembershipInterceptor> staticMembers = null;\r
+    \r
+    public synchronized void start(int svc) throws ChannelException {\r
+        super.start(svc);\r
+        running = true;\r
+        if ( thread == null ) {\r
+            thread = new PingThread();\r
+            thread.setDaemon(true);\r
+            thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1));\r
+            thread.start();\r
+        }\r
+        \r
+        //acquire the interceptors to invoke on send ping events\r
+        ChannelInterceptor next = getNext();\r
+        while ( next != null ) {\r
+            if ( next instanceof TcpFailureDetector ) \r
+                failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next);\r
+            if ( next instanceof StaticMembershipInterceptor ) \r
+                staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next);\r
+            next = next.getNext();\r
+        }\r
+        \r
+    }\r
+    \r
+    public void stop(int svc) throws ChannelException {\r
+        running = false;\r
+        if ( thread != null ) thread.interrupt();\r
+        thread = null;\r
+        super.stop(svc);\r
+    }\r
+    \r
+    public void heartbeat() {\r
+        super.heartbeat();\r
+        if (!getUseThread()) sendPing();\r
+    }\r
+\r
+    public long getInterval() {\r
+        return interval;\r
+    }\r
+\r
+    public void setInterval(long interval) {\r
+        this.interval = interval;\r
+    }\r
+\r
+    public void setUseThread(boolean useThread) {\r
+        this.useThread = useThread;\r
+    }\r
+\r
+    public void setStaticOnly(boolean staticOnly) {\r
+        this.staticOnly = staticOnly;\r
+    }\r
+\r
+    public boolean getUseThread() {\r
+        return useThread;\r
+    }\r
+\r
+    public boolean getStaticOnly() {\r
+        return staticOnly;\r
+    }\r
+\r
+    protected void sendPing() {\r
+        if (failureDetector.get()!=null) {\r
+            //we have a reference to the failure detector\r
+            //piggy back on that dude\r
+            failureDetector.get().checkMembers(true);\r
+        }else {\r
+            if (staticOnly && staticMembers.get()!=null) {\r
+                sendPingMessage(staticMembers.get().getMembers());\r
+            } else {\r
+                sendPingMessage(getMembers());\r
+            }\r
+        }\r
+    }\r
+\r
+    protected void sendPingMessage(Member[] members) {\r
+        if ( members == null || members.length == 0 ) return;\r
+        ChannelData data = new ChannelData(true);//generates a unique Id\r
+        data.setAddress(getLocalMember(false));\r
+        data.setTimestamp(System.currentTimeMillis());\r
+        data.setOptions(getOptionFlag());\r
+        try {\r
+            super.sendMessage(members, data, null);\r
+        }catch (ChannelException x) {\r
+            log.warn("Unable to send TCP ping.",x);\r
+        }\r
+    }\r
+    \r
+    public void messageReceived(ChannelMessage msg) {\r
+        //catch incoming \r
+        boolean process = true;\r
+        if ( okToProcess(msg.getOptions()) ) {\r
+            //check to see if it is a ping message, if so, process = false\r
+            process = ( (msg.getMessage().getLength() != TCP_PING_DATA.length) ||\r
+                        (!Arrays.equals(TCP_PING_DATA,msg.getMessage().getBytes()) ) );\r
+        }//end if\r
+\r
+        //ignore the message, it doesnt have the flag set\r
+        if ( process ) super.messageReceived(msg);\r
+        else if ( log.isDebugEnabled() ) log.debug("Received a TCP ping packet:"+msg);\r
+    }//messageReceived\r
+    \r
+    protected class PingThread extends Thread {\r
+        public void run() {\r
+            while (running) {\r
+                try {\r
+                    sleep(interval);\r
+                    sendPing();\r
+                }catch ( InterruptedException ix ) {\r
+                    interrupted();\r
+                }catch ( Exception x )  {\r
+                    log.warn("Unable to send ping from TCP ping thread.",x);\r
+                }\r
+            }\r
+        }\r
+    }\r
+\r
+    \r
+    \r
+\r
+}\r