* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will
* be performed.<br>
*
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_USE_ACK - Message is sent and an ACK is received when the message has been received by the recipient<br>
* If no ack is received, the message is not considered successful<br>
* @see #send(Member[], Serializable , int)
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and
* processed by the recipient<br>
* If no ack is received, the message is not considered successful<br>
*/
public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
+ /**
+ * Send options, when a message is sent, it can have an option flag
+ * to trigger certain behavior. Most flags are used to trigger channel interceptors
+ * as the message passes through the channel stack. <br>
+ * However, there are five default flags that every channel implementation must implement<br>
+ * SEND_OPTIONS_SECURE - Message is sent over an encrypted channel<br>
+ * @see #send(Member[], Serializable , int)
+ * @see #send(Member[], Serializable, int, ErrorHandler)
+ */
+ public static final int SEND_OPTIONS_SECURE = 0x0010;
+
/**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
- * However, there are four default flags that every channel implementation must implement<br>
+ * However, there are five default flags that every channel implementation must implement<br>
* SEND_OPTIONS_DEFAULT - the default sending options, just a helper variable. <br>
* The default is <code>int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;</code><br>
* @see #SEND_OPTIONS_USE_ACK
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
-import java.util.ConcurrentModificationException;
-
/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
*
- * @author not attributable
+ * @author Filip Hanik
* @version 1.0
*/
public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
- private transient long rpcTimeout = 5000;
- private transient Channel channel;
- private transient RpcChannel rpcChannel;
- private transient byte[] mapContextName;
- private transient boolean stateTransferred = false;
- private transient Object stateMutex = new Object();
- private transient HashMap mapMembers = new HashMap();
- private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- private transient Object mapOwner;
- private transient ClassLoader[] externalLoaders;
+
+ /**
+ * Timeout for RPC messages, how long we will wait for a reply
+ */
+ protected transient long rpcTimeout = 5000;
+ /**
+ * Reference to the channel for sending messages
+ */
+ protected transient Channel channel;
+ /**
+ * The RpcChannel to send RPC messages through
+ */
+ protected transient RpcChannel rpcChannel;
+ /**
+ * The Map context name makes this map unique, this
+ * allows us to have more than one map shared
+ * through one channel
+ */
+ protected transient byte[] mapContextName;
+ /**
+ * Has the state been transferred
+ */
+ protected transient boolean stateTransferred = false;
+ /**
+ * Simple lock object for transfers
+ */
+ protected transient Object stateMutex = new Object();
+ /**
+ * A list of members in our map
+ */
+ protected transient HashMap mapMembers = new HashMap();
+ /**
+ * Our default send options
+ */
+ protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+ /**
+ * The owner of this map, ala a SessionManager for example
+ */
+ protected transient Object mapOwner;
+ /**
+ * External class loaders if serialization and deserialization is to be performed successfully.
+ */
+ protected transient ClassLoader[] externalLoaders;
+
+ /**
+ * The node we are currently backing up data to, this index will rotate
+ * on a round robin basis
+ */
protected transient int currentNode = 0;
- private transient long accessTimeout = 5000;
- private transient String mapname = "";
+
+ /**
+ * Since the map keeps internal membership
+ * this is the timeout for a ping message to be responded to
+ * If a remote map doesn't respond within this timeframe,
+ * its considered dead.
+ */
+ protected transient long accessTimeout = 5000;
+
+ /**
+ * Readable string of the mapContextName value
+ */
+ protected transient String mapname = "";
//------------------------------------------------------------------------------
}
+ /**
+ * Helper methods, wraps a single member in an array
+ * @param m Member
+ * @return Member[]
+ */
protected Member[] wrap(Member m) {
if ( m == null ) return new Member[0];
else return new Member[] {m};
}
- private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
+ /**
+ * Initializes the map by creating the RPC channel, registering itself as a channel listener
+ * This method is also responsible for initiating the state transfer
+ * @param owner Object
+ * @param channel Channel
+ * @param mapContextName String
+ * @param timeout long
+ * @param channelSendOptions int
+ * @param cls ClassLoader[]
+ */
+ protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
this.mapOwner = owner;
this.externalLoaders = cls;
//create an rpc channel and add the map as a listener
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+ //add this map as a message listener
this.channel.addChannelListener(this);
+ //listen for membership notifications
this.channel.addMembershipListener(this);
try {
+ //broadcast our map, this just notifies other members of our existence
broadcast(MapMessage.MSG_INIT, true);
//transfer state from another map
transferState();
+ //state is transferred, we are ready for messaging
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
throw new RuntimeException("Unable to start replicated map.",x);
}
-
}
- private void ping(long timeout) throws ChannelException {
+ /**
+ * Sends a ping out to all the members in the cluster, not just map members
+ * that this map is alive.
+ * @param timeout long
+ * @throws ChannelException
+ */
+ protected void ping(long timeout) throws ChannelException {
//send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT,
- false, null, null, null, wrap(channel.getLocalMember(false)));
- Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout);
- for (int i = 0; i < resp.length; i++) {
- memberAlive(resp[i].getSource());
- }//for
-
+ MapMessage msg = new MapMessage(this.mapContextName,
+ MapMessage.MSG_INIT,
+ false,
+ null,
+ null,
+ null,
+ wrap(channel.getLocalMember(false)));
+ if ( channel.getMembers().length > 0 ) {
+ //send a ping, wait for all nodes to reply
+ Response[] resp = rpcChannel.send(channel.getMembers(),
+ msg, rpcChannel.ALL_REPLY,
+ (channelSendOptions),
+ (int) accessTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ memberAlive(resp[i].getSource());
+ } //for
+ }
+ //update our map of members, expire some if we didn't receive a ping back
synchronized (mapMembers) {
Iterator it = mapMembers.entrySet().iterator();
long now = System.currentTimeMillis();
}//synch
}
- private void memberAlive(Member member) {
+ /**
+ * We have received a member alive notification
+ * @param member Member
+ */
+ protected void memberAlive(Member member) {
synchronized (mapMembers) {
if (!mapMembers.containsKey(member)) {
mapMemberAdded(member);
mapMembers.put(member, new Long(System.currentTimeMillis()));
}
}
-
- private void broadcast(int msgtype, boolean rpc) throws ChannelException {
+
+ /**
+ * Helper method to broadcast a message to all members in a channel
+ * @param msgtype int
+ * @param rpc boolean
+ * @throws ChannelException
+ */
+ protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, wrap(channel.getLocalMember(false)));