public static final int SEND_OPTIONS_UDP = 0x0020;
/**
+ * Send options. When a message is sent with this flag on
+ * the system sends a UDP message on the Multicast address instead of UDP or TCP to individual addresses
+ * @see #send(Member[], Serializable , int)
+ * @see #send(Member[], Serializable, int, ErrorHandler)
+ */
+ public static final int SEND_OPTIONS_MULTICAST = 0x0040;
+
+ /**
* Send options, when a message is sent, it can have an option flag
* to trigger certain behavior. Most flags are used to trigger channel interceptors
* as the message passes through the channel stack. <br>
public void setPayload(byte[] payload);
public void setDomain(byte[] domain);
+
+ /**
+ * Broadcasts a message to all members
+ * @param message
+ * @throws ChannelException
+ */
+ public void broadcast(ChannelMessage message) throws ChannelException;
}
*/
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( destination == null ) destination = membershipService.getMembers();
- clusterSender.sendMessage(msg,destination);
+ if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST) {
+ membershipService.broadcast(msg);
+ } else {
+ clusterSender.sendMessage(msg,destination);
+ }
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
}
if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
membershipService.setMembershipListener(this);
+ if (membershipService instanceof McastService) {
+ ((McastService)membershipService).setMessageListener(this);
+ }
membershipService.start(MembershipService.MBR_RX);
valid = true;
}
super.messageReceived(msg);
}
-
public ChannelReceiver getClusterReceiver() {
return clusterReceiver;
}
package org.apache.catalina.tribes.membership;
+import java.io.IOException;
+import java.net.DatagramPacket;
import java.util.Properties;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.io.IOException;
/**
* A <b>membership</b> implementation using simple multicast.
*/
-public class McastService implements MembershipService,MembershipListener {
+public class McastService implements MembershipService,MembershipListener,MessageListener {
private static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog( McastService.class );
*/
protected MembershipListener listener;
/**
+ * A message listener delegate for broadcasts
+ */
+ protected MessageListener msglistener;
+ /**
* The local member
*/
protected MemberImpl localMember ;
java.net.InetAddress.getByName(properties.getProperty("mcastAddress")),
ttl,
soTimeout,
+ this,
this);
String value = properties.getProperty("recoveryEnabled","true");
boolean recEnabled = Boolean.valueOf(value).booleanValue() ;
public void setMembershipListener(MembershipListener listener) {
this.listener = listener;
}
+
+ public void setMessageListener(MessageListener listener) {
+ this.msglistener = listener;
+ }
+
+ public void removeMessageListener() {
+ this.msglistener = null;
+ }
/**
* Remove the membership listener
*/
{
if ( listener!=null ) listener.memberDisappeared(member);
}
+
+ public void messageReceived(ChannelMessage msg) {
+ if (msglistener!=null && msglistener.accept(msg)) msglistener.messageReceived(msg);
+ }
+
+ public boolean accept(ChannelMessage msg) {
+ return true;
+ }
+
+ public void broadcast(ChannelMessage message) throws ChannelException {
+ if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ )
+ throw new ChannelException("Multicast send is not started or enabled.");
+
+ byte[] data = XByteBuffer.createDataPackage((ChannelData)message);
+ DatagramPacket packet = new DatagramPacket(data,0,data.length);
+ try {
+ impl.send(false, packet);
+ } catch (Exception x) {
+ throw new ChannelException(x);
+ }
+ }
/**
* @deprecated use getSoTimeout
import java.io.IOException;
+import java.net.BindException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import java.net.BindException;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
/**
* A <b>membership</b> implementation using simple multicast.
*/
protected Membership membership;
/**
- * The actual listener, for callback when shits goes down
+ * The actual listener, for callback when stuff goes down
*/
protected MembershipListener service;
/**
+ * The actual listener for broadcast callbacks
+ */
+ protected MessageListener msgservice;
+ /**
* Thread to listen for pings
*/
protected ReceiverThread receiver;
* Add the ability to turn on/off recovery
*/
protected boolean recoveryEnabled = true;
+
+ /**
+ * Dont interrupt the sender/receiver thread, but pass off to an executor
+ */
+ protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
/**
* Create a new mcast service impl
* @param member - the local member
InetAddress mcastAddress,
int ttl,
int soTimeout,
- MembershipListener service)
+ MembershipListener service,
+ MessageListener msgservice)
throws IOException {
this.member = member;
this.address = mcastAddress;
this.mcastBindAddress = bind;
this.timeToExpiration = expireTime;
this.service = service;
+ this.msgservice = msgservice;
this.sendFrequency = sendFrequency;
init();
}
* @throws IOException
*/
public void receive() throws IOException {
+ boolean checkexpired = true;
try {
+
socket.receive(receivePacket);
if(receivePacket.getLength() > MAX_PACKET_SIZE) {
log.error("Multicast packet received was too long, dropping package:"+receivePacket.getLength());
} else {
byte[] data = new byte[receivePacket.getLength()];
System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length);
- final MemberImpl m = MemberImpl.getMember(data);
- if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m);
- Thread t = null;
- if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
- if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
- membership.removeMember(m);
- t = new Thread() {
- public void run() {
- service.memberDisappeared(m);
- }
- };
- } else if (membership.memberAlive(m)) {
- if (log.isDebugEnabled()) log.debug("Mcast add member " + m);
- t = new Thread() {
- public void run() {
- service.memberAdded(m);
+ if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
+ memberDataReceived(data);
+ } else {
+ XByteBuffer buffer = new XByteBuffer(data,true);
+ if (buffer.countPackages(true)>0) {
+ int count = buffer.countPackages();
+ ChannelData[] pkgs = new ChannelData[count];
+ for (int i=0; i<count; i++) {
+ try {
+ pkgs[i] = buffer.extractPackage(true);
+ }catch (IllegalStateException ise) {
+ log.debug("Unable to decode message.",ise);
+ }
}
- };
- } //end if
- if ( t != null ) {
- t.setDaemon(true);
- t.start();
+ memberBroadcastsReceived(pkgs);
+ }
}
+
}
} catch (SocketTimeoutException x ) {
//do nothing, this is normal, we don't want to block forever
//since the receive thread is the same thread
//that does membership expiration
}
- checkExpired();
+ if (checkexpired) checkExpired();
+ }
+
+ private void memberDataReceived(byte[] data) {
+ final MemberImpl m = MemberImpl.getMember(data);
+ if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m);
+ Runnable t = null;
+ if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
+ if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m);
+ membership.removeMember(m);
+ t = new Runnable() {
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberDisappeared.");
+ service.memberDisappeared(m);
+ }finally {
+ Thread.currentThread().setName(name);
+ }
+ }
+ };
+ } else if (membership.memberAlive(m)) {
+ if (log.isDebugEnabled()) log.debug("Mcast add member " + m);
+ t = new Runnable() {
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberAdded.");
+ service.memberAdded(m);
+ }finally {
+ Thread.currentThread().setName(name);
+ }
+ }
+ };
+ } //end if
+ if ( t != null ) {
+ executor.execute(t);
+ }
}
- protected Object expiredMutex = new Object();
+ private void memberBroadcastsReceived(final ChannelData[] data) {
+ if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
+ Runnable t = new Runnable() {
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberAdded.");
+ for (int i=0; i<data.length; i++ ) {
+ try {
+ if (data[i]!=null) {
+ msgservice.messageReceived(data[i]);
+ }
+ }catch (Throwable t) {
+ log.error("Unable to receive broadcast message.",t);
+ }
+ }
+ }finally {
+ Thread.currentThread().setName(name);
+ }
+ }
+ };
+ executor.execute(t);
+ }
+
+ protected final Object expiredMutex = new Object();
protected void checkExpired() {
synchronized (expiredMutex) {
MemberImpl[] expired = membership.expire(timeToExpiration);
if (log.isDebugEnabled())
log.debug("Mcast exipre member " + expired[i]);
try {
- Thread t = new Thread() {
+ Runnable t = new Runnable() {
public void run() {
- service.memberDisappeared(member);
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberExpired.");
+ service.memberDisappeared(member);
+ }finally {
+ Thread.currentThread().setName(name);
+ }
+
}
};
- t.start();
+ executor.execute(t);
} catch (Exception x) {
log.error("Unable to process member disappeared message.", x);
}
* @throws Exception
*/
public void send(boolean checkexpired) throws IOException{
+ send(checkexpired,null);
+ }
+
+ private final Object sendLock = new Object();
+ public void send(boolean checkexpired, DatagramPacket packet) throws IOException{
+ checkexpired = (checkexpired && (packet==null));
//ignore if we haven't started the sender
//if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
- member.inc();
- if(log.isTraceEnabled())
- log.trace("Mcast send ping from member " + member);
- byte[] data = member.getData();
- DatagramPacket p = new DatagramPacket(data,data.length);
- p.setAddress(address);
- p.setPort(port);
- socket.send(p);
+ if (packet==null) {
+ member.inc();
+ if(log.isTraceEnabled()) {
+ log.trace("Mcast send ping from member " + member);
+ }
+ byte[] data = member.getData();
+ packet = new DatagramPacket(data,data.length);
+ } else if (log.isTraceEnabled()) {
+ log.trace("Sending message broadcast "+packet.getLength()+ " bytes from "+ member);
+ }
+ packet.setAddress(address);
+ packet.setPort(port);
+ //TODO this operation is not thread safe
+ synchronized (sendLock) {
+ socket.send(packet);
+ }
if ( checkexpired ) checkExpired();
}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+/**
+ */
+public class TestMulticastPackages extends TestCase {
+ int msgCount = 500;
+ int threadCount = 20;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ int threadCounter = 0;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel1.addInterceptor(new MessageDispatch15Interceptor());
+ channel2 = new GroupChannel();
+ channel2.addInterceptor(new MessageDispatch15Interceptor());
+ ThroughputInterceptor tint = new ThroughputInterceptor();
+ tint.setInterval(500);
+ ThroughputInterceptor tint2 = new ThroughputInterceptor();
+ tint2.setInterval(500);
+ //channel1.addInterceptor(tint);
+ channel2.addInterceptor(tint2);
+ listener1 = new Listener();
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpPort(50000);
+ rb2.setUdpPort(50000);
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testSingleDataSendNO_ACK() throws Exception {
+ AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport();
+ AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport();
+ s1.setTimeout(Long.MAX_VALUE); //for debugging
+ s2.setTimeout(Long.MAX_VALUE); //for debugging
+
+ System.err.println("Starting Single package NO_ACK");
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_MULTICAST);
+ Thread.sleep(500);
+ System.err.println("Finished Single package NO_ACK ["+listener1.count+"]");
+ assertEquals("Checking success messages.",1,listener1.count.get());
+ }
+
+
+ public static void printMissingMsgs(int[] msgs, int maxIdx) {
+ for (int i=0; i<maxIdx && i<msgs.length; i++) {
+ if (msgs[i]==0) System.out.print(i+", ");
+ }
+ System.out.println();
+ }
+
+ public void testDataSendASYNCM() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpRxBufSize(1024*1024*10);
+ rb2.setUdpRxBufSize(1024*1024*10);
+ rb1.setUdpTxBufSize(1024*1024*10);
+ rb2.setUdpTxBufSize(1024*1024*10);
+ System.err.println("Starting NO_ACK");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++) {
+ int cnt = counter.getAndAdd(1);
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_MULTICAST|Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ //Thread.currentThread().sleep(10);
+ }
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");
+ System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);
+ System.out.print("Missing messages:");
+ printMissingMsgs(listener1.nrs,counter.get());
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());
+ }
+ public void testDataSendASYNC() throws Exception {
+ System.err.println("Starting ASYNC");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_MULTICAST);
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500);
+ System.err.println("Finished ASYNC");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public void testDataSendACK() throws Exception {
+ System.err.println("Starting ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST);
+ Thread.sleep(250);
+ System.err.println("Finished ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public void testDataSendSYNCACK() throws Exception {
+ System.err.println("Starting SYNC_ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_MULTICAST);
+ Thread.sleep(250);
+ System.err.println("Finished SYNC_ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public static class Listener implements ChannelListener {
+ AtomicLong count = new AtomicLong(0);
+ int maxIdx = -1;
+ int[] nrs = new int[1000000];
+ public Listener() {
+ Arrays.fill(nrs, 0);
+ }
+ public boolean accept(Serializable s, Member m) {
+ return (s instanceof Data);
+ }
+
+ public void messageReceived(Serializable s, Member m) {
+ try {
+ Data d = (Data)s;
+ if ( !Data.verify(d) ) {
+ System.err.println("ERROR - Unable to verify data package");
+ } else {
+ long c = count.addAndGet(1);
+ if ((c%1000) ==0 ) {
+ System.err.println("SUCCESS:"+c);
+ }
+ int nr = d.getNumber();
+ if (nr>=0 && nr<nrs.length) {
+ maxIdx = Math.max(maxIdx, nr);
+ nrs[nr] = 1;
+ }
+ }
+ }catch (Exception x ) {
+ x.printStackTrace();
+ }
+ }
+ }
+
+ public static class Data implements Serializable {
+ public int length;
+ public byte[] data;
+ public byte key;
+ public boolean hasNr = false;
+ public static Random r = new Random(System.currentTimeMillis());
+ public static Data createRandomData() {
+ return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
+ }
+ public static Data createRandomData(int size) {
+ return createRandomData(size,-1);
+ }
+
+ public static Data createRandomData(int size, int number) {
+ int i = r.nextInt();
+ i = ( i % 127 );
+ int length = Math.abs(r.nextInt() % size);
+ if (length<100) length += 100;
+ Data d = new Data();
+ d.length = length;
+ d.key = (byte)i;
+ d.data = new byte[length];
+ Arrays.fill(d.data,d.key);
+ if (number>0 && d.data.length>=4) {
+ //populate number
+ d.hasNr = true;
+ XByteBuffer.toBytes(number,d.data, 0);
+ }
+ return d;
+ }
+
+ public int getNumber() {
+ if (!hasNr) return -1;
+ return XByteBuffer.toInt(this.data, 0);
+ }
+
+ public static boolean verify(Data d) {
+ boolean result = (d.length == d.data.length);
+ for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
+ return result;
+ }
+ }
+
+
+
+}