From: fhanik Date: Mon, 11 Sep 2006 14:47:47 +0000 (+0000) Subject: Added demos/simple use cases for the group com module X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=87b55c0c637a5e5464576ba57e9045ec01c9f79a;p=tomcat7.0 Added demos/simple use cases for the group com module git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@442235 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/test/org/apache/catalina/tribes/demos/ChannelCreator.java b/test/org/apache/catalina/tribes/demos/ChannelCreator.java new file mode 100644 index 000000000..076505b2b --- /dev/null +++ b/test/org/apache/catalina/tribes/demos/ChannelCreator.java @@ -0,0 +1,253 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.demos; + +import java.util.Iterator; +import java.util.Properties; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor; +import org.apache.catalina.tribes.group.interceptors.GzipInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; +import org.apache.catalina.tribes.membership.McastService; +import org.apache.catalina.tribes.transport.MultiPointSender; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ReplicationTransmitter; +import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; +import java.util.ArrayList; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; +import org.apache.catalina.tribes.Member; + +/** + *

Title:

+ * + *

Description:

+ * + * + *

Company:

+ * + * @author fhanik + * @version 1.0 + */ +public class ChannelCreator { + + org.apache.commons.logging.impl.LogFactoryImpl impl=null; + public static StringBuffer usage() { + StringBuffer buf = new StringBuffer(); + buf.append("\n\t\t[-bind tcpbindaddress]") + .append("\n\t\t[-tcpselto tcpselectortimeout]") + .append("\n\t\t[-tcpthreads tcpthreadcount]") + .append("\n\t\t[-port tcplistenport]") + .append("\n\t\t[-autobind tcpbindtryrange]") + .append("\n\t\t[-ackto acktimeout]") + .append("\n\t\t[-receiver org.apache.catalina.tribes.transport.nio.NioReceiver|org.apache.catalina.tribes.transport.bio.BioReceiver|]") + .append("\n\t\t[-transport org.apache.catalina.tribes.transport.nio.PooledParallelSender|org.apache.catalina.tribes.transport.bio.PooledMultiSender]") + .append("\n\t\t[-transport.xxx transport specific property]") + .append("\n\t\t[-maddr multicastaddr]") + .append("\n\t\t[-mport multicastport]") + .append("\n\t\t[-mbind multicastbindaddr]") + .append("\n\t\t[-mfreq multicastfrequency]") + .append("\n\t\t[-mdrop multicastdroptime]") + .append("\n\t\t[-gzip]") + .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]") + .append("\n\t\t[-order]") + .append("\n\t\t[-ordersize maxorderqueuesize]") + .append("\n\t\t[-frag]") + .append("\n\t\t[-fragsize maxmsgsize]") + .append("\n\t\t[-throughput]") + .append("\n\t\t[-failuredetect]") + .append("\n\t\t[-async]") + .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]"); + return buf; + + } + + public static Channel createChannel(String[] args) throws Exception { + String bind = "auto"; + int port = 4001; + String mbind = null; + boolean gzip = false; + int tcpseltimeout = 5000; + int tcpthreadcount = 4; + int acktimeout = 15000; + String mcastaddr = "228.0.0.5"; + int mcastport = 45565; + long mcastfreq = 500; + long mcastdrop = 2000; + boolean order = false; + int ordersize = Integer.MAX_VALUE; + boolean frag = false; + int fragsize = 1024; + int autoBind = 10; + ArrayList staticMembers = new ArrayList(); + Properties transportProperties = new Properties(); + String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender"; + String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver"; + boolean async = false; + int asyncsize = 1024*1024*50; //50MB + boolean throughput = false; + boolean failuredetect = false; + + for (int i = 0; i < args.length; i++) { + if ("-bind".equals(args[i])) { + bind = args[++i]; + } else if ("-port".equals(args[i])) { + port = Integer.parseInt(args[++i]); + } else if ("-autobind".equals(args[i])) { + autoBind = Integer.parseInt(args[++i]); + } else if ("-tcpselto".equals(args[i])) { + tcpseltimeout = Integer.parseInt(args[++i]); + } else if ("-tcpthreads".equals(args[i])) { + tcpthreadcount = Integer.parseInt(args[++i]); + } else if ("-gzip".equals(args[i])) { + gzip = true; + } else if ("-async".equals(args[i])) { + async = true; + } else if ("-failuredetect".equals(args[i])) { + failuredetect = true; + } else if ("-asyncsize".equals(args[i])) { + asyncsize = Integer.parseInt(args[++i]); + System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize); + } else if ("-static".equals(args[i])) { + String d = args[++i]; + String h = d.substring(0,d.indexOf(":")); + String p = d.substring(h.length()+1); + MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000); + staticMembers.add(m); + } else if ("-throughput".equals(args[i])) { + throughput = true; + } else if ("-order".equals(args[i])) { + order = true; + } else if ("-ordersize".equals(args[i])) { + ordersize = Integer.parseInt(args[++i]); + System.out.println("Setting OrderInterceptor.maxQueue="+ordersize); + } else if ("-frag".equals(args[i])) { + frag = true; + } else if ("-fragsize".equals(args[i])) { + fragsize = Integer.parseInt(args[++i]); + System.out.println("Setting FragmentationInterceptor.maxSize="+fragsize); + } else if ("-ackto".equals(args[i])) { + acktimeout = Integer.parseInt(args[++i]); + } else if ("-transport".equals(args[i])) { + transport = args[++i]; + } else if (args[i]!=null && args[i].startsWith("transport.")) { + String key = args[i]; + String val = args[++i]; + transportProperties.setProperty(key,val); + } else if ("-receiver".equals(args[i])) { + receiver = args[++i]; + } else if ("-maddr".equals(args[i])) { + mcastaddr = args[++i]; + } else if ("-mport".equals(args[i])) { + mcastport = Integer.parseInt(args[++i]); + } else if ("-mfreq".equals(args[i])) { + mcastfreq = Long.parseLong(args[++i]); + } else if ("-mdrop".equals(args[i])) { + mcastdrop = Long.parseLong(args[++i]); + } else if ("-mbind".equals(args[i])) { + mbind = args[++i]; + } + } + + System.out.println("Creating receiver class="+receiver); + Class cl = Class.forName(receiver,true,ChannelCreator.class.getClassLoader()); + ReceiverBase rx = (ReceiverBase)cl.newInstance(); + rx.setTcpListenAddress(bind); + rx.setTcpListenPort(port); + rx.setTcpSelectorTimeout(tcpseltimeout); + rx.setTcpThreadCount(tcpthreadcount); + rx.getBind(); + rx.setRxBufSize(43800); + rx.setTxBufSize(25188); + rx.setAutoBind(autoBind); + + + ReplicationTransmitter ps = new ReplicationTransmitter(); + System.out.println("Creating transport class="+transport); + MultiPointSender sender = (MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance(); + sender.setTimeout(acktimeout); + sender.setMaxRetryAttempts(2); + sender.setRxBufSize(43800); + sender.setTxBufSize(25188); + + Iterator i = transportProperties.keySet().iterator(); + while ( i.hasNext() ) { + String key = (String)i.next(); + IntrospectionUtils.setProperty(sender,key,transportProperties.getProperty(key)); + } + ps.setTransport(sender); + + McastService service = new McastService(); + service.setMcastAddr(mcastaddr); + if (mbind != null) service.setMcastBindAddress(mbind); + service.setMcastFrequency(mcastfreq); + service.setMcastDropTime(mcastdrop); + service.setMcastPort(mcastport); + + ManagedChannel channel = new GroupChannel(); + channel.setChannelReceiver(rx); + channel.setChannelSender(ps); + channel.setMembershipService(service); + + if ( throughput ) channel.addInterceptor(new ThroughputInterceptor()); + if (gzip) channel.addInterceptor(new GzipInterceptor()); + if ( frag ) { + FragmentationInterceptor fi = new FragmentationInterceptor(); + fi.setMaxSize(fragsize); + channel.addInterceptor(fi); + } + if (order) { + OrderInterceptor oi = new OrderInterceptor(); + oi.setMaxQueue(ordersize); + channel.addInterceptor(oi); + } + + if ( async ) { + MessageDispatchInterceptor mi = new MessageDispatch15Interceptor(); + mi.setMaxQueueSize(asyncsize); + channel.addInterceptor(mi); + System.out.println("Added MessageDispatchInterceptor"); + } + + if ( failuredetect ) { + TcpFailureDetector tcpfi = new TcpFailureDetector(); + channel.addInterceptor(tcpfi); + } + if ( staticMembers.size() > 0 ) { + StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); + for (int x=0; x= 1 && (!"quit".equalsIgnoreCase(args[0]))) { + if ("start".equalsIgnoreCase(args[0])) { + cmdStart(args); + } else if ("stop".equalsIgnoreCase(args[0])) { + cmdStop(args); + + } + printScreen(); + l = reader.readLine(); + args = tokenize(l); + } + for ( int i=0; i= 0 ) { + setSystemStatus("Stopping member:"+(index+1)); + status[index].stop(); + setSystemStatus("Member stopped:"+(index+1)); + } + } + } + + private void cmdStart(String[] args) { + if ( args.length == 1 ) { + setSystemStatus("System starting up..."); + Thread[] t = new Thread[CHANNEL_COUNT]; + for (int i = 0; i < status.length; i++) { + final int j = i; + t[j] = new Thread() { + public void run() { + status[j].start(); + } + }; + } + for (int i = 0; i < status.length; i++) if (MULTI_THREAD ) t[i].start(); else t[i].run(); + setSystemStatus("System started."); + } else { + int index = -1; + try { index = Integer.parseInt(args[1])-1;}catch ( Exception x ) {setSystemStatus("Invalid index:"+args[1]);} + if ( index >= 0 ) { + setSystemStatus("Starting member:"+(index+1)); + status[index].start(); + setSystemStatus("Member started:"+(index+1)); + } + } + } + + public void setSystemStatus(String status) { + statusLine.delete(0,statusLine.length()); + statusLine.append(status); + } + + + + public static void setEvents(String events) { + java.util.Arrays.fill(VIEW_EVENTS,false); + StringTokenizer t = new StringTokenizer(events,","); + while (t.hasMoreTokens() ) { + int idx = Integer.parseInt(t.nextToken()); + VIEW_EVENTS[idx] = true; + } + } + + public static void run(String[] args,CoordinationDemo demo) throws Exception { + usage(); + java.util.Arrays.fill(VIEW_EVENTS,true); + + for (int i=0; i starts demo single threaded start/stop with 5 channels"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event"); + System.out.println(); + } + public static void main(String[] args) throws Exception { + CoordinationDemo demo = new CoordinationDemo(); + run(args,demo); + } + + public static String leftfill(String value, int length, String ch) { + return fill(value,length,ch,true); + } + + public static String rightfill(String value, int length, String ch) { + return fill(value,length,ch,false); + } + + public static String fill(String value, int length, String ch, boolean left) { + StringBuffer buf = new StringBuffer(); + if ( !left ) buf.append(value.trim()); + for (int i=value.trim().length(); iTitle:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class EchoRpcTest implements RpcCallback, Runnable { + + Channel channel; + int count; + String message; + long pause; + RpcChannel rpc; + int options; + long timeout; + String name; + + public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) { + this.channel = channel; + this.count = count; + this.message = message; + this.pause = pause; + this.options = options; + this.rpc = new RpcChannel(name.getBytes(),channel,this); + this.timeout = timeout; + this.name = name; + } + + /** + * If the reply has already been sent to the requesting thread, the rpc + * callback can handle any data that comes in after the fact. + * + * @param msg Serializable + * @param sender Member + * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback + * method + */ + public void leftOver(Serializable msg, Member sender) { + System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]"); + } + + /** + * + * @param msg Serializable + * @param sender Member + * @return Serializable - null if no reply should be sent + * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback + * method + */ + public Serializable replyRequest(Serializable msg, Member sender) { + System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]"); + return "Reply("+name+"):"+msg; + } + + public void run() { + long counter = 0; + while (counter 1) + d("setProperty(" + o.getClass() + " " + name + "=" + value + ")"); + + String setter = "set" + capitalize(name); + + try { + Method methods[] = findMethods(o.getClass()); + Method setPropertyMethod = null; + + // First, the ideal case - a setFoo( String ) method + for (int i = 0; i < methods.length; i++) { + Class paramT[] = methods[i].getParameterTypes(); + if (setter.equals(methods[i].getName()) && paramT.length == 1 + && "java.lang.String".equals(paramT[0].getName())) { + + methods[i].invoke(o, new Object[] { value }); + return; + } + } + + // Try a setFoo ( int ) or ( boolean ) + for (int i = 0; i < methods.length; i++) { + boolean ok = true; + if (setter.equals(methods[i].getName()) + && methods[i].getParameterTypes().length == 1) { + + // match - find the type and invoke it + Class paramType = methods[i].getParameterTypes()[0]; + Object params[] = new Object[1]; + + // Try a setFoo ( int ) + if ("java.lang.Integer".equals(paramType.getName()) + || "int".equals(paramType.getName())) { + try { + params[0] = new Integer(value); + } catch (NumberFormatException ex) { + ok = false; + } + // Try a setFoo ( long ) + }else if ("java.lang.Long".equals(paramType.getName()) + || "long".equals(paramType.getName())) { + try { + params[0] = new Long(value); + } catch (NumberFormatException ex) { + ok = false; + } + + // Try a setFoo ( boolean ) + } else if ("java.lang.Boolean".equals(paramType.getName()) + || "boolean".equals(paramType.getName())) { + params[0] = new Boolean(value); + + // Try a setFoo ( InetAddress ) + } else if ("java.net.InetAddress".equals(paramType + .getName())) { + try { + params[0] = InetAddress.getByName(value); + } catch (UnknownHostException exc) { + d("Unable to resolve host name:" + value); + ok = false; + } + + // Unknown type + } else { + d("Unknown type " + paramType.getName()); + } + + if (ok) { + methods[i].invoke(o, params); + return; + } + } + + // save "setProperty" for later + if ("setProperty".equals(methods[i].getName())) { + setPropertyMethod = methods[i]; + } + } + + // Ok, no setXXX found, try a setProperty("name", "value") + if (setPropertyMethod != null) { + Object params[] = new Object[2]; + params[0] = name; + params[1] = value; + setPropertyMethod.invoke(o, params); + } + + } catch (IllegalArgumentException ex2) { + log.warn("IAE " + o + " " + name + " " + value, ex2); + } catch (SecurityException ex1) { + if (dbg > 0) + d("SecurityException for " + o.getClass() + " " + name + "=" + + value + ")"); + if (dbg > 1) + ex1.printStackTrace(); + } catch (IllegalAccessException iae) { + if (dbg > 0) + d("IllegalAccessException for " + o.getClass() + " " + name + + "=" + value + ")"); + if (dbg > 1) + iae.printStackTrace(); + } catch (InvocationTargetException ie) { + if (dbg > 0) + d("InvocationTargetException for " + o.getClass() + " " + name + + "=" + value + ")"); + if (dbg > 1) + ie.printStackTrace(); + } + } + + public static Object getProperty(Object o, String name) { + String getter = "get" + capitalize(name); + String isGetter = "is" + capitalize(name); + + try { + Method methods[] = findMethods(o.getClass()); + Method getPropertyMethod = null; + + // First, the ideal case - a getFoo() method + for (int i = 0; i < methods.length; i++) { + Class paramT[] = methods[i].getParameterTypes(); + if (getter.equals(methods[i].getName()) && paramT.length == 0) { + return methods[i].invoke(o, (Object[]) null); + } + if (isGetter.equals(methods[i].getName()) && paramT.length == 0) { + return methods[i].invoke(o, (Object[]) null); + } + + if ("getProperty".equals(methods[i].getName())) { + getPropertyMethod = methods[i]; + } + } + + // Ok, no setXXX found, try a getProperty("name") + if (getPropertyMethod != null) { + Object params[] = new Object[1]; + params[0] = name; + return getPropertyMethod.invoke(o, params); + } + + } catch (IllegalArgumentException ex2) { + log.warn("IAE " + o + " " + name, ex2); + } catch (SecurityException ex1) { + if (dbg > 0) + d("SecurityException for " + o.getClass() + " " + name + ")"); + if (dbg > 1) + ex1.printStackTrace(); + } catch (IllegalAccessException iae) { + if (dbg > 0) + d("IllegalAccessException for " + o.getClass() + " " + name + + ")"); + if (dbg > 1) + iae.printStackTrace(); + } catch (InvocationTargetException ie) { + if (dbg > 0) + d("InvocationTargetException for " + o.getClass() + " " + name + + ")"); + if (dbg > 1) + ie.printStackTrace(); + } + return null; + } + + /** + */ + public static void setProperty(Object o, String name) { + String setter = "set" + capitalize(name); + try { + Method methods[] = findMethods(o.getClass()); + Method setPropertyMethod = null; + // find setFoo() method + for (int i = 0; i < methods.length; i++) { + Class paramT[] = methods[i].getParameterTypes(); + if (setter.equals(methods[i].getName()) && paramT.length == 0) { + methods[i].invoke(o, new Object[] {}); + return; + } + } + } catch (Exception ex1) { + if (dbg > 0) + d("Exception for " + o.getClass() + " " + name); + if (dbg > 1) + ex1.printStackTrace(); + } + } + + /** + * Replace ${NAME} with the property value + * + * @deprecated Use the explicit method + */ + public static String replaceProperties(String value, Object getter) { + if (getter instanceof Hashtable) + return replaceProperties(value, (Hashtable) getter, null); + + if (getter instanceof PropertySource) { + PropertySource src[] = new PropertySource[] { (PropertySource) getter }; + return replaceProperties(value, null, src); + } + return value; + } + + /** + * Replace ${NAME} with the property value + */ + public static String replaceProperties(String value, Hashtable staticProp, + PropertySource dynamicProp[]) { + StringBuffer sb = new StringBuffer(); + int prev = 0; + // assert value!=nil + int pos; + while ((pos = value.indexOf("$", prev)) >= 0) { + if (pos > 0) { + sb.append(value.substring(prev, pos)); + } + if (pos == (value.length() - 1)) { + sb.append('$'); + prev = pos + 1; + } else if (value.charAt(pos + 1) != '{') { + sb.append('$'); + prev = pos + 1; // XXX + } else { + int endName = value.indexOf('}', pos); + if (endName < 0) { + sb.append(value.substring(pos)); + prev = value.length(); + continue; + } + String n = value.substring(pos + 2, endName); + String v = null; + if (staticProp != null) { + v = (String) ((Hashtable) staticProp).get(n); + } + if (v == null && dynamicProp != null) { + for (int i = 0; i < dynamicProp.length; i++) { + v = dynamicProp[i].getProperty(n); + if (v != null) { + break; + } + } + } + if (v == null) + v = "${" + n + "}"; + + sb.append(v); + prev = endName + 1; + } + } + if (prev < value.length()) + sb.append(value.substring(prev)); + return sb.toString(); + } + + /** + * Reverse of Introspector.decapitalize + */ + public static String capitalize(String name) { + if (name == null || name.length() == 0) { + return name; + } + char chars[] = name.toCharArray(); + chars[0] = Character.toUpperCase(chars[0]); + return new String(chars); + } + + public static String unCapitalize(String name) { + if (name == null || name.length() == 0) { + return name; + } + char chars[] = name.toCharArray(); + chars[0] = Character.toLowerCase(chars[0]); + return new String(chars); + } + + // -------------------- Class path tools -------------------- + + /** + * Add all the jar files in a dir to the classpath, represented as a Vector + * of URLs. + */ + public static void addToClassPath(Vector cpV, String dir) { + try { + String cpComp[] = getFilesByExt(dir, ".jar"); + if (cpComp != null) { + int jarCount = cpComp.length; + for (int i = 0; i < jarCount; i++) { + URL url = getURL(dir, cpComp[i]); + if (url != null) + cpV.addElement(url); + } + } + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + public static void addToolsJar(Vector v) { + try { + // Add tools.jar in any case + File f = new File(System.getProperty("java.home") + + "/../lib/tools.jar"); + + if (!f.exists()) { + // On some systems java.home gets set to the root of jdk. + // That's a bug, but we can work around and be nice. + f = new File(System.getProperty("java.home") + "/lib/tools.jar"); + if (f.exists()) { + if (log.isDebugEnabled()) + log.debug("Detected strange java.home value " + + System.getProperty("java.home") + + ", it should point to jre"); + } + } + URL url = new URL("file", "", f.getAbsolutePath()); + + v.addElement(url); + } catch (MalformedURLException ex) { + ex.printStackTrace(); + } + } + + /** + * Return all files with a given extension in a dir + */ + public static String[] getFilesByExt(String ld, String ext) { + File dir = new File(ld); + String[] names = null; + final String lext = ext; + if (dir.isDirectory()) { + names = dir.list(new FilenameFilter() { + public boolean accept(File d, String name) { + if (name.endsWith(lext)) { + return true; + } + return false; + } + }); + } + return names; + } + + /** + * Construct a file url from a file, using a base dir + */ + public static URL getURL(String base, String file) { + try { + File baseF = new File(base); + File f = new File(baseF, file); + String path = f.getCanonicalPath(); + if (f.isDirectory()) { + path += "/"; + } + if (!f.exists()) + return null; + return new URL("file", "", path); + } catch (Exception ex) { + ex.printStackTrace(); + return null; + } + } + + /** + * Add elements from the classpath cp to a Vector jars as + * file URLs (We use Vector for JDK 1.1 compat). + *

+ * + * @param jars The jar list + * @param cp a String classpath of directory or jar file elements + * separated by path.separator delimiters. + * @throws IOException If an I/O error occurs + * @throws MalformedURLException Doh ;) + */ + public static void addJarsFromClassPath(Vector jars, String cp) + throws IOException, MalformedURLException { + String sep = System.getProperty("path.separator"); + String token; + StringTokenizer st; + if (cp != null) { + st = new StringTokenizer(cp, sep); + while (st.hasMoreTokens()) { + File f = new File(st.nextToken()); + String path = f.getCanonicalPath(); + if (f.isDirectory()) { + path += "/"; + } + URL url = new URL("file", "", path); + if (!jars.contains(url)) { + jars.addElement(url); + } + } + } + } + + /** + * Return a URL[] that can be used to construct a class loader + */ + public static URL[] getClassPath(Vector v) { + URL[] urls = new URL[v.size()]; + for (int i = 0; i < v.size(); i++) { + urls[i] = (URL) v.elementAt(i); + } + return urls; + } + + /** + * Construct a URL classpath from files in a directory, a cpath property, + * and tools.jar. + */ + public static URL[] getClassPath(String dir, String cpath, + String cpathProp, boolean addTools) throws IOException, + MalformedURLException { + Vector jarsV = new Vector(); + if (dir != null) { + // Add dir/classes first, if it exists + URL url = getURL(dir, "classes"); + if (url != null) + jarsV.addElement(url); + addToClassPath(jarsV, dir); + } + + if (cpath != null) + addJarsFromClassPath(jarsV, cpath); + + if (cpathProp != null) { + String cpath1 = System.getProperty(cpathProp); + addJarsFromClassPath(jarsV, cpath1); + } + + if (addTools) + addToolsJar(jarsV); + + return getClassPath(jarsV); + } + + // -------------------- Mapping command line params to setters + + public static boolean processArgs(Object proxy, String args[]) + throws Exception { + String args0[] = null; + if (null != findMethod(proxy.getClass(), "getOptions1", new Class[] {})) { + args0 = (String[]) callMethod0(proxy, "getOptions1"); + } + + if (args0 == null) { + //args0=findVoidSetters(proxy.getClass()); + args0 = findBooleanSetters(proxy.getClass()); + } + Hashtable h = null; + if (null != findMethod(proxy.getClass(), "getOptionAliases", + new Class[] {})) { + h = (Hashtable) callMethod0(proxy, "getOptionAliases"); + } + return processArgs(proxy, args, args0, null, h); + } + + public static boolean processArgs(Object proxy, String args[], + String args0[], String args1[], Hashtable aliases) throws Exception { + for (int i = 0; i < args.length; i++) { + String arg = args[i]; + if (arg.startsWith("-")) + arg = arg.substring(1); + if (aliases != null && aliases.get(arg) != null) + arg = (String) aliases.get(arg); + + if (args0 != null) { + boolean set = false; + for (int j = 0; j < args0.length; j++) { + if (args0[j].equalsIgnoreCase(arg)) { + setProperty(proxy, args0[j], "true"); + set = true; + break; + } + } + if (set) + continue; + } + if (args1 != null) { + for (int j = 0; j < args1.length; j++) { + if (args1[j].equalsIgnoreCase(arg)) { + i++; + if (i >= args.length) + return false; + setProperty(proxy, arg, args[i]); + break; + } + } + } else { + // if args1 is not specified,assume all other options have param + i++; + if (i >= args.length) + return false; + setProperty(proxy, arg, args[i]); + } + + } + return true; + } + + // -------------------- other utils -------------------- + public static void clear() { + objectMethods.clear(); + } + + public static String[] findVoidSetters(Class c) { + Method m[] = findMethods(c); + if (m == null) + return null; + Vector v = new Vector(); + for (int i = 0; i < m.length; i++) { + if (m[i].getName().startsWith("set") + && m[i].getParameterTypes().length == 0) { + String arg = m[i].getName().substring(3); + v.addElement(unCapitalize(arg)); + } + } + String s[] = new String[v.size()]; + for (int i = 0; i < s.length; i++) { + s[i] = (String) v.elementAt(i); + } + return s; + } + + public static String[] findBooleanSetters(Class c) { + Method m[] = findMethods(c); + if (m == null) + return null; + Vector v = new Vector(); + for (int i = 0; i < m.length; i++) { + if (m[i].getName().startsWith("set") + && m[i].getParameterTypes().length == 1 + && "boolean".equalsIgnoreCase(m[i].getParameterTypes()[0] + .getName())) { + String arg = m[i].getName().substring(3); + v.addElement(unCapitalize(arg)); + } + } + String s[] = new String[v.size()]; + for (int i = 0; i < s.length; i++) { + s[i] = (String) v.elementAt(i); + } + return s; + } + + static Hashtable objectMethods = new Hashtable(); + + public static Method[] findMethods(Class c) { + Method methods[] = (Method[]) objectMethods.get(c); + if (methods != null) + return methods; + + methods = c.getMethods(); + objectMethods.put(c, methods); + return methods; + } + + public static Method findMethod(Class c, String name, Class params[]) { + Method methods[] = findMethods(c); + if (methods == null) + return null; + for (int i = 0; i < methods.length; i++) { + if (methods[i].getName().equals(name)) { + Class methodParams[] = methods[i].getParameterTypes(); + if (methodParams == null) + if (params == null || params.length == 0) + return methods[i]; + if (params == null) + if (methodParams == null || methodParams.length == 0) + return methods[i]; + if (params.length != methodParams.length) + continue; + boolean found = true; + for (int j = 0; j < params.length; j++) { + if (params[j] != methodParams[j]) { + found = false; + break; + } + } + if (found) + return methods[i]; + } + } + return null; + } + + /** Test if the object implements a particular + * method + */ + public static boolean hasHook(Object obj, String methodN) { + try { + Method myMethods[] = findMethods(obj.getClass()); + for (int i = 0; i < myMethods.length; i++) { + if (methodN.equals(myMethods[i].getName())) { + // check if it's overriden + Class declaring = myMethods[i].getDeclaringClass(); + Class parentOfDeclaring = declaring.getSuperclass(); + // this works only if the base class doesn't extend + // another class. + + // if the method is declared in a top level class + // like BaseInterceptor parent is Object, otherwise + // parent is BaseInterceptor or an intermediate class + if (!"java.lang.Object".equals(parentOfDeclaring.getName())) { + return true; + } + } + } + } catch (Exception ex) { + ex.printStackTrace(); + } + return false; + } + + public static void callMain(Class c, String args[]) throws Exception { + Class p[] = new Class[1]; + p[0] = args.getClass(); + Method m = c.getMethod("main", p); + m.invoke(c, new Object[] { args }); + } + + public static Object callMethod1(Object target, String methodN, + Object param1, String typeParam1, ClassLoader cl) throws Exception { + if (target == null || param1 == null) { + d("Assert: Illegal params " + target + " " + param1); + } + if (dbg > 0) + d("callMethod1 " + target.getClass().getName() + " " + + param1.getClass().getName() + " " + typeParam1); + + Class params[] = new Class[1]; + if (typeParam1 == null) + params[0] = param1.getClass(); + else + params[0] = cl.loadClass(typeParam1); + Method m = findMethod(target.getClass(), methodN, params); + if (m == null) + throw new NoSuchMethodException(target.getClass().getName() + " " + + methodN); + return m.invoke(target, new Object[] { param1 }); + } + + public static Object callMethod0(Object target, String methodN) + throws Exception { + if (target == null) { + d("Assert: Illegal params " + target); + return null; + } + if (dbg > 0) + d("callMethod0 " + target.getClass().getName() + "." + methodN); + + Class params[] = new Class[0]; + Method m = findMethod(target.getClass(), methodN, params); + if (m == null) + throw new NoSuchMethodException(target.getClass().getName() + " " + + methodN); + return m.invoke(target, emptyArray); + } + + static Object[] emptyArray = new Object[] {}; + + public static Object callMethodN(Object target, String methodN, + Object params[], Class typeParams[]) throws Exception { + Method m = null; + m = findMethod(target.getClass(), methodN, typeParams); + if (m == null) { + d("Can't find method " + methodN + " in " + target + " CLASS " + + target.getClass()); + return null; + } + Object o = m.invoke(target, params); + + if (dbg > 0) { + // debug + StringBuffer sb = new StringBuffer(); + sb.append("" + target.getClass().getName() + "." + methodN + "( "); + for (int i = 0; i < params.length; i++) { + if (i > 0) + sb.append(", "); + sb.append(params[i]); + } + sb.append(")"); + d(sb.toString()); + } + return o; + } + + public static Object convert(String object, Class paramType) { + Object result = null; + if ("java.lang.String".equals(paramType.getName())) { + result = object; + } else if ("java.lang.Integer".equals(paramType.getName()) + || "int".equals(paramType.getName())) { + try { + result = new Integer(object); + } catch (NumberFormatException ex) { + } + // Try a setFoo ( boolean ) + } else if ("java.lang.Boolean".equals(paramType.getName()) + || "boolean".equals(paramType.getName())) { + result = new Boolean(object); + + // Try a setFoo ( InetAddress ) + } else if ("java.net.InetAddress".equals(paramType + .getName())) { + try { + result = InetAddress.getByName(object); + } catch (UnknownHostException exc) { + d("Unable to resolve host name:" + object); + } + + // Unknown type + } else { + d("Unknown type " + paramType.getName()); + } + if (result == null) { + throw new IllegalArgumentException("Can't convert argument: " + object); + } + return result; + } + + // -------------------- Get property -------------------- + // This provides a layer of abstraction + + public static interface PropertySource { + + public String getProperty(String key); + + } + + public static interface AttributeHolder { + + public void setAttribute(String key, Object o); + + } + + // debug -------------------- + static final int dbg = 0; + + static void d(String s) { + if (log.isDebugEnabled()) + log.debug("IntrospectionUtils: " + s); + } +} diff --git a/test/org/apache/catalina/tribes/demos/LoadTest.java b/test/org/apache/catalina/tribes/demos/LoadTest.java new file mode 100644 index 000000000..d2f7dc77c --- /dev/null +++ b/test/org/apache/catalina/tribes/demos/LoadTest.java @@ -0,0 +1,424 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.demos; + +import java.io.Serializable; +import java.util.Random; + +import org.apache.catalina.tribes.ByteMessage; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Channel; +import java.io.Externalizable; + + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class LoadTest implements MembershipListener,ChannelListener, Runnable { + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class); + public static int size = 24000; + public static Object mutex = new Object(); + public boolean doRun = true; + + public long bytesReceived = 0; + public float mBytesReceived = 0; + public int messagesReceived = 0; + public boolean send = true; + public boolean debug = false; + public int msgCount = 100; + ManagedChannel channel=null; + public int statsInterval = 10000; + public long pause = 0; + public boolean breakonChannelException = false; + public boolean async = false; + public long receiveStart = 0; + public int channelOptions = Channel.SEND_OPTIONS_DEFAULT; + + static int messageSize = 0; + + public static long messagesSent = 0; + public static long messageStartSendTime = 0; + public static long messageEndSendTime = 0; + public static int threadCount = 0; + + public static synchronized void startTest() { + threadCount++; + if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis(); + } + + public static synchronized void endTest() { + threadCount--; + if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis(); + } + + + public static synchronized long addSendStats(long count) { + messagesSent+=count; + return 0l; + } + + private static void printSendStats(long counter, int messageSize) { + float cnt = (float)counter; + float size = (float)messageSize; + float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f; + log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+ + "\n\tMessage count:"+counter+ + "\n\tTotal bytes :"+(long)(size*cnt)+ + "\n\tTotal seconds:"+(time)+ + "\n\tBytes/second :"+(size*cnt/time)+ + "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f)); + } + + + + public LoadTest(ManagedChannel channel, + boolean send, + int msgCount, + boolean debug, + long pause, + int stats, + boolean breakOnEx) { + this.channel = channel; + this.send = send; + this.msgCount = msgCount; + this.debug = debug; + this.pause = pause; + this.statsInterval = stats; + this.breakonChannelException = breakOnEx; + } + + + + public void run() { + + long counter = 0; + long total = 0; + LoadMessage msg = new LoadMessage(); + int messageSize = LoadTest.messageSize; + + try { + startTest(); + while (total < msgCount) { + if (channel.getMembers().length == 0 || (!send)) { + synchronized (mutex) { + try { + mutex.wait(); + } catch (InterruptedException x) { + log.info("Thread interrupted from wait"); + } + } + } else { + try { + //msg.setMsgNr((int)++total); + counter++; + if (debug) { + printArray(msg.getMessage()); + } + channel.send(channel.getMembers(), msg, channelOptions); + if ( pause > 0 ) { + if ( debug) System.out.println("Pausing sender for "+pause+" ms."); + Thread.sleep(pause); + } + } catch (ChannelException x) { + if ( debug ) log.error("Unable to send message:"+x.getMessage(),x); + log.error("Unable to send message:"+x.getMessage()); + ChannelException.FaultyMember[] faulty = x.getFaultyMembers(); + for (int i=0; i 0)) { + //add to the global counter + counter = addSendStats(counter); + //print from the global counter + //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime); + printSendStats(LoadTest.messagesSent, LoadTest.messageSize); + + } + + } + }catch ( Exception x ) { + log.error("Captured error while sending:"+x.getMessage()); + if ( debug ) log.error("",x); + printSendStats(LoadTest.messagesSent, LoadTest.messageSize); + } + endTest(); + } + + + + /** + * memberAdded + * + * @param member Member + * @todo Implement this org.apache.catalina.tribes.MembershipListener + * method + */ + public void memberAdded(Member member) { + log.info("Member added:"+member); + synchronized (mutex) { + mutex.notifyAll(); + } + } + + /** + * memberDisappeared + * + * @param member Member + * @todo Implement this org.apache.catalina.tribes.MembershipListener + * method + */ + public void memberDisappeared(Member member) { + log.info("Member disappeared:"+member); + } + + public boolean accept(Serializable msg, Member mbr){ + return (msg instanceof LoadMessage) || (msg instanceof ByteMessage); + } + + public void messageReceived(Serializable msg, Member mbr){ + if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis(); + if ( debug ) { + if ( msg instanceof LoadMessage ) { + printArray(((LoadMessage)msg).getMessage()); + } + } + + if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) { + LoadMessage tmp = new LoadMessage(); + tmp.setMessage(((ByteMessage)msg).getMessage()); + msg = tmp; + tmp = null; + } + + + bytesReceived+=((LoadMessage)msg).getMessage().length; + mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f; + messagesReceived++; + if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) { + float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived); + float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f; + log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+ + "\n\tMessage count :"+(long)messagesReceived+ + "\n\tTotal bytes :"+(long)bytes+ + "\n\tTotal mbytes :"+(long)mBytesReceived+ + "\n\tTime since 1st:"+seconds+" seconds"+ + "\n\tBytes/second :"+(bytes/seconds)+ + "\n\tMBytes/second :"+(mBytesReceived/seconds)+"\n"); + + } + } + + + public static void printArray(byte[] data) { + System.out.print("{"); + for (int i=0; i 1 ) { + Thread t = new Thread(test); + t.setDaemon(true); + t.start(); + threads--; + test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); + test.channelOptions = channelOptions; + } + test.run(); + if ( shutdown && send ) channel.stop(channel.DEFAULT); + System.out.println("System test complete, sleeping to let threads finish."); + Thread.sleep(60*1000*60); + } + + public static class Shutdown extends Thread { + ManagedChannel channel = null; + public Shutdown(ManagedChannel channel) { + this.channel = channel; + } + + public void run() { + System.out.println("Shutting down..."); + SystemExit exit = new SystemExit(5000); + exit.setDaemon(true); + exit.start(); + try { + channel.stop(channel.DEFAULT); + + }catch ( Exception x ) { + x.printStackTrace(); + } + System.out.println("Channel stopped."); + } + } + public static class SystemExit extends Thread { + private long delay; + public SystemExit(long delay) { + this.delay = delay; + } + public void run () { + try { + Thread.sleep(delay); + }catch ( Exception x ) { + x.printStackTrace(); + } + System.exit(0); + + } + } + +} \ No newline at end of file diff --git a/test/org/apache/catalina/tribes/demos/MapDemo.java b/test/org/apache/catalina/tribes/demos/MapDemo.java new file mode 100644 index 000000000..f67e97dbd --- /dev/null +++ b/test/org/apache/catalina/tribes/demos/MapDemo.java @@ -0,0 +1,415 @@ +package org.apache.catalina.tribes.demos; + +import java.io.Serializable; +import java.util.Map; + +import java.awt.ComponentOrientation; +import java.awt.Dimension; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import java.awt.event.MouseAdapter; +import java.awt.event.MouseEvent; +import javax.swing.BoxLayout; +import javax.swing.JButton; +import javax.swing.JFrame; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTable; +import javax.swing.JTextField; +import javax.swing.table.AbstractTableModel; +import javax.swing.table.TableModel; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.tipis.AbstractReplicatedMap; +import org.apache.catalina.tribes.tipis.LazyReplicatedMap; +import javax.swing.table.DefaultTableCellRenderer; +import java.awt.Color; +import java.awt.Component; +import javax.swing.table.TableColumn; +import org.apache.catalina.tribes.util.UUIDGenerator; +import org.apache.catalina.tribes.util.Arrays; + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class MapDemo implements ChannelListener, MembershipListener{ + + protected LazyReplicatedMap map; + protected SimpleTableDemo table; + + public MapDemo(Channel channel, String mapName ) { + map = new LazyReplicatedMap(null,channel,5000, mapName,null); + table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName()); + channel.addChannelListener(this); + channel.addMembershipListener(this); +// for ( int i=0; i<1000; i++ ) { +// map.put("MyKey-"+i,"My String Value-"+i); +// } + this.messageReceived(null,null); + } + + public boolean accept(Serializable msg, Member source) { + table.dataModel.getValueAt(-1,-1); + return false; + } + + public void messageReceived(Serializable msg, Member source) { + + } + + public void memberAdded(Member member) { + } + public void memberDisappeared(Member member) { + table.dataModel.getValueAt(-1,-1); + } + + public static void usage() { + System.out.println("Tribes MapDemo."); + System.out.println("Usage:\n\t" + + "java MapDemo [channel options] mapName\n\t" + + "\tChannel options:" + + ChannelCreator.usage()); + } + + public static void main(String[] args) throws Exception { + long start = System.currentTimeMillis(); + ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args); + String mapName = "MapDemo"; + if ( args.length > 0 && (!args[args.length-1].startsWith("-"))) { + mapName = args[args.length-1]; + } + channel.start(channel.DEFAULT); + Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); + MapDemo demo = new MapDemo(channel,mapName); + + System.out.println("System test complete, time to start="+(System.currentTimeMillis()-start)+" ms. Sleeping to let threads finish."); + Thread.sleep(60 * 1000 * 60); + } + + public static class Shutdown + extends Thread { + ManagedChannel channel = null; + public Shutdown(ManagedChannel channel) { + this.channel = channel; + } + + public void run() { + System.out.println("Shutting down..."); + SystemExit exit = new SystemExit(5000); + exit.setDaemon(true); + exit.start(); + try { + channel.stop(channel.DEFAULT); + + } catch (Exception x) { + x.printStackTrace(); + } + System.out.println("Channel stopped."); + } + } + + public static class SystemExit + extends Thread { + private long delay; + public SystemExit(long delay) { + this.delay = delay; + } + + public void run() { + try { + Thread.sleep(delay); + } catch (Exception x) { + x.printStackTrace(); + } + System.exit(0); + + } + } + + public static class SimpleTableDemo + extends JPanel implements ActionListener{ + private static int WIDTH = 550; + + private LazyReplicatedMap map; + private boolean DEBUG = false; + AbstractTableModel dataModel = new AbstractTableModel() { + + + String[] columnNames = { + "Key", + "Value", + "Backup Node", + "isPrimary", + "isProxy", + "isBackup"}; + + public int getColumnCount() { return columnNames.length; } + + public int getRowCount() {return map.sizeFull() +1; } + + public StringBuffer getMemberNames(Member[] members){ + StringBuffer buf = new StringBuffer(); + if ( members!=null ) { + for (int i=0;i 0 ) { + Color color = null; + boolean primary = ( (Boolean) table.getValueAt(row, 3)).booleanValue(); + boolean proxy = ( (Boolean) table.getValueAt(row, 4)).booleanValue(); + boolean backup = ( (Boolean) table.getValueAt(row, 5)).booleanValue(); + if (primary) color = Color.GREEN; + else if (proxy) color = Color.RED; + else if (backup) color = Color.BLUE; + if ( color != null ) cell.setBackground(color); + } +// System.out.println("Row:"+row+" Column:"+column+" Color:"+cell.getBackground()); +// cell.setBackground(bkgndColor); +// cell.setForeground(fgndColor); + + return cell; + } + + + } + + +}