From 4995acf4ab890493c4dc2ed087a73a7b2f84e08a Mon Sep 17 00:00:00 2001 From: fhanik Date: Tue, 8 Feb 2011 20:18:19 +0000 Subject: [PATCH] https://issues.apache.org/bugzilla/show_bug.cgi?id=50667 Allow a replier to get confirmation if the reply message was sent successfully or if it failed git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1068549 13f79535-47bb-0310-9956-ffa450edef68 --- .../catalina/tribes/group/ExtendedRpcCallback.java | 48 ++++++++++++++++++++++ .../apache/catalina/tribes/group/RpcChannel.java | 48 ++++++++++++++++++---- 2 files changed, 89 insertions(+), 7 deletions(-) create mode 100644 java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java diff --git a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java new file mode 100644 index 000000000..a6b43db9a --- /dev/null +++ b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group; + +import java.io.Serializable; + +import org.apache.catalina.tribes.ErrorHandler; +import org.apache.catalina.tribes.Member; +/** + * Extension to the {@link RpcCallback} interface. Allows a RPC messenger to get a confirmation if the reply + * was sent successfully to the original sender. + * @author fhanik + * + */ +public interface ExtendedRpcCallback extends RpcCallback { + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender requested that reply + * @param reason - the reason the reply failed + * @return true if the callback would like to reattempt the reply, false otherwise + */ + public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason); + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender requested that reply + */ + public void replySucceeded(Serializable request, Serializable response, Member sender); +} diff --git a/java/org/apache/catalina/tribes/group/RpcChannel.java b/java/org/apache/catalina/tribes/group/RpcChannel.java index 04ad44074..30a42fb6a 100644 --- a/java/org/apache/catalina/tribes/group/RpcChannel.java +++ b/java/org/apache/catalina/tribes/group/RpcChannel.java @@ -24,7 +24,9 @@ import java.util.HashMap; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -126,14 +128,46 @@ public class RpcChannel implements ChannelListener{ }//synchronized }//end if } else{ + boolean finished = false; + final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null; + boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); Serializable reply = callback.replyRequest(rmsg.message,sender); - rmsg.reply = true; - rmsg.message = reply; - try { - channel.send(new Member[] {sender}, rmsg, - replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); - }catch ( Exception x ) { - log.error("Unable to send back reply in RpcChannel.",x); + while (!finished) { + ErrorHandler handler = null; + final Serializable request = msg; + final Serializable response = reply; + final Member fsender = sender; + if (excallback!=null && asyncReply) { + handler = new ErrorHandler() { + public void handleError(ChannelException x, UniqueId id) { + excallback.replyFailed(request, response, fsender, x); + } + + public void handleCompletion(UniqueId id) { + excallback.replySucceeded(request, response, fsender); + } + }; + } + rmsg.reply = true; + rmsg.message = reply; + try { + if (handler!=null) { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); + } else { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + } + finished = true; + if (excallback != null && !asyncReply) { + excallback.replySucceeded(rmsg.message, reply, sender); + } + }catch ( Exception x ) { + if (excallback != null && !asyncReply) { + finished = !excallback.replyFailed(rmsg.message, reply, sender, x); + } else { + finished = true; + log.error("Unable to send back reply in RpcChannel.",x); + } + } } }//end if } -- 2.11.0