if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
memberDataReceived(data);
} else {
- XByteBuffer buffer = new XByteBuffer(data,true);
- if (buffer.countPackages(true)>0) {
- int count = buffer.countPackages();
- ChannelData[] pkgs = new ChannelData[count];
- for (int i=0; i<count; i++) {
- try {
- pkgs[i] = buffer.extractPackage(true);
- }catch (IllegalStateException ise) {
- log.debug("Unable to decode message.",ise);
- }
- }
- memberBroadcastsReceived(pkgs);
- }
+ memberBroadcastsReceived(data);
}
}
}
}
- private void memberBroadcastsReceived(final ChannelData[] data) {
+ private void memberBroadcastsReceived(final byte[] b) {
if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
- Runnable t = new Runnable() {
- public void run() {
- String name = Thread.currentThread().getName();
+ XByteBuffer buffer = new XByteBuffer(b,true);
+ if (buffer.countPackages(true)>0) {
+ int count = buffer.countPackages();
+ final ChannelData[] data = new ChannelData[count];
+ for (int i=0; i<count; i++) {
try {
- Thread.currentThread().setName("Membership-MemberAdded.");
- for (int i=0; i<data.length; i++ ) {
- try {
- if (data[i]!=null) {
- msgservice.messageReceived(data[i]);
+ data[i] = buffer.extractPackage(true);
+ }catch (IllegalStateException ise) {
+ log.debug("Unable to decode message.",ise);
+ }catch (IOException x) {
+ log.debug("Unable to decode message.",x);
+ }
+ }
+ Runnable t = new Runnable() {
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberAdded.");
+ for (int i=0; i<data.length; i++ ) {
+ try {
+ if (data[i]!=null && !member.equals(data[i].getAddress())) {
+ msgservice.messageReceived(data[i]);
+ }
+ }catch (Throwable t) {
+ log.error("Unable to receive broadcast message.",t);
}
- }catch (Throwable t) {
- log.error("Unable to receive broadcast message.",t);
}
+ }finally {
+ Thread.currentThread().setName(name);
}
- }finally {
- Thread.currentThread().setName(name);
}
- }
- };
- executor.execute(t);
+ };
+ executor.execute(t);
+ }
}
protected final Object expiredMutex = new Object();
}
private final Object sendLock = new Object();
+
public void send(boolean checkexpired, DatagramPacket packet) throws IOException{
checkexpired = (checkexpired && (packet==null));
//ignore if we haven't started the sender