Publish primary node information as well
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 9 Aug 2007 18:43:12 +0000 (18:43 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 9 Aug 2007 18:43:12 +0000 (18:43 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@564335 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
test/org/apache/catalina/tribes/demos/MapDemo.java

index 24fa6b1..827ec18 100644 (file)
@@ -239,7 +239,8 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                                         null, 
                                         null, 
                                         null, 
-                                        wrap(channel.getLocalMember(false)));
+                                        channel.getLocalMember(false),
+                                        new Member[0]);
         if ( channel.getMembers().length > 0 ) {
             //send a ping, wait for all nodes to reply
             Response[] resp = rpcChannel.send(channel.getMembers(), 
@@ -287,7 +288,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
     protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
         //send out a map membership message, only wait for the first reply
         MapMessage msg = new MapMessage(this.mapContextName, msgtype,
-                                        false, null, null, null, wrap(channel.getLocalMember(false)));
+                                        false, null, null, null, channel.getLocalMember(false), new Member[0]);
         if ( rpc) {
             Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
             for (int i = 0; i < resp.length; i++) {
@@ -391,6 +392,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                     msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                          true, (Serializable) entry.getKey(), null,
                                          rentry.getDiff(),
+                                         entry.getPrimary(),
                                          entry.getBackupNodes());
                 } catch (IOException x) {
                     log.error("Unable to diff object. Will replicate the entire object instead.", x);
@@ -404,7 +406,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                 msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
                                      false, (Serializable) entry.getKey(),
                                      (Serializable) entry.getValue(),
-                                     null, entry.getBackupNodes());
+                                     null, entry.getPrimary(),entry.getBackupNodes());
 
             }
             try {
@@ -439,7 +441,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
             Member backup = members.length > 0 ? (Member) members[0] : null;
             if (backup != null) {
                 MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false,
-                                                null, null, null, null);
+                                                null, null, null, null, null);
                 Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
                 if (resp.length > 0) {
                     synchronized (stateMutex) {
@@ -506,7 +508,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                         boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
                         MapMessage me = new MapMessage(mapContextName, 
                                                        copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
-                            false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
+                            false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getPrimary(),entry.getBackupNodes());
                         list.add(me);
                     }
                 }
@@ -534,9 +536,9 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         try {
             mapmsg.deserialize(getExternalLoaders());
             if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-                mapMemberAdded(mapmsg.getBackupNodes()[0]);
+                mapMemberAdded(mapmsg.getPrimary());
             } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
-                memberAlive(mapmsg.getBackupNodes()[0]);
+                memberAlive(mapmsg.getPrimary());
             }
         } catch (IOException x ) {
             log.error("Unable to deserialize MapMessage.",x);
@@ -565,11 +567,11 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         if ( log.isTraceEnabled() ) 
             log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
         if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-            mapMemberAdded(mapmsg.getBackupNodes()[0]);
+            mapMemberAdded(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
-            memberDisappeared(mapmsg.getBackupNodes()[0]);
+            memberDisappeared(mapmsg.getPrimary());
         }
 
         if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
@@ -579,11 +581,13 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                 entry.setBackup(false);
                 entry.setProxy(true);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 super.put(entry.getKey(), entry);
             } else {
                 entry.setProxy(true);
                 entry.setBackup(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
             }
         }
 
@@ -598,6 +602,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
                     ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
                 }
@@ -761,7 +766,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
 
         try {
             if (getMapMembers().length > 0 && notify) {
-                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
                 getChannel().send(getMapMembers(), msg, getChannelSendOptions());
             }
         } catch ( ChannelException x ) {
@@ -786,7 +791,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                 if ( !entry.isBackup() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
-                                         (Serializable) key, null, null, null);
+                                         (Serializable) key, null, null, null,null);
                     Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
                     if (resp == null || resp.length == 0) {
                         //no responses
@@ -807,7 +812,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
                     backup = publishEntryInfo(key, entry.getValue());
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
-                    msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+                    msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(dest, msg, getChannelSendOptions());
@@ -875,6 +880,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
             MapEntry entry = new MapEntry(key,value);
             entry.setBackup(false);
             entry.setProxy(false);
+            entry.setPrimary(channel.getLocalMember(false));
     
             Object old = null;
     
@@ -1026,7 +1032,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         private boolean backup;
         private boolean proxy;
         private Member[] backupNodes;
-
+        private Member primary;
         private Object key;
         private Object value;
 
@@ -1080,6 +1086,14 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         public Member[] getBackupNodes() {
             return backupNodes;
         }
+        
+        public void setPrimary(Member m) {
+            primary = m;
+        }
+        
+        public Member getPrimary() {
+            return primary;
+        }
 
         public Object getValue() {
             return value;
@@ -1172,6 +1186,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         private byte[] keydata;
         private byte[] diffvalue;
         private Member[] nodes;
+        private Member primary;
         
         public String toString() {
             StringBuffer buf = new StringBuffer("MapMessage[context=");
@@ -1205,7 +1220,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
 
         public MapMessage(byte[] mapId,int msgtype, boolean diff,
                           Serializable key, Serializable value,
-                          byte[] diffvalue, Member[] nodes)  {
+                          byte[] diffvalue, Member primary, Member[] nodes)  {
             this.mapId = mapId;
             this.msgtype = msgtype;
             this.diff = diff;
@@ -1213,6 +1228,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
             this.value = value;
             this.diffvalue = diffvalue;
             this.nodes = nodes;
+            this.primary = primary;
             setValue(value);
             setKey(key);
         }
@@ -1283,6 +1299,14 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
         private void setBackUpNodes(Member[] nodes) {
             this.nodes = nodes;
         }
+        
+        public Member getPrimary() {
+            return primary;
+        }
+        
+        private void setPrimary(Member m) {
+            primary = m;
+        }
 
         public byte[] getMapId() {
             return mapId;
@@ -1335,7 +1359,7 @@ public abstract class AbstractReplicatedMap extends ConcurrentHashMap implements
          * @return Object
          */
         public Object clone() {
-            MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
+            MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes);
             msg.keydata = this.keydata;
             msg.valuedata = this.valuedata;
             return msg;
index 653b52a..59f2d75 100644 (file)
@@ -151,7 +151,7 @@ public class LazyReplicatedMap extends AbstractReplicatedMap
                 backup = wrap(next);
                 //publish the backup data to one node
                 msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
-                                     (Serializable) key, (Serializable) value, null, backup);
+                                     (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), backup);
                 if ( log.isTraceEnabled() ) 
                     log.trace("Publishing backup data:"+msg+" to: "+next.getName());
                 UniqueId id = getChannel().send(backup, msg, getChannelSendOptions());
@@ -167,7 +167,7 @@ public class LazyReplicatedMap extends AbstractReplicatedMap
                 Member[] proxies = excludeFromSet(backup, getMapMembers());
                 if (success && proxies.length > 0 ) {
                     msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
-                                         (Serializable) key, null, null, backup);
+                                         (Serializable) key, null, null, channel.getLocalMember(false),backup);
                     if ( log.isTraceEnabled() ) 
                     log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies));
                     getChannel().send(proxies, msg, getChannelSendOptions());
index 2505409..d66b3a2 100644 (file)
@@ -112,7 +112,7 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback,
 
         //publish the data out to all nodes
         MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false,
-                                        (Serializable) key, (Serializable) value, null, backup);
+                                        (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup);
 
         getChannel().send(getMapMembers(), msg, getChannelSendOptions());
 
index aee1d5d..305f2a5 100644 (file)
@@ -166,6 +166,7 @@ public class MapDemo implements ChannelListener, MembershipListener{
             String[] columnNames = {
                                    "Key",
                                    "Value",
+                                   "Primary Node",
                                    "Backup Node",
                                    "isPrimary",
                                    "isProxy",
@@ -198,10 +199,11 @@ public class MapDemo implements ChannelListener, MembershipListener{
                 switch (col) {
                     case 0: return entry.getKey();
                     case 1: return entry.getValue();
-                    case 2: return getMemberNames(entry.getBackupNodes());
-                    case 3: return new Boolean(entry.isPrimary());
-                    case 4: return new Boolean(entry.isProxy());
-                    case 5: return new Boolean(entry.isBackup());
+                    case 2: return entry.getPrimary()!=null?entry.getPrimary().getName():"null";
+                    case 3: return getMemberNames(entry.getBackupNodes());
+                    case 4: return new Boolean(entry.isPrimary());
+                    case 5: return new Boolean(entry.isProxy());
+                    case 6: return new Boolean(entry.isBackup());
                     default: return "";
                 }
                 
@@ -408,9 +410,9 @@ public class MapDemo implements ChannelListener, MembershipListener{
             cell.setBackground(Color.WHITE);
             if ( row > 0 ) {
                 Color color = null;
-                boolean primary = ( (Boolean) table.getValueAt(row, 3)).booleanValue();
-                boolean proxy = ( (Boolean) table.getValueAt(row, 4)).booleanValue();
-                boolean backup = ( (Boolean) table.getValueAt(row, 5)).booleanValue();
+                boolean primary = ( (Boolean) table.getValueAt(row, 4)).booleanValue();
+                boolean proxy = ( (Boolean) table.getValueAt(row, 5)).booleanValue();
+                boolean backup = ( (Boolean) table.getValueAt(row, 6)).booleanValue();
                 if (primary) color = Color.GREEN;
                 else if (proxy) color = Color.RED;
                 else if (backup) color = Color.BLUE;