From: fhanik Date: Fri, 9 Jan 2009 22:38:52 +0000 (+0000) Subject: Implement the ability to broadcast a message using multicast and bypass all TCP,... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=b38ba89bd53ee8a05c4e70dd0dbf2594724d2d47;p=tomcat7.0 Implement the ability to broadcast a message using multicast and bypass all TCP, simple fire-and-forget behavior, yet no change in how messages are sent and received for the consumer git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@733180 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/Channel.java b/java/org/apache/catalina/tribes/Channel.java index 617089e90..ced1e70be 100644 --- a/java/org/apache/catalina/tribes/Channel.java +++ b/java/org/apache/catalina/tribes/Channel.java @@ -191,6 +191,14 @@ public interface Channel { public static final int SEND_OPTIONS_UDP = 0x0020; /** + * Send options. When a message is sent with this flag on + * the system sends a UDP message on the Multicast address instead of UDP or TCP to individual addresses + * @see #send(Member[], Serializable , int) + * @see #send(Member[], Serializable, int, ErrorHandler) + */ + public static final int SEND_OPTIONS_MULTICAST = 0x0040; + + /** * Send options, when a message is sent, it can have an option flag * to trigger certain behavior. Most flags are used to trigger channel interceptors * as the message passes through the channel stack.
diff --git a/java/org/apache/catalina/tribes/MembershipService.java b/java/org/apache/catalina/tribes/MembershipService.java index d036f4cef..ec81fe500 100644 --- a/java/org/apache/catalina/tribes/MembershipService.java +++ b/java/org/apache/catalina/tribes/MembershipService.java @@ -131,5 +131,12 @@ public interface MembershipService { public void setPayload(byte[] payload); public void setDomain(byte[] domain); + + /** + * Broadcasts a message to all members + * @param message + * @throws ChannelException + */ + public void broadcast(ChannelMessage message) throws ChannelException; } diff --git a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java index a49e20e2f..bbc6e512f 100644 --- a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java +++ b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java @@ -75,7 +75,11 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag */ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( destination == null ) destination = membershipService.getMembers(); - clusterSender.sendMessage(msg,destination); + if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST) { + membershipService.broadcast(msg); + } else { + clusterSender.sendMessage(msg,destination); + } if ( Logs.MESSAGES.isTraceEnabled() ) { Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination)); } @@ -154,6 +158,9 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) { membershipService.setMembershipListener(this); + if (membershipService instanceof McastService) { + ((McastService)membershipService).setMessageListener(this); + } membershipService.start(MembershipService.MBR_RX); valid = true; } @@ -244,7 +251,6 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag super.messageReceived(msg); } - public ChannelReceiver getClusterReceiver() { return clusterReceiver; } diff --git a/java/org/apache/catalina/tribes/membership/McastService.java b/java/org/apache/catalina/tribes/membership/McastService.java index 0fcd363b5..99ea04c49 100644 --- a/java/org/apache/catalina/tribes/membership/McastService.java +++ b/java/org/apache/catalina/tribes/membership/McastService.java @@ -17,14 +17,21 @@ package org.apache.catalina.tribes.membership; +import java.io.IOException; +import java.net.DatagramPacket; import java.util.Properties; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipService; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.util.StringManager; import org.apache.catalina.tribes.util.UUIDGenerator; -import java.io.IOException; /** * A membership implementation using simple multicast. @@ -37,7 +44,7 @@ import java.io.IOException; */ -public class McastService implements MembershipService,MembershipListener { +public class McastService implements MembershipService,MembershipListener,MessageListener { private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog( McastService.class ); @@ -65,6 +72,10 @@ public class McastService implements MembershipService,MembershipListener { */ protected MembershipListener listener; /** + * A message listener delegate for broadcasts + */ + protected MessageListener msglistener; + /** * The local member */ protected MemberImpl localMember ; @@ -371,6 +382,7 @@ public class McastService implements MembershipService,MembershipListener { java.net.InetAddress.getByName(properties.getProperty("mcastAddress")), ttl, soTimeout, + this, this); String value = properties.getProperty("recoveryEnabled","true"); boolean recEnabled = Boolean.valueOf(value).booleanValue() ; @@ -456,6 +468,14 @@ public class McastService implements MembershipService,MembershipListener { public void setMembershipListener(MembershipListener listener) { this.listener = listener; } + + public void setMessageListener(MessageListener listener) { + this.msglistener = listener; + } + + public void removeMessageListener() { + this.msglistener = null; + } /** * Remove the membership listener */ @@ -475,6 +495,27 @@ public class McastService implements MembershipService,MembershipListener { { if ( listener!=null ) listener.memberDisappeared(member); } + + public void messageReceived(ChannelMessage msg) { + if (msglistener!=null && msglistener.accept(msg)) msglistener.messageReceived(msg); + } + + public boolean accept(ChannelMessage msg) { + return true; + } + + public void broadcast(ChannelMessage message) throws ChannelException { + if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) + throw new ChannelException("Multicast send is not started or enabled."); + + byte[] data = XByteBuffer.createDataPackage((ChannelData)message); + DatagramPacket packet = new DatagramPacket(data,0,data.length); + try { + impl.send(false, packet); + } catch (Exception x) { + throw new ChannelException(x); + } + } /** * @deprecated use getSoTimeout diff --git a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java index db53e7a31..707d67f6a 100644 --- a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java +++ b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java @@ -19,17 +19,24 @@ package org.apache.catalina.tribes.membership; import java.io.IOException; +import java.net.BindException; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; import java.net.SocketTimeoutException; import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; -import java.net.BindException; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; /** * A membership implementation using simple multicast. @@ -91,10 +98,14 @@ public class McastServiceImpl */ protected Membership membership; /** - * The actual listener, for callback when shits goes down + * The actual listener, for callback when stuff goes down */ protected MembershipListener service; /** + * The actual listener for broadcast callbacks + */ + protected MessageListener msgservice; + /** * Thread to listen for pings */ protected ReceiverThread receiver; @@ -135,6 +146,12 @@ public class McastServiceImpl * Add the ability to turn on/off recovery */ protected boolean recoveryEnabled = true; + + /** + * Dont interrupt the sender/receiver thread, but pass off to an executor + */ + protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + /** * Create a new mcast service impl * @param member - the local member @@ -155,7 +172,8 @@ public class McastServiceImpl InetAddress mcastAddress, int ttl, int soTimeout, - MembershipListener service) + MembershipListener service, + MessageListener msgservice) throws IOException { this.member = member; this.address = mcastAddress; @@ -165,6 +183,7 @@ public class McastServiceImpl this.mcastBindAddress = bind; this.timeToExpiration = expireTime; this.service = service; + this.msgservice = msgservice; this.sendFrequency = sendFrequency; init(); } @@ -315,46 +334,104 @@ public class McastServiceImpl * @throws IOException */ public void receive() throws IOException { + boolean checkexpired = true; try { + socket.receive(receivePacket); if(receivePacket.getLength() > MAX_PACKET_SIZE) { log.error("Multicast packet received was too long, dropping package:"+receivePacket.getLength()); } else { byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); - final MemberImpl m = MemberImpl.getMember(data); - if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m); - Thread t = null; - if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) { - if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); - membership.removeMember(m); - t = new Thread() { - public void run() { - service.memberDisappeared(m); - } - }; - } else if (membership.memberAlive(m)) { - if (log.isDebugEnabled()) log.debug("Mcast add member " + m); - t = new Thread() { - public void run() { - service.memberAdded(m); + if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) { + memberDataReceived(data); + } else { + XByteBuffer buffer = new XByteBuffer(data,true); + if (buffer.countPackages(true)>0) { + int count = buffer.countPackages(); + ChannelData[] pkgs = new ChannelData[count]; + for (int i=0; i=0 && nr0 && d.data.length>=4) { + //populate number + d.hasNr = true; + XByteBuffer.toBytes(number,d.data, 0); + } + return d; + } + + public int getNumber() { + if (!hasNr) return -1; + return XByteBuffer.toInt(this.data, 0); + } + + public static boolean verify(Data d) { + boolean result = (d.length == d.data.length); + for ( int i=(d.hasNr?4:0); result && (i