null,
null,
null,
- wrap(channel.getLocalMember(false)));
+ channel.getLocalMember(false),
+ new Member[0]);
if ( channel.getMembers().length > 0 ) {
//send a ping, wait for all nodes to reply
Response[] resp = rpcChannel.send(channel.getMembers(),
protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
- false, null, null, null, wrap(channel.getLocalMember(false)));
+ false, null, null, null, channel.getLocalMember(false), new Member[0]);
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
for (int i = 0; i < resp.length; i++) {
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
true, (Serializable) entry.getKey(), null,
rentry.getDiff(),
+ entry.getPrimary(),
entry.getBackupNodes());
} catch (IOException x) {
log.error("Unable to diff object. Will replicate the entire object instead.", x);
msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
- null, entry.getBackupNodes());
+ null, entry.getPrimary(),entry.getBackupNodes());
}
try {
Member backup = members.length > 0 ? (Member) members[0] : null;
if (backup != null) {
MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false,
- null, null, null, null);
+ null, null, null, null, null);
Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
if (resp.length > 0) {
synchronized (stateMutex) {
boolean copy = (mapmsg.getMsgType() == mapmsg.MSG_STATE_COPY);
MapMessage me = new MapMessage(mapContextName,
copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getBackupNodes());
+ false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getPrimary(),entry.getBackupNodes());
list.add(me);
}
}
try {
mapmsg.deserialize(getExternalLoaders());
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ mapMemberAdded(mapmsg.getPrimary());
} else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
- memberAlive(mapmsg.getBackupNodes()[0]);
+ memberAlive(mapmsg.getPrimary());
}
} catch (IOException x ) {
log.error("Unable to deserialize MapMessage.",x);
if ( log.isTraceEnabled() )
log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ mapMemberAdded(mapmsg.getPrimary());
}
if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
- memberDisappeared(mapmsg.getBackupNodes()[0]);
+ memberDisappeared(mapmsg.getPrimary());
}
if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
entry.setBackup(false);
entry.setProxy(true);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
super.put(entry.getKey(), entry);
} else {
entry.setProxy(true);
entry.setBackup(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
}
}
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
}
try {
if (getMapMembers().length > 0 && notify) {
- MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null);
getChannel().send(getMapMembers(), msg, getChannelSendOptions());
}
} catch ( ChannelException x ) {
if ( !entry.isBackup() ) {
//make sure we don't retrieve from ourselves
msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
- (Serializable) key, null, null, null);
+ (Serializable) key, null, null, null,null);
Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
backup = publishEntryInfo(key, entry.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
- msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+ msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
MapEntry entry = new MapEntry(key,value);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setPrimary(channel.getLocalMember(false));
Object old = null;
private boolean backup;
private boolean proxy;
private Member[] backupNodes;
-
+ private Member primary;
private Object key;
private Object value;
public Member[] getBackupNodes() {
return backupNodes;
}
+
+ public void setPrimary(Member m) {
+ primary = m;
+ }
+
+ public Member getPrimary() {
+ return primary;
+ }
public Object getValue() {
return value;
private byte[] keydata;
private byte[] diffvalue;
private Member[] nodes;
+ private Member primary;
public String toString() {
StringBuffer buf = new StringBuffer("MapMessage[context=");
public MapMessage(byte[] mapId,int msgtype, boolean diff,
Serializable key, Serializable value,
- byte[] diffvalue, Member[] nodes) {
+ byte[] diffvalue, Member primary, Member[] nodes) {
this.mapId = mapId;
this.msgtype = msgtype;
this.diff = diff;
this.value = value;
this.diffvalue = diffvalue;
this.nodes = nodes;
+ this.primary = primary;
setValue(value);
setKey(key);
}
private void setBackUpNodes(Member[] nodes) {
this.nodes = nodes;
}
+
+ public Member getPrimary() {
+ return primary;
+ }
+
+ private void setPrimary(Member m) {
+ primary = m;
+ }
public byte[] getMapId() {
return mapId;
* @return Object
*/
public Object clone() {
- MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
+ MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, this.nodes);
msg.keydata = this.keydata;
msg.valuedata = this.valuedata;
return msg;