private int mapSendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK|Channel.SEND_OPTIONS_USE_ACK;
/**
+ * Timeout for RPC messages.
+ */
+ private long rpcTimeout = DEFAULT_REPL_TIMEOUT;
+
+ /**
* Constructor, just calls super()
*
*/
CatalinaCluster catclust = cluster;
LazyReplicatedMap map = new LazyReplicatedMap(this,
catclust.getChannel(),
- DEFAULT_REPL_TIMEOUT,
+ rpcTimeout,
getMapName(),
getClassLoaders());
map.setChannelSendOptions(mapSendOptions);
return mapSendOptions;
}
+ public void setRpcTimeout(long rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ }
+
+ public long getRpcTimeout() {
+ return rpcTimeout;
+ }
+
@Override
public String[] getInvalidatedSessions() {
return new String[0];
result.notifyListenersOnReplication = notifyListenersOnReplication;
result.mapSendOptions = mapSendOptions;
result.maxActiveSessions = maxActiveSessions;
+ result.rpcTimeout = rpcTimeout;
return result;
}
name="rejectedSessions"
description="Number of sessions we rejected due to maxActive beeing reached"
type="int"/>
+ <attribute
+ name="rpcTimeout"
+ description="Timeout for RPC messages, how long we will wait for a reply"
+ type="long"/>
<operation
name="expireSession"
description="Expired the given session"
* @throws ChannelException
*/
protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
+ // No destination.
+ if (channel.getMembers().length == 0 ) return;
//send out a map membership message, only wait for the first reply
MapMessage msg = new MapMessage(this.mapContextName, msgtype,
false, null, null, null, channel.getLocalMember(false), null);
if ( rpc) {
Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
- for (int i = 0; i < resp.length; i++) {
- mapMemberAdded(resp[i].getSource());
- messageReceived(resp[i].getMessage(), resp[i].getSource());
+ if (resp.length > 0) {
+ for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } else {
+ log.warn("broadcast 0 replies, probably a timeout.");
}
} else {
channel.send(channel.getMembers(),msg,channelSendOptions);
sessions where the current node is the primary node for the session are
considered active sessions.
</attribute>
+ <attribute name="rpcTimeout" required="false">
+ Timeout for RPC message used for broadcast and transfer state from
+ another map.
+ Default value is <code>15000</code> milliseconds.
+ </attribute>
</attributes>
</subsection>
</section>