From: fhanik Date: Tue, 10 Oct 2006 16:03:36 +0000 (+0000) Subject: Added in documentation X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=0e793b4148335db5e2a87847187ecdb4b52e6cb2;p=tomcat7.0 Added in documentation git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@454797 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/Channel.java b/java/org/apache/catalina/tribes/Channel.java index 1212ba145..fb8bc0af1 100644 --- a/java/org/apache/catalina/tribes/Channel.java +++ b/java/org/apache/catalina/tribes/Channel.java @@ -123,7 +123,7 @@ public interface Channel { * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
- * However, there are four default flags that every channel implementation must implement
+ * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshalling or unmarshalling will * be performed.
* @@ -136,7 +136,7 @@ public interface Channel { * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
- * However, there are four default flags that every channel implementation must implement
+ * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_USE_ACK - Message is sent and an ACK is received when the message has been received by the recipient
* If no ack is received, the message is not considered successful
* @see #send(Member[], Serializable , int) @@ -148,7 +148,7 @@ public interface Channel { * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
- * However, there are four default flags that every channel implementation must implement
+ * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and * processed by the recipient
* If no ack is received, the message is not considered successful
@@ -161,7 +161,7 @@ public interface Channel { * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
- * However, there are four default flags that every channel implementation must implement
+ * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_ASYNCHRONOUS - Message is sent and an ACK is received when the message has been received and * processed by the recipient
* If no ack is received, the message is not considered successful
@@ -170,12 +170,23 @@ public interface Channel { */ public static final int SEND_OPTIONS_ASYNCHRONOUS = 0x0008; + /** + * Send options, when a message is sent, it can have an option flag + * to trigger certain behavior. Most flags are used to trigger channel interceptors + * as the message passes through the channel stack.
+ * However, there are five default flags that every channel implementation must implement
+ * SEND_OPTIONS_SECURE - Message is sent over an encrypted channel
+ * @see #send(Member[], Serializable , int) + * @see #send(Member[], Serializable, int, ErrorHandler) + */ + public static final int SEND_OPTIONS_SECURE = 0x0010; + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
- * However, there are four default flags that every channel implementation must implement
+ * However, there are five default flags that every channel implementation must implement
* SEND_OPTIONS_DEFAULT - the default sending options, just a helper variable.
* The default is int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;
* @see #SEND_OPTIONS_USE_ACK diff --git a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java index d0d665ffb..227f94558 100644 --- a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java +++ b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java @@ -46,18 +46,9 @@ import org.apache.catalina.tribes.util.Arrays; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; -import java.util.ConcurrentModificationException; - /** - *

Title:

- * - *

Description:

- * - *

Copyright: Copyright (c) 2005

- * - *

Company:

* - * @author not attributable + * @author Filip Hanik * @version 1.0 */ public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat { @@ -81,19 +72,68 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc //------------------------------------------------------------------------------ // INSTANCE VARIABLES //------------------------------------------------------------------------------ - private transient long rpcTimeout = 5000; - private transient Channel channel; - private transient RpcChannel rpcChannel; - private transient byte[] mapContextName; - private transient boolean stateTransferred = false; - private transient Object stateMutex = new Object(); - private transient HashMap mapMembers = new HashMap(); - private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; - private transient Object mapOwner; - private transient ClassLoader[] externalLoaders; + + /** + * Timeout for RPC messages, how long we will wait for a reply + */ + protected transient long rpcTimeout = 5000; + /** + * Reference to the channel for sending messages + */ + protected transient Channel channel; + /** + * The RpcChannel to send RPC messages through + */ + protected transient RpcChannel rpcChannel; + /** + * The Map context name makes this map unique, this + * allows us to have more than one map shared + * through one channel + */ + protected transient byte[] mapContextName; + /** + * Has the state been transferred + */ + protected transient boolean stateTransferred = false; + /** + * Simple lock object for transfers + */ + protected transient Object stateMutex = new Object(); + /** + * A list of members in our map + */ + protected transient HashMap mapMembers = new HashMap(); + /** + * Our default send options + */ + protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; + /** + * The owner of this map, ala a SessionManager for example + */ + protected transient Object mapOwner; + /** + * External class loaders if serialization and deserialization is to be performed successfully. + */ + protected transient ClassLoader[] externalLoaders; + + /** + * The node we are currently backing up data to, this index will rotate + * on a round robin basis + */ protected transient int currentNode = 0; - private transient long accessTimeout = 5000; - private transient String mapname = ""; + + /** + * Since the map keeps internal membership + * this is the timeout for a ping message to be responded to + * If a remote map doesn't respond within this timeframe, + * its considered dead. + */ + protected transient long accessTimeout = 5000; + + /** + * Readable string of the mapContextName value + */ + protected transient String mapname = ""; //------------------------------------------------------------------------------ @@ -122,12 +162,27 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc } + /** + * Helper methods, wraps a single member in an array + * @param m Member + * @return Member[] + */ protected Member[] wrap(Member m) { if ( m == null ) return new Member[0]; else return new Member[] {m}; } - private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { + /** + * Initializes the map by creating the RPC channel, registering itself as a channel listener + * This method is also responsible for initiating the state transfer + * @param owner Object + * @param channel Channel + * @param mapContextName String + * @param timeout long + * @param channelSendOptions int + * @param cls ClassLoader[] + */ + protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) { log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName); this.mapOwner = owner; this.externalLoaders = cls; @@ -147,32 +202,52 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc //create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); + //add this map as a message listener this.channel.addChannelListener(this); + //listen for membership notifications this.channel.addMembershipListener(this); try { + //broadcast our map, this just notifies other members of our existence broadcast(MapMessage.MSG_INIT, true); //transfer state from another map transferState(); + //state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); throw new RuntimeException("Unable to start replicated map.",x); } - } - private void ping(long timeout) throws ChannelException { + /** + * Sends a ping out to all the members in the cluster, not just map members + * that this map is alive. + * @param timeout long + * @throws ChannelException + */ + protected void ping(long timeout) throws ChannelException { //send out a map membership message, only wait for the first reply - MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, - false, null, null, null, wrap(channel.getLocalMember(false))); - Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout); - for (int i = 0; i < resp.length; i++) { - memberAlive(resp[i].getSource()); - }//for - + MapMessage msg = new MapMessage(this.mapContextName, + MapMessage.MSG_INIT, + false, + null, + null, + null, + wrap(channel.getLocalMember(false))); + if ( channel.getMembers().length > 0 ) { + //send a ping, wait for all nodes to reply + Response[] resp = rpcChannel.send(channel.getMembers(), + msg, rpcChannel.ALL_REPLY, + (channelSendOptions), + (int) accessTimeout); + for (int i = 0; i < resp.length; i++) { + memberAlive(resp[i].getSource()); + } //for + } + //update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Iterator it = mapMembers.entrySet().iterator(); long now = System.currentTimeMillis(); @@ -184,7 +259,11 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc }//synch } - private void memberAlive(Member member) { + /** + * We have received a member alive notification + * @param member Member + */ + protected void memberAlive(Member member) { synchronized (mapMembers) { if (!mapMembers.containsKey(member)) { mapMemberAdded(member); @@ -192,8 +271,14 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc mapMembers.put(member, new Long(System.currentTimeMillis())); } } - - private void broadcast(int msgtype, boolean rpc) throws ChannelException { + + /** + * Helper method to broadcast a message to all members in a channel + * @param msgtype int + * @param rpc boolean + * @throws ChannelException + */ + protected void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, wrap(channel.getLocalMember(false)));