protected InetAddress mcastBindAddress = null;
/**
+ * nr of times the system has to fail before a recovery is initiated
+ */
+ protected int recoveryCounter = 10;
+
+ /**
* Create a new mcast service impl
* @param member - the local member
* @param sendFrequency - the time (ms) in between pings sent out
this.timeToExpiration = expireTime;
this.service = service;
this.sendFrequency = sendFrequency;
+ init();
+ }
+
+ public void init() throws IOException {
setupSocket();
sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
sendPacket.setAddress(address);
receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE);
receivePacket.setAddress(address);
receivePacket.setPort(port);
+ member.setCommand(new byte[0]);
+ member.getData(true, true);
membership = new Membership(member);
}
public class ReceiverThread extends Thread {
+ int errorCounter = 0;
public ReceiverThread() {
super();
- setName("Cluster-MembershipReceiver");
+ setName("Tribes-MembershipReceiver");
}
public void run() {
while ( doRunReceiver ) {
try {
receive();
+ errorCounter=0;
} catch ( ArrayIndexOutOfBoundsException ax ) {
//we can ignore this, as it means we have an invalid package
//but we will log it to debug
if ( log.isDebugEnabled() )
log.debug("Invalid member mcast package.",ax);
} catch ( Exception x ) {
- log.warn("Error receiving mcast package. Sleeping 500ms",x);
+ if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x);
+ else log.debug("Error receiving mcast package. Sleeping 500ms",x);
try { Thread.sleep(500); } catch ( Exception ignore ){}
-
+ if ( (++errorCounter)>=recoveryCounter ) {
+ errorCounter=0;
+ new RecoveryThread(McastServiceImpl.this);
+ }
}
}
}
public class SenderThread extends Thread {
long time;
+ int errorCounter=0;
public SenderThread(long time) {
this.time = time;
- setName("Cluster-MembershipSender");
+ setName("Tribes-MembershipSender");
}
public void run() {
while ( doRunSender ) {
try {
send(true);
+ errorCounter = 0;
} catch ( Exception x ) {
- log.warn("Unable to send mcast message.",x);
+ if (errorCounter==0) log.warn("Unable to send mcast message.",x);
+ else log.debug("Unable to send mcast message.",x);
+ if ( (++errorCounter)>=recoveryCounter ) {
+ errorCounter=0;
+ new RecoveryThread(McastServiceImpl.this);
+ }
}
try { Thread.sleep(time); } catch ( Exception ignore ) {}
}
}
}//class SenderThread
+
+ protected static class RecoveryThread extends Thread {
+ static boolean running = false;
+ McastServiceImpl parent = null;
+ public RecoveryThread(McastServiceImpl parent) {
+ this.parent = parent;
+ if (!init(this)) parent = null;
+ }
+
+ public static synchronized boolean init(RecoveryThread t) {
+ if ( running ) return false;
+ running = true;
+ t.setName("Tribes-MembershipRecovery");
+ t.setDaemon(true);
+ t.start();
+ return true;
+ }
+
+ public boolean stopService() {
+ try {
+ parent.stop(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ);
+ return true;
+ } catch (Exception x) {
+ log.warn("Recovery thread failed to stop membership service.", x);
+ return false;
+ }
+ }
+ public boolean startService() {
+ try {
+ parent.init();
+ parent.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ);
+ return true;
+ } catch (Exception x) {
+ log.warn("Recovery thread failed to start membership service.", x);
+ return false;
+ }
+ }
+ public void run() {
+ boolean success = false;
+ int attempt = 0;
+ try {
+ while (!success) {
+ log.info("Tribes membership, running recovery thread, multicasting is not functional.");
+ if (stopService() & startService()) {
+ success = true;
+ log.info("Membership recovery was successful.");
+ }
+ try {
+ if (!success) {
+ log.info("Recovery attempt "+(++attempt)+" failed, trying again in 5 seconds");
+ Thread.sleep(5000);
+ }
+ }catch (InterruptedException ignore) {
+ }
+ }//while
+ }finally {
+ running = false;
+ }
+ }//run
+ }
}