boolean frag = false;
int fragsize = 1024;
int autoBind = 10;
- ArrayList staticMembers = new ArrayList();
+ ArrayList<Member> staticMembers = new ArrayList<Member>();
Properties transportProperties = new Properties();
String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender";
String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver";
}
System.out.println("Creating receiver class="+receiver);
- Class cl = Class.forName(receiver,true,ChannelCreator.class.getClassLoader());
+ Class<?> cl = Class.forName(receiver, true,
+ ChannelCreator.class.getClassLoader());
ReceiverBase rx = (ReceiverBase)cl.newInstance();
- rx.setTcpListenAddress(bind);
- rx.setTcpListenPort(port);
- rx.setTcpSelectorTimeout(tcpseltimeout);
- rx.setTcpThreadCount(tcpthreadcount);
+ rx.setAddress(bind);
+ rx.setPort(port);
+ rx.setSelectorTimeout(tcpseltimeout);
+ rx.setMaxThreads(tcpthreadcount);
+ rx.setMinThreads(tcpthreadcount);
rx.getBind();
rx.setRxBufSize(43800);
rx.setTxBufSize(25188);
ps.setTransport(sender);
McastService service = new McastService();
- service.setMcastAddr(mcastaddr);
+ service.setAddress(mcastaddr);
if (mbind != null) service.setMcastBindAddress(mbind);
- service.setMcastFrequency(mcastfreq);
+ service.setFrequency(mcastfreq);
service.setMcastDropTime(mcastdrop);
- service.setMcastPort(mcastport);
+ service.setPort(mcastport);
ManagedChannel channel = new GroupChannel();
channel.setChannelReceiver(rx);
if ( staticMembers.size() > 0 ) {
StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
for (int x=0; x<staticMembers.size(); x++ ) {
- smi.addStaticMember((Member)staticMembers.get(x));
+ smi.addStaticMember(staticMembers.get(x));
}
channel.addInterceptor(smi);
}
import java.io.InputStreamReader;
import java.util.StringTokenizer;
-import org.apache.catalina.tribes.ChannelInterceptor;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
if ( channel == null ) {
channel = createChannel();
startstatus = "starting";
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
startstatus = "running";
} else {
status = "Channel already started.";
public void stop() {
try {
if ( channel != null ) {
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
status = "Channel Stopped";
} else {
status = "Channel Already Stopped";
try {
System.out.println("Sending ["+msg+"]");
long start = System.currentTimeMillis();
- Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout);
+ Response[] resp = rpc.send(channel.getMembers(),msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout);
System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
for ( int i=0; i<resp.length; i++ ) {
System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");
}
public static void main(String[] args) throws Exception {
- boolean send = true;
- boolean debug = false;
long pause = 3000;
int count = 1000000;
int stats = 10000;
String name = "EchoRpcId";
- boolean breakOnEx = false;
- int threads = 1;
int options = RpcChannel.ALL_REPLY;
long timeout = 15000;
String message = "EchoRpcMessage";
}
for (int i = 0; i < args.length; i++) {
if ("-threads".equals(args[i])) {
- threads = Integer.parseInt(args[++i]);
+ // Not used
} else if ("-count".equals(args[i])) {
count = Integer.parseInt(args[++i]);
System.out.println("Sending "+count+" messages.");
} else if ("-pause".equals(args[i])) {
pause = Long.parseLong(args[++i])*1000;
} else if ("-break".equals(args[i])) {
- breakOnEx = true;
+ // Not used
} else if ("-stats".equals(args[i])) {
stats = Integer.parseInt(args[++i]);
System.out.println("Stats every "+stats+" message");
else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY;
else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY;
} else if ("-debug".equals(args[i])) {
- debug = true;
+ // Not used
} else if ("-help".equals(args[i]))
{
usage();
ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout);
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
test.run();
exit.setDaemon(true);
exit.start();
try {
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
}catch ( Exception x ) {
x.printStackTrace();
*/
public static void execute(Object proxy, String method) throws Exception {
Method executeM = null;
- Class c = proxy.getClass();
- Class params[] = new Class[0];
+ Class<?> c = proxy.getClass();
+ Class<?> params[] = new Class[0];
// params[0]=args.getClass();
executeM = findMethod(c, method, params);
if (executeM == null) {
}
Method executeM = null;
- Class c = proxy.getClass();
- Class params[] = new Class[2];
+ Class<?> c = proxy.getClass();
+ Class<?> params[] = new Class[2];
params[0] = String.class;
params[1] = Object.class;
executeM = findMethod(c, "setAttribute", params);
*/
public static Object getAttribute(Object proxy, String n) throws Exception {
Method executeM = null;
- Class c = proxy.getClass();
- Class params[] = new Class[1];
+ Class<?> c = proxy.getClass();
+ Class<?> params[] = new Class[1];
params[0] = String.class;
executeM = findMethod(c, "getAttribute", params);
if (executeM == null) {
*/
public static ClassLoader getURLClassLoader(URL urls[], ClassLoader parent) {
try {
- Class urlCL = Class.forName("java.net.URLClassLoader");
- Class paramT[] = new Class[2];
+ Class<?> urlCL = Class.forName("java.net.URLClassLoader");
+ Class<?> paramT[] = new Class[2];
paramT[0] = urls.getClass();
paramT[1] = ClassLoader.class;
Method m = findMethod(urlCL, "newInstance", paramT);
// First, the ideal case - a setFoo( String ) method
for (int i = 0; i < methods.length; i++) {
- Class paramT[] = methods[i].getParameterTypes();
+ Class<?> paramT[] = methods[i].getParameterTypes();
if (setter.equals(methods[i].getName()) && paramT.length == 1
&& "java.lang.String".equals(paramT[0].getName())) {
&& methods[i].getParameterTypes().length == 1) {
// match - find the type and invoke it
- Class paramType = methods[i].getParameterTypes()[0];
+ Class<?> paramType = methods[i].getParameterTypes()[0];
Object params[] = new Object[1];
// Try a setFoo ( int )
// First, the ideal case - a getFoo() method
for (int i = 0; i < methods.length; i++) {
- Class paramT[] = methods[i].getParameterTypes();
+ Class<?> paramT[] = methods[i].getParameterTypes();
if (getter.equals(methods[i].getName()) && paramT.length == 0) {
return methods[i].invoke(o, (Object[]) null);
}
String setter = "set" + capitalize(name);
try {
Method methods[] = findMethods(o.getClass());
- Method setPropertyMethod = null;
// find setFoo() method
for (int i = 0; i < methods.length; i++) {
- Class paramT[] = methods[i].getParameterTypes();
+ Class<?> paramT[] = methods[i].getParameterTypes();
if (setter.equals(methods[i].getName()) && paramT.length == 0) {
methods[i].invoke(o, new Object[] {});
return;
*/
public static String replaceProperties(String value, Object getter) {
if (getter instanceof Hashtable)
- return replaceProperties(value, (Hashtable) getter, null);
+ return replaceProperties(value, (Hashtable<String, String>) getter,
+ null);
if (getter instanceof PropertySource) {
PropertySource src[] = new PropertySource[] { (PropertySource) getter };
/**
* Replace ${NAME} with the property value
*/
- public static String replaceProperties(String value, Hashtable staticProp,
+ public static String replaceProperties(String value,
+ Hashtable<String, String> staticProp,
PropertySource dynamicProp[]) {
StringBuffer sb = new StringBuffer();
int prev = 0;
String n = value.substring(pos + 2, endName);
String v = null;
if (staticProp != null) {
- v = (String) ((Hashtable) staticProp).get(n);
+ v = staticProp.get(n);
}
if (v == null && dynamicProp != null) {
for (int i = 0; i < dynamicProp.length; i++) {
* Add all the jar files in a dir to the classpath, represented as a Vector
* of URLs.
*/
- public static void addToClassPath(Vector cpV, String dir) {
+ public static void addToClassPath(Vector<URL> cpV, String dir) {
try {
String cpComp[] = getFilesByExt(dir, ".jar");
if (cpComp != null) {
}
}
- public static void addToolsJar(Vector v) {
+ public static void addToolsJar(Vector<URL> v) {
try {
// Add tools.jar in any case
File f = new File(System.getProperty("java.home")
* @throws IOException If an I/O error occurs
* @throws MalformedURLException Doh ;)
*/
- public static void addJarsFromClassPath(Vector jars, String cp)
+ public static void addJarsFromClassPath(Vector<URL> jars, String cp)
throws IOException, MalformedURLException {
String sep = System.getProperty("path.separator");
- String token;
StringTokenizer st;
if (cp != null) {
st = new StringTokenizer(cp, sep);
/**
* Return a URL[] that can be used to construct a class loader
*/
- public static URL[] getClassPath(Vector v) {
+ public static URL[] getClassPath(Vector<URL> v) {
URL[] urls = new URL[v.size()];
for (int i = 0; i < v.size(); i++) {
- urls[i] = (URL) v.elementAt(i);
+ urls[i] = v.elementAt(i);
}
return urls;
}
public static URL[] getClassPath(String dir, String cpath,
String cpathProp, boolean addTools) throws IOException,
MalformedURLException {
- Vector jarsV = new Vector();
+ Vector<URL> jarsV = new Vector<URL>();
if (dir != null) {
// Add dir/classes first, if it exists
URL url = getURL(dir, "classes");
//args0=findVoidSetters(proxy.getClass());
args0 = findBooleanSetters(proxy.getClass());
}
- Hashtable h = null;
+ Hashtable<String, String> h = null;
if (null != findMethod(proxy.getClass(), "getOptionAliases",
new Class[] {})) {
- h = (Hashtable) callMethod0(proxy, "getOptionAliases");
+ h = (Hashtable<String, String>) callMethod0(proxy,
+ "getOptionAliases");
}
return processArgs(proxy, args, args0, null, h);
}
public static boolean processArgs(Object proxy, String args[],
- String args0[], String args1[], Hashtable aliases) throws Exception {
+ String args0[], String args1[],
+ Hashtable<String, String> aliases) throws Exception {
for (int i = 0; i < args.length; i++) {
String arg = args[i];
if (arg.startsWith("-"))
arg = arg.substring(1);
if (aliases != null && aliases.get(arg) != null)
- arg = (String) aliases.get(arg);
+ arg = aliases.get(arg);
if (args0 != null) {
boolean set = false;
objectMethods.clear();
}
- public static String[] findVoidSetters(Class c) {
+ public static String[] findVoidSetters(Class<?> c) {
Method m[] = findMethods(c);
if (m == null)
return null;
- Vector v = new Vector();
+ Vector<String> v = new Vector<String>();
for (int i = 0; i < m.length; i++) {
if (m[i].getName().startsWith("set")
&& m[i].getParameterTypes().length == 0) {
}
String s[] = new String[v.size()];
for (int i = 0; i < s.length; i++) {
- s[i] = (String) v.elementAt(i);
+ s[i] = v.elementAt(i);
}
return s;
}
- public static String[] findBooleanSetters(Class c) {
+ public static String[] findBooleanSetters(Class<?> c) {
Method m[] = findMethods(c);
if (m == null)
return null;
- Vector v = new Vector();
+ Vector<String> v = new Vector<String>();
for (int i = 0; i < m.length; i++) {
if (m[i].getName().startsWith("set")
&& m[i].getParameterTypes().length == 1
}
String s[] = new String[v.size()];
for (int i = 0; i < s.length; i++) {
- s[i] = (String) v.elementAt(i);
+ s[i] = v.elementAt(i);
}
return s;
}
- static Hashtable objectMethods = new Hashtable();
+ static Hashtable<Class<?>, Method[]> objectMethods =
+ new Hashtable<Class<?>, Method[]>();
- public static Method[] findMethods(Class c) {
- Method methods[] = (Method[]) objectMethods.get(c);
+ public static Method[] findMethods(Class<?> c) {
+ Method methods[] = objectMethods.get(c);
if (methods != null)
return methods;
return methods;
}
- public static Method findMethod(Class c, String name, Class params[]) {
+ public static Method findMethod(Class<?> c, String name, Class<?> params[]) {
Method methods[] = findMethods(c);
if (methods == null)
return null;
for (int i = 0; i < methods.length; i++) {
if (methods[i].getName().equals(name)) {
- Class methodParams[] = methods[i].getParameterTypes();
+ Class<?> methodParams[] = methods[i].getParameterTypes();
if (methodParams == null)
if (params == null || params.length == 0)
return methods[i];
for (int i = 0; i < myMethods.length; i++) {
if (methodN.equals(myMethods[i].getName())) {
// check if it's overriden
- Class declaring = myMethods[i].getDeclaringClass();
- Class parentOfDeclaring = declaring.getSuperclass();
+ Class<?> declaring = myMethods[i].getDeclaringClass();
+ Class<?> parentOfDeclaring = declaring.getSuperclass();
// this works only if the base class doesn't extend
// another class.
return false;
}
- public static void callMain(Class c, String args[]) throws Exception {
- Class p[] = new Class[1];
+ public static void callMain(Class<?> c, String args[]) throws Exception {
+ Class<?> p[] = new Class[1];
p[0] = args.getClass();
Method m = c.getMethod("main", p);
m.invoke(c, new Object[] { args });
d("callMethod1 " + target.getClass().getName() + " "
+ param1.getClass().getName() + " " + typeParam1);
- Class params[] = new Class[1];
+ Class<?> params[] = new Class[1];
if (typeParam1 == null)
params[0] = param1.getClass();
else
if (dbg > 0)
d("callMethod0 " + target.getClass().getName() + "." + methodN);
- Class params[] = new Class[0];
+ Class<?> params[] = new Class[0];
Method m = findMethod(target.getClass(), methodN, params);
if (m == null)
throw new NoSuchMethodException(target.getClass().getName() + " "
static Object[] emptyArray = new Object[] {};
public static Object callMethodN(Object target, String methodN,
- Object params[], Class typeParams[]) throws Exception {
+ Object params[], Class<?> typeParams[]) throws Exception {
Method m = null;
m = findMethod(target.getClass(), methodN, typeParams);
if (m == null) {
return o;
}
- public static Object convert(String object, Class paramType) {
+ public static Object convert(String object, Class<?> paramType) {
Object result = null;
if ("java.lang.String".equals(paramType.getName())) {
result = object;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.Channel;
-import java.io.Externalizable;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
}
private static void printSendStats(long counter, int messageSize) {
- float cnt = (float)counter;
- float size = (float)messageSize;
- float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
+ float cnt = counter;
+ float size = messageSize;
+ float time = (System.currentTimeMillis()-messageStartSendTime) / 1000f;
log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
"\n\tMessage count:"+counter+
"\n\tTotal bytes :"+(long)(size*cnt)+
long counter = 0;
long total = 0;
LoadMessage msg = new LoadMessage();
- int messageSize = LoadTest.messageSize;
try {
startTest();
bytesReceived+=((LoadMessage)msg).getMessage().length;
- mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f;
+ mBytesReceived+=(((LoadMessage)msg).getMessage().length)/1024f/1024f;
messagesReceived++;
if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
- float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived);
- float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
+ float bytes = (((LoadMessage)msg).getMessage().length*messagesReceived);
+ float seconds = (System.currentTimeMillis()-receiveStart) / 1000f;
log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
"\n\tMessage count :"+(long)messagesReceived+
"\n\tMessage/sec :"+messagesReceived/seconds+
public static byte[] outdata = new byte[size];
public static Random r = new Random(System.currentTimeMillis());
public static int getMessageSize (LoadMessage msg) {
- int messageSize = msg.getMessage().length;
- if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
- try {
- messageSize = XByteBuffer.serialize(new LoadMessage()).length;
- log.info("Average message size:" + messageSize + " bytes");
- } catch (Exception x) {
- log.error("Unable to calculate test message size.", x);
- }
- return messageSize;
+ return msg.getMessage().length;
}
static {
r.nextBytes(outdata);
test.channelOptions = channelOptions;
}
test.run();
- if ( shutdown && send ) channel.stop(channel.DEFAULT);
+ if ( shutdown && send ) channel.stop(Channel.DEFAULT);
System.out.println("System test complete, sleeping to let threads finish.");
Thread.sleep(60*1000*60);
}
exit.setDaemon(true);
exit.start();
try {
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
}catch ( Exception x ) {
x.printStackTrace();
package org.apache.catalina.tribes.demos;
import java.io.Serializable;
-import java.util.Map;
import java.awt.ComponentOrientation;
import java.awt.Dimension;
import javax.swing.JTable;
import javax.swing.JTextField;
import javax.swing.table.AbstractTableModel;
-import javax.swing.table.TableModel;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.tipis.AbstractReplicatedMap;
import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
import javax.swing.table.DefaultTableCellRenderer;
import java.awt.Color;
import java.awt.Component;
import javax.swing.table.TableColumn;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.util.Arrays;
-import java.util.Set;
import java.util.Random;
/**
if ( args.length > 0 && (!args[args.length-1].startsWith("-"))) {
mapName = args[args.length-1];
}
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
- MapDemo demo = new MapDemo(channel,mapName);
+ new MapDemo(channel,mapName);
System.out.println("System test complete, time to start="+(System.currentTimeMillis()-start)+" ms. Sleeping to let threads finish.");
Thread.sleep(60 * 1000 * 60);
exit.setDaemon(true);
exit.start();
try {
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
} catch (Exception x) {
x.printStackTrace();
try {
Thread.sleep(500);
} catch (InterruptedException x) {
- Thread.currentThread().interrupted();
+ Thread.interrupted();
}
}
}
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.demos;\r
-\r
-import java.io.Serializable;\r
-import org.apache.catalina.tribes.ManagedChannel;\r
-import org.apache.catalina.tribes.Member;\r
-import org.apache.catalina.tribes.Channel;\r
-import java.util.Properties;\r
-import org.apache.catalina.tribes.MembershipListener;\r
-import org.apache.catalina.tribes.util.UUIDGenerator;\r
-import org.apache.catalina.tribes.util.Arrays;\r
-import java.io.ByteArrayOutputStream;\r
-import java.io.ByteArrayInputStream;\r
-import java.io.IOException;\r
-\r
-public class MembersWithProperties implements MembershipListener{\r
- Channel channel;\r
- static Thread main;\r
-\r
- public MembersWithProperties(Channel channel, Properties props) throws IOException {\r
- this.channel = channel;\r
- channel.addMembershipListener(this);\r
- ManagedChannel mchannel = (ManagedChannel)channel;\r
- mchannel.getMembershipService().setPayload(getPayload(props));\r
- }\r
- \r
- byte[] getPayload(Properties props) throws IOException {\r
- ByteArrayOutputStream bout = new ByteArrayOutputStream();\r
- props.store(bout,"");\r
- return bout.toByteArray();\r
- }\r
- \r
- Properties getProperties(byte[] payload) throws IOException {\r
- ByteArrayInputStream bin = new ByteArrayInputStream(payload);\r
- Properties props = new Properties();\r
- props.load(bin);\r
- return props;\r
- }\r
-\r
- public void memberAdded(Member member) {\r
- try {\r
- System.out.println("Received member added:"+member);\r
- System.out.println("Payload["+member+"] :");\r
- getProperties(member.getPayload()).store(System.out,"");\r
- }catch ( Exception x ) {\r
- x.printStackTrace();\r
- }\r
- }\r
- \r
- public void memberDisappeared(Member member) {\r
- try {\r
- System.out.println("Received member disappeared:"+member);\r
- System.out.println("Payload["+member+"] :");\r
- getProperties(member.getPayload()).store(System.out,"");\r
- }catch ( Exception x ) {\r
- x.printStackTrace();\r
- }\r
- }\r
-\r
- public static void usage() {\r
- System.out.println("Tribes Member Properties demo.");\r
- System.out.println("Usage:\n\t" +\r
- "java MemberWithProperties \n\t" +\r
- "Channel options:" +\r
- ChannelCreator.usage() + "\n\n" +\r
- "Example:\n\t" +\r
- "java MembersWithProperties -port 4004\n\t" +\r
- "java MembersWithProperties -bind 192.168.0.45 -port 4005\n\t" +\r
- "java MembersWithProperties -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");\r
- }\r
-\r
- public static void main(String[] args) throws Exception {\r
- if (args.length==0) usage();\r
- main = Thread.currentThread();\r
- ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args);\r
- Properties props = new Properties();\r
- props.setProperty("mydomainkey","mydomainvalue");\r
- props.setProperty("someotherkey", Arrays.toString(UUIDGenerator.randomUUID(true)));\r
- MembersWithProperties test = new MembersWithProperties(channel, props);\r
- channel.start(channel.DEFAULT);\r
- Runtime.getRuntime().addShutdownHook(new Shutdown(channel));\r
- try {\r
- main.sleep(Long.MAX_VALUE);\r
- }catch(InterruptedException ix) {\r
- main.sleep(5000);//allow everything to shutdown\r
- }\r
- }\r
-\r
- public static class Shutdown extends Thread {\r
- ManagedChannel channel = null;\r
- public Shutdown(ManagedChannel channel) {\r
- this.channel = channel;\r
- }\r
-\r
- public void run() {\r
- System.out.println("Shutting down...");\r
- try {\r
- channel.stop(channel.DEFAULT);\r
- } catch (Exception x) {\r
- x.printStackTrace();\r
- }\r
- System.out.println("Channel stopped.");\r
- main.interrupt();\r
- }\r
- }\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.demos;
+
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
+import java.util.Properties;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.util.Arrays;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+public class MembersWithProperties implements MembershipListener{
+ Channel channel;
+ static Thread main;
+
+ public MembersWithProperties(Channel channel, Properties props) throws IOException {
+ this.channel = channel;
+ channel.addMembershipListener(this);
+ ManagedChannel mchannel = (ManagedChannel)channel;
+ mchannel.getMembershipService().setPayload(getPayload(props));
+ }
+
+ byte[] getPayload(Properties props) throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ props.store(bout,"");
+ return bout.toByteArray();
+ }
+
+ Properties getProperties(byte[] payload) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(payload);
+ Properties props = new Properties();
+ props.load(bin);
+ return props;
+ }
+
+ public void memberAdded(Member member) {
+ try {
+ System.out.println("Received member added:"+member);
+ System.out.println("Payload["+member+"] :");
+ getProperties(member.getPayload()).store(System.out,"");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ }
+
+ public void memberDisappeared(Member member) {
+ try {
+ System.out.println("Received member disappeared:"+member);
+ System.out.println("Payload["+member+"] :");
+ getProperties(member.getPayload()).store(System.out,"");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ }
+
+ public static void usage() {
+ System.out.println("Tribes Member Properties demo.");
+ System.out.println("Usage:\n\t" +
+ "java MemberWithProperties \n\t" +
+ "Channel options:" +
+ ChannelCreator.usage() + "\n\n" +
+ "Example:\n\t" +
+ "java MembersWithProperties -port 4004\n\t" +
+ "java MembersWithProperties -bind 192.168.0.45 -port 4005\n\t" +
+ "java MembersWithProperties -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length==0) usage();
+ main = Thread.currentThread();
+ ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args);
+ Properties props = new Properties();
+ props.setProperty("mydomainkey","mydomainvalue");
+ props.setProperty("someotherkey", Arrays.toString(UUIDGenerator.randomUUID(true)));
+ new MembersWithProperties(channel, props);
+ channel.start(Channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ }catch(InterruptedException ix) {
+ Thread.sleep(5000);//allow everything to shutdown
+ }
+ }
+
+ public static class Shutdown extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ try {
+ channel.stop(Channel.DEFAULT);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ main.interrupt();
+ }
+ }
}
\ No newline at end of file
*/
package org.apache.catalina.tribes.test;
-import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.nio.channels.Selector;
continue;
}
- Iterator it = selector.selectedKeys().iterator();
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
- SelectionKey sk = (SelectionKey) it.next();
+ SelectionKey sk = it.next();
it.remove();
try {
int readyOps = sk.readyOps();
NioSender sender = (NioSender) sk.attachment();
if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) {
System.out.println("Message completed for handler:"+sender);
- Thread.currentThread().sleep(2000);
+ Thread.sleep(2000);
sender.reset();
sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
}
protected void tearDown() throws Exception {
super.tearDown();
- try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
+ try {channel.stop(Channel.DEFAULT);}catch (Exception ignore){}
}
public void testDoubleFullStart() throws Exception {
int count = 0;
try {
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
count++;
} catch ( Exception x){x.printStackTrace();}
assertEquals(count,2);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
}
public void testScrap() throws Exception {
//try to double start the RX
int count = 0;
try {
- channel.start(channel.SND_RX_SEQ);
- channel.start(channel.MBR_RX_SEQ);
+ channel.start(Channel.SND_RX_SEQ);
+ channel.start(Channel.MBR_RX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
- channel.start(channel.MBR_RX_SEQ);
+ channel.start(Channel.MBR_RX_SEQ);
count++;
} catch ( Exception x){/*expected*/}
assertEquals(count,1);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
//double the membership sender
count = 0;
try {
- channel.start(channel.SND_RX_SEQ);
- channel.start(channel.MBR_TX_SEQ);
+ channel.start(Channel.SND_RX_SEQ);
+ channel.start(Channel.MBR_TX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
- channel.start(channel.MBR_TX_SEQ);
+ channel.start(Channel.MBR_TX_SEQ);
count++;
} catch ( Exception x){/*expected*/}
assertEquals(count,1);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
count = 0;
try {
- channel.start(channel.SND_RX_SEQ);
+ channel.start(Channel.SND_RX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
- channel.start(channel.SND_RX_SEQ);
+ channel.start(Channel.SND_RX_SEQ);
count++;
} catch ( Exception x){/*expected*/}
assertEquals(count,1);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
count = 0;
try {
- channel.start(channel.SND_TX_SEQ);
+ channel.start(Channel.SND_TX_SEQ);
count++;
} catch ( Exception x){x.printStackTrace();}
try {
- channel.start(channel.SND_TX_SEQ);
+ channel.start(Channel.SND_TX_SEQ);
count++;
} catch ( Exception x){/*expected*/}
assertEquals(count,1);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
}
public void testFalseOption() throws Exception {
count++;
} catch ( Exception x){/*expected*/}
assertEquals(count,2);
- channel.stop(channel.DEFAULT);
+ channel.stop(Channel.DEFAULT);
}
public void testUdpReceiverStart() throws Exception {
import junit.framework.*;
import org.apache.catalina.tribes.group.*;
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelException;
protected void tearDown() throws Exception {
super.tearDown();
- if ( channel != null ) try {channel.stop(channel.DEFAULT);}catch ( Exception ignore) {}
+ if ( channel != null ) try {channel.stop(Channel.DEFAULT);}catch ( Exception ignore) {}
channel = null;
}
i.setOptionFlag(128);
channel.addInterceptor(i);
try {
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
}catch ( ChannelException x ) {
if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
}
i.setOptionFlag(256);
channel.addInterceptor(i);
try {
- channel.start(channel.DEFAULT);
+ channel.start(Channel.DEFAULT);
}catch ( ChannelException x ) {
if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
}
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
-import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
/**
-/*\r
- * Licensed to the Apache Software Foundation (ASF) under one or more\r
- * contributor license agreements. See the NOTICE file distributed with\r
- * this work for additional information regarding copyright ownership.\r
- * The ASF licenses this file to You under the Apache License, Version 2.0\r
- * (the "License"); you may not use this file except in compliance with\r
- * the License. You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- */\r
-package org.apache.catalina.tribes.test.channel;\r
-\r
-import junit.framework.TestCase;\r
-import java.io.Serializable;\r
-import java.util.Random;\r
-import java.util.Arrays;\r
-import java.util.concurrent.atomic.AtomicInteger;\r
-import java.util.concurrent.atomic.AtomicLong;\r
-\r
-import org.apache.catalina.tribes.Channel;\r
-import org.apache.catalina.tribes.ChannelListener;\r
-import org.apache.catalina.tribes.ChannelReceiver;\r
-import org.apache.catalina.tribes.Member;\r
-import org.apache.catalina.tribes.group.GroupChannel;\r
-import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;\r
-import org.apache.catalina.tribes.transport.AbstractSender;\r
-import org.apache.catalina.tribes.transport.ReceiverBase;\r
-import org.apache.catalina.tribes.transport.ReplicationTransmitter;\r
-import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;\r
-import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;\r
-import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;\r
-import org.apache.catalina.tribes.io.XByteBuffer;\r
-\r
-/**\r
- */\r
-public class TestUdpPackages extends TestCase {\r
- int msgCount = 500;\r
- int threadCount = 20;\r
- GroupChannel channel1;\r
- GroupChannel channel2;\r
- Listener listener1;\r
- int threadCounter = 0;\r
- protected void setUp() throws Exception {\r
- super.setUp();\r
- channel1 = new GroupChannel();\r
- channel1.addInterceptor(new MessageDispatch15Interceptor());\r
- channel2 = new GroupChannel();\r
- channel2.addInterceptor(new MessageDispatch15Interceptor());\r
- ThroughputInterceptor tint = new ThroughputInterceptor();\r
- tint.setInterval(500);\r
- ThroughputInterceptor tint2 = new ThroughputInterceptor();\r
- tint2.setInterval(500);\r
- //channel1.addInterceptor(tint);\r
- channel2.addInterceptor(tint2);\r
- listener1 = new Listener();\r
- ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
- ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
- rb1.setUdpPort(50000);\r
- rb2.setUdpPort(50000);\r
- channel2.addChannelListener(listener1);\r
- channel1.start(GroupChannel.DEFAULT);\r
- channel2.start(GroupChannel.DEFAULT);\r
- }\r
-\r
- protected void tearDown() throws Exception {\r
- super.tearDown();\r
- channel1.stop(GroupChannel.DEFAULT);\r
- channel2.stop(GroupChannel.DEFAULT);\r
- }\r
-\r
- public void testSingleDataSendNO_ACK() throws Exception {\r
- AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport();\r
- AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport();\r
- s1.setTimeout(Long.MAX_VALUE); //for debugging\r
- s2.setTimeout(Long.MAX_VALUE); //for debugging\r
- \r
- System.err.println("Starting Single package NO_ACK");\r
- channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);\r
- Thread.sleep(500);\r
- System.err.println("Finished Single package NO_ACK ["+listener1.count+"]");\r
- assertEquals("Checking success messages.",1,listener1.count.get());\r
- }\r
-\r
- \r
- public void testDataSendNO_ACK() throws Exception {\r
- final AtomicInteger counter = new AtomicInteger(0);\r
- ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
- ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
- rb1.setUdpRxBufSize(1024*1024*10);\r
- rb2.setUdpRxBufSize(1024*1024*10);\r
- rb1.setUdpTxBufSize(1024*1024*10);\r
- rb2.setUdpTxBufSize(1024*1024*10);\r
- System.err.println("Starting NO_ACK");\r
- Thread[] threads = new Thread[threadCount];\r
- for (int x=0; x<threads.length; x++ ) {\r
- threads[x] = new Thread() {\r
- public void run() {\r
- try {\r
- long start = System.currentTimeMillis();\r
- for (int i = 0; i < msgCount; i++) {\r
- int cnt = counter.getAndAdd(1);\r
- channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP);\r
- //Thread.currentThread().sleep(10);\r
- }\r
- System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
- }catch ( Exception x ) {\r
- x.printStackTrace();\r
- return;\r
- } finally {\r
- threadCounter++;\r
- }\r
- }\r
- };\r
- } \r
- for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
- for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
- //sleep for 50 sec, let the other messages in\r
- long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
- System.err.println("Finished NO_ACK ["+listener1.count+"]");\r
- System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);\r
- System.out.print("Missing messages:");\r
- printMissingMsgs(listener1.nrs,counter.get());\r
- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
- }\r
- \r
- public static void printMissingMsgs(int[] msgs, int maxIdx) {\r
- for (int i=0; i<maxIdx && i<msgs.length; i++) {\r
- if (msgs[i]==0) System.out.print(i+", ");\r
- }\r
- System.out.println();\r
- }\r
-\r
- public void testDataSendASYNCM() throws Exception {\r
- final AtomicInteger counter = new AtomicInteger(0);\r
- ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();\r
- ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();\r
- rb1.setUdpRxBufSize(1024*1024*10);\r
- rb2.setUdpRxBufSize(1024*1024*10);\r
- rb1.setUdpTxBufSize(1024*1024*10);\r
- rb2.setUdpTxBufSize(1024*1024*10);\r
- System.err.println("Starting NO_ACK");\r
- Thread[] threads = new Thread[threadCount];\r
- for (int x=0; x<threads.length; x++ ) {\r
- threads[x] = new Thread() {\r
- public void run() {\r
- try {\r
- long start = System.currentTimeMillis();\r
- for (int i = 0; i < msgCount; i++) {\r
- int cnt = counter.getAndAdd(1);\r
- channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP|Channel.SEND_OPTIONS_ASYNCHRONOUS);\r
- //Thread.currentThread().sleep(10);\r
- }\r
- System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");\r
- }catch ( Exception x ) {\r
- x.printStackTrace();\r
- return;\r
- } finally {\r
- threadCounter++;\r
- }\r
- }\r
- };\r
- } \r
- for (int x=0; x<threads.length; x++ ) { threads[x].start();}\r
- for (int x=0; x<threads.length; x++ ) { threads[x].join();}\r
- //sleep for 50 sec, let the other messages in\r
- long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);\r
- System.err.println("Finished NO_ACK ["+listener1.count+"]");\r
- System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);\r
- System.out.print("Missing messages:");\r
- printMissingMsgs(listener1.nrs,counter.get());\r
- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());\r
- }\r
- public void testDataSendASYNC() throws Exception {\r
- System.err.println("Starting ASYNC");\r
- for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);\r
- //sleep for 50 sec, let the other messages in\r
- long start = System.currentTimeMillis();\r
- while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500);\r
- System.err.println("Finished ASYNC");\r
- assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
- }\r
-\r
- public void testDataSendACK() throws Exception {\r
- System.err.println("Starting ACK");\r
- for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
- Thread.sleep(250);\r
- System.err.println("Finished ACK");\r
- assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
- }\r
-\r
- public void testDataSendSYNCACK() throws Exception {\r
- System.err.println("Starting SYNC_ACK");\r
- for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);\r
- Thread.sleep(250);\r
- System.err.println("Finished SYNC_ACK");\r
- assertEquals("Checking success messages.",msgCount,listener1.count.get());\r
- }\r
-\r
- public static class Listener implements ChannelListener {\r
- AtomicLong count = new AtomicLong(0);\r
- int maxIdx = -1;\r
- int[] nrs = new int[1000000];\r
- public Listener() {\r
- Arrays.fill(nrs, 0);\r
- }\r
- public boolean accept(Serializable s, Member m) {\r
- return (s instanceof Data);\r
- }\r
-\r
- public void messageReceived(Serializable s, Member m) {\r
- try {\r
- Data d = (Data)s;\r
- if ( !Data.verify(d) ) {\r
- System.err.println("ERROR - Unable to verify data package");\r
- } else {\r
- long c = count.addAndGet(1);\r
- if ((c%1000) ==0 ) {\r
- System.err.println("SUCCESS:"+c);\r
- }\r
- int nr = d.getNumber();\r
- if (nr>=0 && nr<nrs.length) {\r
- maxIdx = Math.max(maxIdx, nr);\r
- nrs[nr] = 1;\r
- }\r
- }\r
- }catch (Exception x ) {\r
- x.printStackTrace();\r
- }\r
- }\r
- }\r
-\r
- public static class Data implements Serializable {\r
- public int length;\r
- public byte[] data;\r
- public byte key;\r
- public boolean hasNr = false;\r
- public static Random r = new Random(System.currentTimeMillis());\r
- public static Data createRandomData() {\r
- return createRandomData(ChannelReceiver.MAX_UDP_SIZE);\r
- }\r
- public static Data createRandomData(int size) {\r
- return createRandomData(size,-1);\r
- }\r
- \r
- public static Data createRandomData(int size, int number) {\r
- int i = r.nextInt();\r
- i = ( i % 127 );\r
- int length = Math.abs(r.nextInt() % size);\r
- if (length<100) length += 100;\r
- Data d = new Data();\r
- d.length = length;\r
- d.key = (byte)i;\r
- d.data = new byte[length];\r
- Arrays.fill(d.data,d.key);\r
- if (number>0 && d.data.length>=4) {\r
- //populate number\r
- d.hasNr = true;\r
- XByteBuffer.toBytes(number,d.data, 0);\r
- }\r
- return d;\r
- }\r
- \r
- public int getNumber() {\r
- if (!hasNr) return -1;\r
- return XByteBuffer.toInt(this.data, 0);\r
- }\r
-\r
- public static boolean verify(Data d) {\r
- boolean result = (d.length == d.data.length);\r
- for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;\r
- return result;\r
- }\r
- }\r
-\r
-\r
-\r
-}\r
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+/**
+ */
+public class TestUdpPackages extends TestCase {
+ int msgCount = 500;
+ int threadCount = 20;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ int threadCounter = 0;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel1.addInterceptor(new MessageDispatch15Interceptor());
+ channel2 = new GroupChannel();
+ channel2.addInterceptor(new MessageDispatch15Interceptor());
+ ThroughputInterceptor tint = new ThroughputInterceptor();
+ tint.setInterval(500);
+ ThroughputInterceptor tint2 = new ThroughputInterceptor();
+ tint2.setInterval(500);
+ //channel1.addInterceptor(tint);
+ channel2.addInterceptor(tint2);
+ listener1 = new Listener();
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpPort(50000);
+ rb2.setUdpPort(50000);
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testSingleDataSendNO_ACK() throws Exception {
+ AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport();
+ AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport();
+ s1.setTimeout(Long.MAX_VALUE); //for debugging
+ s2.setTimeout(Long.MAX_VALUE); //for debugging
+
+ System.err.println("Starting Single package NO_ACK");
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(500);
+ System.err.println("Finished Single package NO_ACK ["+listener1.count+"]");
+ assertEquals("Checking success messages.",1,listener1.count.get());
+ }
+
+
+ public void testDataSendNO_ACK() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpRxBufSize(1024*1024*10);
+ rb2.setUdpRxBufSize(1024*1024*10);
+ rb1.setUdpTxBufSize(1024*1024*10);
+ rb2.setUdpTxBufSize(1024*1024*10);
+ System.err.println("Starting NO_ACK");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++) {
+ int cnt = counter.getAndAdd(1);
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP);
+ //Thread.currentThread().sleep(10);
+ }
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");
+ System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);
+ System.out.print("Missing messages:");
+ printMissingMsgs(listener1.nrs,counter.get());
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());
+ }
+
+ public static void printMissingMsgs(int[] msgs, int maxIdx) {
+ for (int i=0; i<maxIdx && i<msgs.length; i++) {
+ if (msgs[i]==0) System.out.print(i+", ");
+ }
+ System.out.println();
+ }
+
+ public void testDataSendASYNCM() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpRxBufSize(1024*1024*10);
+ rb2.setUdpRxBufSize(1024*1024*10);
+ rb1.setUdpTxBufSize(1024*1024*10);
+ rb2.setUdpTxBufSize(1024*1024*10);
+ System.err.println("Starting NO_ACK");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++) {
+ int cnt = counter.getAndAdd(1);
+ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP|Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ //Thread.currentThread().sleep(10);
+ }
+ System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");
+ System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx);
+ System.out.print("Missing messages:");
+ printMissingMsgs(listener1.nrs,counter.get());
+ assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get());
+ }
+ public void testDataSendASYNC() throws Exception {
+ System.err.println("Starting ASYNC");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
+ //sleep for 50 sec, let the other messages in
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500);
+ System.err.println("Finished ASYNC");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public void testDataSendACK() throws Exception {
+ System.err.println("Starting ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(250);
+ System.err.println("Finished ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public void testDataSendSYNCACK() throws Exception {
+ System.err.println("Starting SYNC_ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(250);
+ System.err.println("Finished SYNC_ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count.get());
+ }
+
+ public static class Listener implements ChannelListener {
+ AtomicLong count = new AtomicLong(0);
+ int maxIdx = -1;
+ int[] nrs = new int[1000000];
+ public Listener() {
+ Arrays.fill(nrs, 0);
+ }
+ public boolean accept(Serializable s, Member m) {
+ return (s instanceof Data);
+ }
+
+ public void messageReceived(Serializable s, Member m) {
+ try {
+ Data d = (Data)s;
+ if ( !Data.verify(d) ) {
+ System.err.println("ERROR - Unable to verify data package");
+ } else {
+ long c = count.addAndGet(1);
+ if ((c%1000) ==0 ) {
+ System.err.println("SUCCESS:"+c);
+ }
+ int nr = d.getNumber();
+ if (nr>=0 && nr<nrs.length) {
+ maxIdx = Math.max(maxIdx, nr);
+ nrs[nr] = 1;
+ }
+ }
+ }catch (Exception x ) {
+ x.printStackTrace();
+ }
+ }
+ }
+
+ public static class Data implements Serializable {
+ public int length;
+ public byte[] data;
+ public byte key;
+ public boolean hasNr = false;
+ public static Random r = new Random(System.currentTimeMillis());
+ public static Data createRandomData() {
+ return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
+ }
+ public static Data createRandomData(int size) {
+ return createRandomData(size,-1);
+ }
+
+ public static Data createRandomData(int size, int number) {
+ int i = r.nextInt();
+ i = ( i % 127 );
+ int length = Math.abs(r.nextInt() % size);
+ if (length<100) length += 100;
+ Data d = new Data();
+ d.length = length;
+ d.key = (byte)i;
+ d.data = new byte[length];
+ Arrays.fill(d.data,d.key);
+ if (number>0 && d.data.length>=4) {
+ //populate number
+ d.hasNr = true;
+ XByteBuffer.toBytes(number,d.data, 0);
+ }
+ return d;
+ }
+
+ public int getNumber() {
+ if (!hasNr) return -1;
+ return XByteBuffer.toInt(this.data, 0);
+ }
+
+ public static boolean verify(Data d) {
+ boolean result = (d.length == d.data.length);
+ for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
+ return result;
+ }
+ }
+
+
+
+}
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import junit.framework.TestCase;
import junit.framework.TestResult;
import junit.framework.TestSuite;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.GroupChannel;
import junit.framework.TestCase;
import org.apache.catalina.tribes.ChannelListener;
public static class TestMsg implements Serializable {
static Random r = new Random(System.currentTimeMillis());
- HashMap map = new HashMap();
+ HashMap<Integer, ArrayList<Object>> map =
+ new HashMap<Integer, ArrayList<Object>>();
public TestMsg() {
int size = Math.abs(r.nextInt() % 200);
for (int i=0; i<size; i++ ) {
int length = Math.abs(r.nextInt() %65000);
- ArrayList list = new ArrayList(length);
+ ArrayList<Object> list = new ArrayList<Object>(length);
map.put(new Integer(i),list);
}
}
this.name = name;
}
- public ArrayList members = new ArrayList();
+ public ArrayList<Member> members = new ArrayList<Member>();
public void memberAdded(Member member) {
if (!members.contains(member)) {
members.add(member);
this.name = name;
}
- public ArrayList members = new ArrayList();
+ public ArrayList<Member> members = new ArrayList<Member>();
public void memberAdded(Member member) {
if (!members.contains(member)) {
members.add(member);
public void testTcpSendFailureMemberDrop() throws Exception {
System.out.println("testTcpSendFailureMemberDrop()");
clear();
- channel1.start(channel1.DEFAULT);
- channel2.start(channel2.DEFAULT);
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.DEFAULT);
//Thread.sleep(1000);
assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
- channel2.stop(channel2.SND_RX_SEQ);
+ channel2.stop(Channel.SND_RX_SEQ);
ByteMessage msg = new ByteMessage(new byte[1024]);
try {
channel1.send(channel1.getMembers(), msg, 0);
public void testTcpFailureMemberAdd() throws Exception {
System.out.println("testTcpFailureMemberAdd()");
clear();
- channel1.start(channel1.DEFAULT);
- channel2.start(channel2.SND_RX_SEQ);
- channel2.start(channel2.SND_TX_SEQ);
- channel2.start(channel2.MBR_RX_SEQ);
- channel2.stop(channel2.SND_RX_SEQ);
- channel2.start(channel2.MBR_TX_SEQ);
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.SND_RX_SEQ);
+ channel2.start(Channel.SND_TX_SEQ);
+ channel2.start(Channel.MBR_RX_SEQ);
+ channel2.stop(Channel.SND_RX_SEQ);
+ channel2.start(Channel.MBR_TX_SEQ);
//Thread.sleep(1000);
assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
channel1.stop(Channel.DEFAULT);
public void testTcpMcastFail() throws Exception {
System.out.println("testTcpMcastFail()");
clear();
- channel1.start(channel1.DEFAULT);
- channel2.start(channel2.DEFAULT);
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.DEFAULT);
//Thread.sleep(1000);
assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
- channel2.stop(channel2.MBR_TX_SEQ);
+ channel2.stop(Channel.MBR_TX_SEQ);
ByteMessage msg = new ByteMessage(new byte[1024]);
try {
Thread.sleep(5000);
public TestMbrListener(String name) {
this.name = name;
}
- public ArrayList members = new ArrayList();
+ public ArrayList<Member> members = new ArrayList<Member>();
public void memberAdded(Member member) {
if ( !members.contains(member) ) {
members.add(member);
*/
package org.apache.catalina.tribes.test.transport;
-import java.io.OutputStream;
-import java.net.Socket;
import java.text.DecimalFormat;
import org.apache.catalina.tribes.transport.nio.NioSender;
import org.apache.catalina.tribes.membership.MemberImpl;
continue;
}
- Iterator it = selector.selectedKeys().iterator();
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
- SelectionKey sk = (SelectionKey) it.next();
+ SelectionKey sk = it.next();
it.remove();
try {
int readyOps = sk.readyOps();
*/
package org.apache.catalina.tribes.test.transport;
-import java.io.OutputStream;
-import java.net.Socket;
import java.text.DecimalFormat;
import org.apache.catalina.tribes.transport.nio.NioSender;
import org.apache.catalina.tribes.membership.MemberImpl;
import java.nio.channels.Selector;
-import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.Member;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.io.ChannelData;
import java.math.BigDecimal;
import java.util.Arrays;
continue;
}
- Iterator it = selector.selectedKeys().iterator();
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
- SelectionKey sk = (SelectionKey) it.next();
+ SelectionKey sk = it.next();
it.remove();
try {
int readyOps = sk.readyOps();