Fixes to the clustering code, some changes in StandardSession broke func, this has...
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 6 Oct 2006 21:17:50 +0000 (21:17 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 6 Oct 2006 21:17:50 +0000 (21:17 +0000)
consolidated the use of a nested <Manager> element

git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@453771 13f79535-47bb-0310-9956-ffa450edef68

16 files changed:
conf/server.xml
java/org/apache/catalina/Cluster.java
java/org/apache/catalina/ha/CatalinaCluster.java
java/org/apache/catalina/ha/deploy/FarmWarDeployer.java
java/org/apache/catalina/ha/session/BackupManager.java
java/org/apache/catalina/ha/session/DeltaManager.java
java/org/apache/catalina/ha/session/DeltaSession.java
java/org/apache/catalina/ha/session/SimpleTcpReplicationManager.java
java/org/apache/catalina/ha/tcp/SimpleTcpCluster.java
java/org/apache/catalina/mbeans/MBeanUtils.java
java/org/apache/catalina/tribes/group/GroupChannel.java
java/org/apache/catalina/tribes/mbeans-descriptors.xml
java/org/apache/catalina/tribes/membership/mbeans-descriptors.xml
java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
java/org/apache/catalina/tribes/transport/mbeans-descriptors.xml
webapps/docs/config/cluster.xml

index 50caa85..5c43599 100644 (file)
         -->
         
         <!--
-        <Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster">
+        <Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster"
+                 channelSendOptions="11">
           <Manager className="org.apache.catalina.ha.session.DeltaManager"
                    expireSessionsOnShutdown="false"
                    notifyListenersOnReplication="true"/>
index b860e01..ef93c24 100644 (file)
@@ -108,6 +108,12 @@ public interface Cluster {
      * @param manager Manager
      */
     public void registerManager(Manager manager);
+    
+    /**
+     * Removes a manager from the cluster
+     * @param manager Manager
+     */
+    public void removeManager(Manager manager);
 
     // --------------------------------------------------------- Cluster Wide Deployments
     
index d4a9e6e..52bac11 100644 (file)
@@ -119,12 +119,11 @@ public interface CatalinaCluster extends Cluster {
     public Map getManagers();
 
     public Manager getManager(String name);
-    public void removeManager(String name,Manager manager);
-    public void addManager(String name,Manager manager);
     public String getManagerName(String name, Manager manager);
     public Valve[] getValves();
     
     public void setChannel(Channel channel);
     public Channel getChannel();
+    
 
 }
index 98b6b44..11ec1f2 100644 (file)
@@ -174,7 +174,7 @@ public class FarmWarDeployer extends ClusterListener implements ClusterDeployer,
     /*
      * stop cluster wide deployments
      * 
-     * @see org.apache.catalina.cluster.ClusterDeployer#stop()
+     * @see org.apache.catalina.ha.ClusterDeployer#stop()
      */
     public void stop() throws LifecycleException {
         started = false;
@@ -426,7 +426,7 @@ public class FarmWarDeployer extends ClusterListener implements ClusterDeployer,
     /*
      * Modifcation from watchDir war detected!
      * 
-     * @see org.apache.catalina.cluster.deploy.FileChangeListener#fileModified(java.io.File)
+     * @see org.apache.catalina.ha.deploy.FileChangeListener#fileModified(java.io.File)
      */
     public void fileModified(File newWar) {
         try {
@@ -450,7 +450,7 @@ public class FarmWarDeployer extends ClusterListener implements ClusterDeployer,
     /*
      * War remvoe from watchDir
      * 
-     * @see org.apache.catalina.cluster.deploy.FileChangeListener#fileRemoved(java.io.File)
+     * @see org.apache.catalina.ha.deploy.FileChangeListener#fileRemoved(java.io.File)
      */
     public void fileRemoved(File removeWar) {
         try {
@@ -581,7 +581,7 @@ public class FarmWarDeployer extends ClusterListener implements ClusterDeployer,
     /*
      * Call watcher to check for deploy changes
      * 
-     * @see org.apache.catalina.cluster.ClusterDeployer#backgroundProcess()
+     * @see org.apache.catalina.ha.ClusterDeployer#backgroundProcess()
      */
     public void backgroundProcess() {
         if (started) {
index c360d74..879c109 100644 (file)
@@ -177,9 +177,10 @@ public class BackupManager extends StandardManager implements ClusterManager
      */
     public void start() throws LifecycleException {
         if ( this.started ) return;
+        
         try {
+            cluster.registerManager(this);
             CatalinaCluster catclust = (CatalinaCluster)cluster;
-            catclust.addManager(getName(), this);
             LazyReplicatedMap map = new LazyReplicatedMap(this,
                                                           catclust.getChannel(),
                                                           DEFAULT_REPL_TIMEOUT,
@@ -188,6 +189,7 @@ public class BackupManager extends StandardManager implements ClusterManager
             map.setChannelSendOptions(mapSendOptions);
             this.sessions = map;
             super.start();
+            this.started = true;
         }  catch ( Exception x ) {
             log.error("Unable to start BackupManager",x);
             throw new LifecycleException("Failed to start BackupManager",x);
@@ -196,7 +198,7 @@ public class BackupManager extends StandardManager implements ClusterManager
     
     public String getMapName() {
         CatalinaCluster catclust = (CatalinaCluster)cluster;
-        String name = catclust.getManagerName(getName(),this)+"-"+"";
+        String name = catclust.getManagerName(getName(),this)+"-"+"map";
         if ( log.isDebugEnabled() ) log.debug("Backup manager, Setting map name to:"+name);
         return name;
     }
@@ -225,7 +227,7 @@ public class BackupManager extends StandardManager implements ClusterManager
         } finally {
             super.stop();
         }
-        cluster.removeManager(getName(),this);
+        cluster.removeManager(this);
 
     }
 
index d5a2bc8..600e25a 100644 (file)
@@ -837,7 +837,7 @@ public class DeltaManager extends ClusterManagerBase{
             if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
             //to survice context reloads, as only a stop/start is called, not
             // createManager
-            ((CatalinaCluster)cluster).addManager(getName(), this);
+            cluster.registerManager(this);
 
             getAllClusterSessions();
 
@@ -1015,7 +1015,7 @@ public class DeltaManager extends ClusterManagerBase{
 
         // Require a new random number generator if we are restarted
         this.random = null;
-        getCluster().removeManager(getName(),this);
+        getCluster().removeManager(this);
         replicationValve = null;
         if (initialized) {
             destroy();
index e55626e..1f1b494 100644 (file)
@@ -43,6 +43,7 @@ import org.apache.catalina.util.Enumerator;
 import org.apache.catalina.util.StringManager;
 import org.apache.catalina.session.StandardManager;
 import org.apache.catalina.session.ManagerBase;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  *
@@ -103,6 +104,7 @@ public class DeltaSession extends StandardSession implements Externalizable,Clus
 
     public DeltaSession(Manager manager) {
         super(manager);
+        accessCount = new AtomicInteger();
         this.resetDeltaRequest();
     }
 
@@ -545,6 +547,7 @@ public class DeltaSession extends StandardSession implements Externalizable,Clus
         isValid = ( (Boolean) stream.readObject()).booleanValue();
         thisAccessedTime = ( (Long) stream.readObject()).longValue();
         version = ( (Long) stream.readObject()).longValue();
+        this.accessCount = new AtomicInteger();
         boolean hasPrincipal = stream.readBoolean();
         principal = null;
         if (hasPrincipal) {
index d5f83c3..9f3bb33 100644 (file)
@@ -46,10 +46,7 @@ import org.apache.catalina.Loader;
  * @author Bela Ban (modifications for synchronous replication)
  * @version 1.0 for TC 4.0
  * Description: The InMemoryReplicationManager is a session manager that replicated
- * session information in memory. It uses <a href="www.javagroups.com">JavaGroups</a> as
- * a communication protocol to ensure guaranteed and ordered message delivery.
- * JavaGroups also provides a very flexible protocol stack to ensure that the replication
- * can be used in any environment.
+ * session information in memory. 
  * <BR><BR>
  * The InMemoryReplicationManager extends the StandardManager hence it allows for us
  * to inherit all the basic session management features like expiration, session listeners etc
@@ -269,7 +266,6 @@ public class SimpleTcpReplicationManager extends StandardManager implements Clus
                      getName()+"] is not distributable. Ignoring message");
             return null;
         }
-        //notify javagroups
         try
         {
             if ( invalidatedSessions.get(sessionId) != null ) {
@@ -478,7 +474,6 @@ public class SimpleTcpReplicationManager extends StandardManager implements Clus
     public void start() throws LifecycleException {
         mManagerRunning = true;
         super.start();
-        //start the javagroups channel
         try {
             //the channel is already running
             if ( mChannelStarted ) return;
@@ -488,7 +483,7 @@ public class SimpleTcpReplicationManager extends StandardManager implements Clus
                 log.error("Starting... no cluster associated with this context:"+getName());
                 return;
             }
-            cluster.addManager(getName(),this);
+            cluster.registerManager(this);
 
             if (cluster.getMembers().length > 0) {
                 Member mbr = cluster.getMembers()[0];
@@ -543,15 +538,10 @@ public class SimpleTcpReplicationManager extends StandardManager implements Clus
         mManagerRunning = false;
         mChannelStarted = false;
         super.stop();
-        //stop the javagroup channel
         try
         {
             this.sessions.clear();
-            cluster.removeManager(getName(),this);
-//            mReplicationListener.stopListening();
-//            mReplicationTransmitter.stop();
-//            service.stop();
-//            service = null;
+            cluster.removeManager(this);
         }
         catch ( Exception x )
         {
index b32013c..832e3ea 100644 (file)
@@ -345,6 +345,10 @@ public class SimpleTcpCluster
         this.managerTemplate = managerTemplate;
     }
 
+    public void setChannelSendOptions(int channelSendOptions) {
+        this.channelSendOptions = channelSendOptions;
+    }
+
     /**
      * has members
      */
@@ -479,6 +483,10 @@ public class SimpleTcpCluster
         return managerTemplate;
     }
 
+    public int getChannelSendOptions() {
+        return channelSendOptions;
+    }
+
     /**
      * Create new Manager without add to cluster (comes with start the manager)
      * 
@@ -493,6 +501,7 @@ public class SimpleTcpCluster
         Manager manager = null;
         try {
             manager = managerTemplate.cloneFromTemplate();
+            ((ClusterManager)manager).setName(name);
         } catch (Exception x) {
             log.error("Unable to clone cluster manager, defaulting to org.apache.catalina.ha.session.DeltaManager", x);
             manager = new org.apache.catalina.ha.session.DeltaManager();
@@ -503,13 +512,23 @@ public class SimpleTcpCluster
     }
     
     public void registerManager(Manager manager) {
-        manager.setDistributable(true);
-        if (manager instanceof ClusterManager) {
-            ClusterManager cmanager = (ClusterManager) manager ;
-            cmanager.setDefaultMode(false);
-            cmanager.setName(getManagerName(((ClusterManager)manager).getName(),manager));
-            cmanager.setCluster(this);
+    
+        if (! (manager instanceof ClusterManager)) {
+            log.warn("Manager [ " + manager + "] does not implement ClusterManager, addition to cluster has been aborted.");
+            return;
         }
+        ClusterManager cmanager = (ClusterManager) manager ;
+        cmanager.setDistributable(true);
+        // Notify our interested LifecycleListeners
+        lifecycle.fireLifecycleEvent(BEFORE_MANAGERREGISTER_EVENT, manager);
+        String clusterName = getManagerName(cmanager.getName(), manager);
+        cmanager.setName(clusterName);
+        cmanager.setCluster(this);
+        cmanager.setDefaultMode(false);
+    
+        managers.put(clusterName, manager);
+        // Notify our interested LifecycleListeners
+        lifecycle.fireLifecycleEvent(AFTER_MANAGERREGISTER_EVENT, manager);    
     }
 
     /**
@@ -517,48 +536,19 @@ public class SimpleTcpCluster
      * 
      * @see org.apache.catalina.ha.CatalinaCluster#removeManager(java.lang.String,Manager)
      */
-    public void removeManager(String name,Manager manager) {
-        if (manager != null) {
+    public void removeManager(Manager manager) {
+        if (manager != null && manager instanceof ClusterManager ) {
+            ClusterManager cmgr = (ClusterManager) manager;
             // Notify our interested LifecycleListeners
             lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,manager);
-            managers.remove(getManagerName(name,manager));
-            if (manager instanceof ClusterManager) ((ClusterManager) manager).setCluster(null);
+            managers.remove(getManagerName(cmgr.getName(),manager));
+            cmgr.setCluster(null);
             // Notify our interested LifecycleListeners
             lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
         }
     }
 
     /**
-     * add an application to cluster replication bus
-     * 
-     * @param name
-     *            of the context
-     * @param manager
-     *            manager to register
-     * @see org.apache.catalina.ha.CatalinaCluster#addManager(java.lang.String,
-     *      org.apache.catalina.Manager)
-     */
-    public void addManager(String name, Manager manager) {
-        if (!manager.getDistributable()) {
-            log.warn("Manager with name " + name + " is not distributable, can't add as cluster manager");
-            return;
-        }
-        // Notify our interested LifecycleListeners
-        lifecycle.fireLifecycleEvent(BEFORE_MANAGERREGISTER_EVENT, manager);
-        String clusterName = getManagerName(name, manager);
-        if (manager instanceof ClusterManager) {
-            ClusterManager cmanager = (ClusterManager) manager ;
-            cmanager.setName(clusterName);
-            cmanager.setCluster(this);
-            //not needed anymore, we have an explicit Manager element
-            //if(cmanager.isDefaultMode()) transferProperty("manager",cmanager);
-        }
-        managers.put(clusterName, manager);
-        // Notify our interested LifecycleListeners
-        lifecycle.fireLifecycleEvent(AFTER_MANAGERREGISTER_EVENT, manager);
-    }
-
-    /**
      * @param name
      * @param manager
      * @return
index 6acbe0b..71ae2a8 100644 (file)
@@ -1366,7 +1366,7 @@ public class MBeanUtils {
             registry.loadDescriptors("org.apache.catalina.session", cl);
             registry.loadDescriptors("org.apache.catalina.startup", cl);
             registry.loadDescriptors("org.apache.catalina.users", cl);
-            registry.loadDescriptors("org.apache.catalina.cluster", cl);
+            registry.loadDescriptors("org.apache.catalina.ha", cl);
             registry.loadDescriptors("org.apache.catalina.connector", cl);
             registry.loadDescriptors("org.apache.catalina.valves",  cl);
         }
index ff4a08e..69f7f1d 100644 (file)
@@ -504,7 +504,7 @@ public class GroupChannel extends ChannelInterceptorBase implements ManagedChann
         if (!this.channelListeners.contains(channelListener) ) {
             this.channelListeners.add(channelListener);
         } else {
-            throw new IllegalArgumentException("Listener already exists:"+channelListener);
+            throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
         }
     }
 
index 3f5bf56..7f54550 100644 (file)
@@ -6,7 +6,7 @@
           description="Tcp Cluster implementation"
                domain="Catalina"
                 group="Cluster"
-                 type="org.apache.catalina.cluster.tcp.SimpleTcpCluster">
+                 type="org.apache.catalina.ha.tcp.SimpleTcpCluster">
 
     <attribute   name="protocolStack"
           description="JavaGroups protocol stack selection"
@@ -20,7 +20,7 @@
           description="Clustered implementation of the Manager interface"
                domain="Catalina"
                 group="Manager"
-                 type="org.apache.catalina.cluster.tcp.SimpleTcpReplicationManager">
+                 type="org.apache.catalina.ha.tcp.SimpleTcpReplicationManager">
 
     <attribute   name="algorithm"
           description="The message digest algorithm to be used when generating
@@ -77,7 +77,7 @@
           description="Valve for simple tcp replication"
                domain="Catalina"
                 group="Valve"
-                 type="org.apache.catalina.cluster.tcp.ReplicationValve">
+                 type="org.apache.catalina.ha.tcp.ReplicationValve">
 
     <attribute   name="className"
           description="Fully qualified class name of the managed object"
index d6e3fc4..41e3329 100644 (file)
@@ -8,7 +8,7 @@
            description="Cluster Membership service implementation"
                domain="Catalina"
                 group="Cluster"
-                 type="org.apache.catalina.cluster.mcast.McastService">
+                 type="org.apache.catalina.ha.mcast.McastService">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
index 13459e4..2ac75e9 100644 (file)
@@ -275,6 +275,7 @@ public abstract class AbstractReplicatedMap extends LinkedHashMap implements Rpc
         if ( log.isTraceEnabled() )
             log.trace("Replicate invoked on key:"+key);
         MapEntry entry = (MapEntry)super.get(key);
+        if ( entry == null ) return;
         if ( !entry.isSerializable() ) return;
         if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
             Object value = entry.getValue();
index e7b8624..1453811 100644 (file)
@@ -8,7 +8,7 @@
            description="Tcp Cluster implementation"
                domain="Catalina"
                 group="Cluster"
-                 type="org.apache.catalina.cluster.tcp.SimpleTcpCluster">
+                 type="org.apache.catalina.ha.tcp.SimpleTcpCluster">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
@@ -48,7 +48,7 @@
                returnType="void">
       <parameter name="message"
                  description="replication message"
-                 type="org.apache.catalina.cluster.ClusterMessage"/>
+                 type="org.apache.catalina.ha.ClusterMessage"/>
     </operation>
     
     <operation   name="sendClusterDomain"
@@ -57,7 +57,7 @@
                returnType="void">
       <parameter name="message"
                  description="replication message"
-                 type="org.apache.catalina.cluster.ClusterMessage"/>
+                 type="org.apache.catalina.ha.ClusterMessage"/>
     </operation>
         
     <operation   name="sendToMember"
@@ -90,7 +90,7 @@
            description="Tcp Cluster NioReceiver implementation"
                domain="Catalina"
                 group="Cluster"
-                 type="org.apache.catalina.cluster.tcp.ClusterReceiverBase">
+                 type="org.apache.catalina.ha.tcp.ClusterReceiverBase">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
            description="Tcp Cluster SocketReplicationListener implementation"
                domain="Catalina"
                 group="Cluster"
-                 type="org.apache.catalina.cluster.tcp.SocketReplicationListener">
+                 type="org.apache.catalina.ha.tcp.SocketReplicationListener">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
           description="Tcp replication transmitter"
                domain="Catalina"
                 group="ClusterSender"
-                 type="org.apache.catalina.cluster.tcp.ReplicationTransmitter">
+                 type="org.apache.catalina.ha.tcp.ReplicationTransmitter">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
           description="Async Cluster Sender"
                domain="Catalina"
                 group="IDataSender"
-                 type="org.apache.catalina.cluster.tcp.AsyncSocketSender">
+                 type="org.apache.catalina.ha.tcp.AsyncSocketSender">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
           description="Multi Socket Sender, more than one socket per member"
                domain="Catalina"
                 group="IDataSender"
-                 type="org.apache.catalina.cluster.tcp.PooledSocketSender">
+                 type="org.apache.catalina.ha.tcp.PooledSocketSender">
     <attribute   name="address"
           description="sender ip address"
                  type="java.net.InetAddress"
           description="Sync Cluster Sender"
                domain="Catalina"
                 group="IDataSender"
-                 type="org.apache.catalina.cluster.tcp.SocketSender">
+                 type="org.apache.catalina.ha.tcp.SocketSender">
     <attribute   name="address"
           description="sender ip address"
                  type="java.net.InetAddress"
           description="Valve for simple tcp replication"
                domain="Catalina"
                 group="Valve"
-                 type="org.apache.catalina.cluster.tcp.ReplicationValve">
+                 type="org.apache.catalina.ha.tcp.ReplicationValve">
     <attribute   name="info"
           description="Class version info"
                  type="java.lang.String"
index 7c66ca1..bcdeae7 100644 (file)
          <code>org.apache.catalina.ha.tcp.SimpleTcpCluster</code>
       </p>
     </attribute>
+    <attribute name="channelSendOptions" required="true">
+      <p>The Tribes channel send options, default is 11
+      <source>
+        int options= Channel.SEND_OPTIONS_ASYNCHRONOUS | 
+                     Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | 
+                     Channel.SEND_OPTIONS_USE_ACK;
+      </source>
+      Some of the values are:<br/>
+      <code>Channel.SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004</code><br/>
+      <code>Channel.SEND_OPTIONS_ASYNCHRONOUS = 0x0008</code><br/>
+      <code>Channel.SEND_OPTIONS_USE_ACK = 0x0002</code><br/>
+      </p>
+    </attribute>
+
     <attribute name="doClusterLog" required="false">
       <p><b>Deprecated since 6.0.0</b></p>
       <p>Possible values are <code>true</code> or <code>false</code><br/>