final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null;
boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS);
Serializable reply = callback.replyRequest(rmsg.message,sender);
- while (!finished) {
- ErrorHandler handler = null;
- final Serializable request = msg;
- final Serializable response = reply;
- final Member fsender = sender;
- if (excallback!=null && asyncReply) {
- handler = new ErrorHandler() {
- @Override
- public void handleError(ChannelException x, UniqueId id) {
- excallback.replyFailed(request, response, fsender, x);
- }
- @Override
- public void handleCompletion(UniqueId id) {
- excallback.replySucceeded(request, response, fsender);
- }
- };
- }
- rmsg.reply = true;
- rmsg.message = reply;
- try {
- if (handler!=null) {
- channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
- } else {
- channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ ErrorHandler handler = null;
+ final Serializable request = msg;
+ final Serializable response = reply;
+ final Member fsender = sender;
+ if (excallback!=null && asyncReply) {
+ handler = new ErrorHandler() {
+ @Override
+ public void handleError(ChannelException x, UniqueId id) {
+ excallback.replyFailed(request, response, fsender, x);
}
- finished = true;
- }catch ( Exception x ) {
- if (excallback != null && !asyncReply) {
- finished = !excallback.replyFailed(rmsg.message, reply, sender, x);
- } else {
- finished = true;
- log.error("Unable to send back reply in RpcChannel.",x);
+ @Override
+ public void handleCompletion(UniqueId id) {
+ excallback.replySucceeded(request, response, fsender);
}
+ };
+ }
+ rmsg.reply = true;
+ rmsg.message = reply;
+ try {
+ if (handler!=null) {
+ channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
+ } else {
+ channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
}
- if (finished && excallback != null && !asyncReply) {
- excallback.replySucceeded(rmsg.message, reply, sender);
+ finished = true;
+ }catch ( Exception x ) {
+ if (excallback != null && !asyncReply) {
+ finished = !excallback.replyFailed(rmsg.message, reply, sender, x);
+ } else {
+ finished = true;
+ log.error("Unable to send back reply in RpcChannel.",x);
}
}
+ if (finished && excallback != null && !asyncReply) {
+ excallback.replySucceeded(rmsg.message, reply, sender);
+ }
}//end if
}