https://issues.apache.org/bugzilla/show_bug.cgi?id=50667
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 8 Feb 2011 20:18:19 +0000 (20:18 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Tue, 8 Feb 2011 20:18:19 +0000 (20:18 +0000)
Allow a replier to get confirmation if the reply message was sent successfully or if it failed

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1068549 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java [new file with mode: 0644]
java/org/apache/catalina/tribes/group/RpcChannel.java

diff --git a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
new file mode 100644 (file)
index 0000000..a6b43db
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+/**
+ * Extension to the {@link RpcCallback} interface. Allows a RPC messenger to get a confirmation if the reply
+ * was sent successfully to the original sender.
+ * @author fhanik
+ *
+ */
+public interface ExtendedRpcCallback extends RpcCallback {
+    
+    /**
+     * 
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the sender requested that reply
+     * @param reason - the reason the reply failed
+     * @return true if the callback would like to reattempt the reply, false otherwise
+     */
+    public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason);
+    
+    /**
+     * 
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the sender requested that reply
+     */
+    public void replySucceeded(Serializable request, Serializable response, Member sender);
+}
index 04ad440..30a42fb 100644 (file)
@@ -24,7 +24,9 @@ import java.util.HashMap;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ErrorHandler;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -126,14 +128,46 @@ public class RpcChannel implements ChannelListener{
                 }//synchronized
             }//end if
         } else{
+            boolean finished = false;
+            final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null;
+            boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS);
             Serializable reply = callback.replyRequest(rmsg.message,sender);
-            rmsg.reply = true;
-            rmsg.message = reply;
-            try {
-                channel.send(new Member[] {sender}, rmsg,
-                        replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-            }catch ( Exception x )  {
-                log.error("Unable to send back reply in RpcChannel.",x);
+            while (!finished) {
+                ErrorHandler handler = null;
+                final Serializable request = msg;
+                final Serializable response = reply;
+                final Member fsender = sender;
+                if (excallback!=null && asyncReply) {
+                    handler = new ErrorHandler() {
+                        public void handleError(ChannelException x, UniqueId id) {
+                            excallback.replyFailed(request, response, fsender, x);
+                        }
+                        
+                        public void handleCompletion(UniqueId id) {
+                            excallback.replySucceeded(request, response, fsender);
+                        }
+                    };
+                }
+                rmsg.reply = true;
+                rmsg.message = reply;
+                try {
+                    if (handler!=null) {
+                        channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
+                    } else {
+                        channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                    }
+                    finished = true;
+                    if (excallback != null && !asyncReply) {
+                        excallback.replySucceeded(rmsg.message, reply, sender);
+                    }
+                }catch ( Exception x )  {
+                    if (excallback != null && !asyncReply) {
+                        finished = !excallback.replyFailed(rmsg.message, reply, sender, x);
+                    } else {
+                        finished = true;
+                        log.error("Unable to send back reply in RpcChannel.",x);
+                    }
+                }
             }
         }//end if
     }