if ( destination==null || destination.length == 0 ) return new Response[0];
//avoid dead lock
- channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+ int sendOptions =
+ channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);
synchronized (collector) {
if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
- channel.send(destination, rmsg, channelOptions);
+ channel.send(destination, rmsg, sendOptions);
if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
}
} catch ( InterruptedException ix ) {
return collector.getResponses();
}
+ @Override
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage)msg;
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
breakdown();
}
+ @Override
public boolean accept(Serializable msg, Member sender) {
if ( msg instanceof RpcMessage ) {
RpcMessage rmsg = (RpcMessage)msg;