Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
authormarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 26 Sep 2008 17:37:34 +0000 (17:37 +0000)
committermarkt <markt@13f79535-47bb-0310-9956-ffa450edef68>
Fri, 26 Sep 2008 17:37:34 +0000 (17:37 +0000)
Correct NPE when cluster is defined at engine level
Ensure that only 1 thread writes the replicated WAR to disk and that the messages containing the WAR are processed in the correct order.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@699427 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/ha/deploy/FarmWarDeployer.java
java/org/apache/catalina/ha/deploy/FileMessageFactory.java

index aa6ceb6..ce31bb3 100644 (file)
@@ -158,6 +158,7 @@ public class FarmWarDeployer extends ClusterListener implements ClusterDeployer,
         }else {
             engine = (Engine)parent;
             hostname = engine.getDefaultHost();
+            host = (Host) engine.findChild(hostname);
         }
         try {
             oname = new ObjectName(engine.getName() + ":type=Deployer,host="
index 225f964..971e11a 100644 (file)
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileNotFoundException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This factory is used to read files and write files by splitting them up into
@@ -74,7 +77,7 @@ public class FileMessageFactory {
     protected FileOutputStream out;
 
     /**
-     * The number of messages we have read or written
+     * The number of messages we have written
      */
     protected int nrOfMessagesProcessed = 0;
 
@@ -87,6 +90,19 @@ public class FileMessageFactory {
      * The total number of packets that we split this file into
      */
     protected long totalNrOfMessages = 0;
+    
+    /**
+     * The number of the last message procssed. Message IDs are 1 based.
+     */
+    protected AtomicLong lastMessageProcessed = new AtomicLong(0);
+    
+    /**
+     * Messages received out of order are held in the buffer until required. If
+     * everything is worked as expected, messages will spend very little time in
+     * the buffer.
+     */
+    protected Map<Long, FileMessage> msgBuffer =
+        new ConcurrentHashMap<Long, FileMessage>();
 
     /**
      * The bytes that we hold the data in, not thread safe.
@@ -94,6 +110,12 @@ public class FileMessageFactory {
     protected byte[] data = new byte[READ_SIZE];
 
     /**
+     * Flag that indicates if a thread is writing messages to disk. Access to
+     * this flag must be synchronised.
+     */
+    protected boolean isWriting = false;
+
+    /**
      * Private constructor, either instantiates a factory to read or write. <BR>
      * When openForWrite==true, then a the file, f, will be created and an
      * output stream is opened to write to it. <BR>
@@ -205,25 +227,65 @@ public class FileMessageFactory {
         if (log.isDebugEnabled())
             log.debug("Message " + msg + " data " + msg.getData()
                     + " data length " + msg.getDataLength() + " out " + out);
-        if (out != null) {
-            out.write(msg.getData(), 0, msg.getDataLength());
-            nrOfMessagesProcessed++;
+        
+        if (msg.getMessageNumber() <= lastMessageProcessed.get()) {
+            // Duplicate of message already processed
+            log.warn("Receive Message again -- Sender ActTimeout too short [ path: "
+                    + msg.getContextPath()
+                    + " war: "
+                    + msg.getFileName()
+                    + " data: "
+                    + msg.getData()
+                    + " data length: " + msg.getDataLength() + " ]");
+            return false;
+        }
+        
+        FileMessage previous =
+            msgBuffer.put(new Long(msg.getMessageNumber()), msg);
+        if (previous !=null) {
+            // Duplicate of message not yet processed
+            log.warn("Receive Message again -- Sender ActTimeout too short [ path: "
+                    + msg.getContextPath()
+                    + " war: "
+                    + msg.getFileName()
+                    + " data: "
+                    + msg.getData()
+                    + " data length: " + msg.getDataLength() + " ]");
+            return false;
+        }
+        
+        FileMessage next = null;
+        synchronized (this) {
+            if (!isWriting) {
+                next = msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+                if (next != null) {
+                    isWriting = true;
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+        
+        while (next != null) {
+            out.write(next.getData(), 0, next.getDataLength());
+            lastMessageProcessed.incrementAndGet();
             out.flush();
-            if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) {
+            if (next.getMessageNumber() == next.getTotalNrOfMsgs()) {
                 out.close();
                 cleanup();
                 return true;
-            }//end if
-        } else {
-            if (log.isWarnEnabled())
-                log.warn("Receive Message again -- Sender ActTimeout to short [ path: "
-                                + msg.getContextPath()
-                                + " war: "
-                                + msg.getFileName()
-                                + " data: "
-                                + msg.getData()
-                                + " data length: " + msg.getDataLength() + " ]");
+            }
+            synchronized(this) {
+                next =
+                    msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+                if (next == null) {
+                    isWriting = false;
+                }
+            }
         }
+        
         return false;
     }//writeMessage
 
@@ -248,6 +310,8 @@ public class FileMessageFactory {
         data = null;
         nrOfMessagesProcessed = 0;
         totalNrOfMessages = 0;
+        msgBuffer.clear();
+        lastMessageProcessed = null;
     }
 
     /**
@@ -309,4 +373,4 @@ public class FileMessageFactory {
         return file;
     }
 
-}
\ No newline at end of file
+}