--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+/**
+ * Extension to the {@link RpcCallback} interface. Allows a RPC messenger to get a confirmation if the reply
+ * was sent successfully to the original sender.
+ * @author fhanik
+ *
+ */
+public interface ExtendedRpcCallback extends RpcCallback {
+
+ /**
+ *
+ * @param request - the original message that requested the reply
+ * @param response - the reply message to the original message
+ * @param sender - the sender requested that reply
+ * @param reason - the reason the reply failed
+ * @return true if the callback would like to reattempt the reply, false otherwise
+ */
+ public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason);
+
+ /**
+ *
+ * @param request - the original message that requested the reply
+ * @param response - the reply message to the original message
+ * @param sender - the sender requested that reply
+ */
+ public void replySucceeded(Serializable request, Serializable response, Member sender);
+}
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ErrorHandler;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.util.UUIDGenerator;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
}//synchronized
}//end if
} else{
+ boolean finished = false;
+ final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null;
+ boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS);
Serializable reply = callback.replyRequest(rmsg.message,sender);
- rmsg.reply = true;
- rmsg.message = reply;
- try {
- channel.send(new Member[] {sender}, rmsg,
- replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
- }catch ( Exception x ) {
- log.error("Unable to send back reply in RpcChannel.",x);
+ while (!finished) {
+ ErrorHandler handler = null;
+ final Serializable request = msg;
+ final Serializable response = reply;
+ final Member fsender = sender;
+ if (excallback!=null && asyncReply) {
+ handler = new ErrorHandler() {
+ public void handleError(ChannelException x, UniqueId id) {
+ excallback.replyFailed(request, response, fsender, x);
+ }
+
+ public void handleCompletion(UniqueId id) {
+ excallback.replySucceeded(request, response, fsender);
+ }
+ };
+ }
+ rmsg.reply = true;
+ rmsg.message = reply;
+ try {
+ if (handler!=null) {
+ channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
+ } else {
+ channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ }
+ finished = true;
+ if (excallback != null && !asyncReply) {
+ excallback.replySucceeded(rmsg.message, reply, sender);
+ }
+ }catch ( Exception x ) {
+ if (excallback != null && !asyncReply) {
+ finished = !excallback.replyFailed(rmsg.message, reply, sender, x);
+ } else {
+ finished = true;
+ log.error("Unable to send back reply in RpcChannel.",x);
+ }
+ }
}
}//end if
}