--- /dev/null
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.demos;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor;
+import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.transport.MultiPointSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import java.util.ArrayList;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.catalina.tribes.Member;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ *
+ * <p>Company: </p>
+ *
+ * @author fhanik
+ * @version 1.0
+ */
+public class ChannelCreator {
+
+ org.apache.commons.logging.impl.LogFactoryImpl impl=null;
+ public static StringBuffer usage() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("\n\t\t[-bind tcpbindaddress]")
+ .append("\n\t\t[-tcpselto tcpselectortimeout]")
+ .append("\n\t\t[-tcpthreads tcpthreadcount]")
+ .append("\n\t\t[-port tcplistenport]")
+ .append("\n\t\t[-autobind tcpbindtryrange]")
+ .append("\n\t\t[-ackto acktimeout]")
+ .append("\n\t\t[-receiver org.apache.catalina.tribes.transport.nio.NioReceiver|org.apache.catalina.tribes.transport.bio.BioReceiver|]")
+ .append("\n\t\t[-transport org.apache.catalina.tribes.transport.nio.PooledParallelSender|org.apache.catalina.tribes.transport.bio.PooledMultiSender]")
+ .append("\n\t\t[-transport.xxx transport specific property]")
+ .append("\n\t\t[-maddr multicastaddr]")
+ .append("\n\t\t[-mport multicastport]")
+ .append("\n\t\t[-mbind multicastbindaddr]")
+ .append("\n\t\t[-mfreq multicastfrequency]")
+ .append("\n\t\t[-mdrop multicastdroptime]")
+ .append("\n\t\t[-gzip]")
+ .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]")
+ .append("\n\t\t[-order]")
+ .append("\n\t\t[-ordersize maxorderqueuesize]")
+ .append("\n\t\t[-frag]")
+ .append("\n\t\t[-fragsize maxmsgsize]")
+ .append("\n\t\t[-throughput]")
+ .append("\n\t\t[-failuredetect]")
+ .append("\n\t\t[-async]")
+ .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]");
+ return buf;
+
+ }
+
+ public static Channel createChannel(String[] args) throws Exception {
+ String bind = "auto";
+ int port = 4001;
+ String mbind = null;
+ boolean gzip = false;
+ int tcpseltimeout = 5000;
+ int tcpthreadcount = 4;
+ int acktimeout = 15000;
+ String mcastaddr = "228.0.0.5";
+ int mcastport = 45565;
+ long mcastfreq = 500;
+ long mcastdrop = 2000;
+ boolean order = false;
+ int ordersize = Integer.MAX_VALUE;
+ boolean frag = false;
+ int fragsize = 1024;
+ int autoBind = 10;
+ ArrayList staticMembers = new ArrayList();
+ Properties transportProperties = new Properties();
+ String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender";
+ String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver";
+ boolean async = false;
+ int asyncsize = 1024*1024*50; //50MB
+ boolean throughput = false;
+ boolean failuredetect = false;
+
+ for (int i = 0; i < args.length; i++) {
+ if ("-bind".equals(args[i])) {
+ bind = args[++i];
+ } else if ("-port".equals(args[i])) {
+ port = Integer.parseInt(args[++i]);
+ } else if ("-autobind".equals(args[i])) {
+ autoBind = Integer.parseInt(args[++i]);
+ } else if ("-tcpselto".equals(args[i])) {
+ tcpseltimeout = Integer.parseInt(args[++i]);
+ } else if ("-tcpthreads".equals(args[i])) {
+ tcpthreadcount = Integer.parseInt(args[++i]);
+ } else if ("-gzip".equals(args[i])) {
+ gzip = true;
+ } else if ("-async".equals(args[i])) {
+ async = true;
+ } else if ("-failuredetect".equals(args[i])) {
+ failuredetect = true;
+ } else if ("-asyncsize".equals(args[i])) {
+ asyncsize = Integer.parseInt(args[++i]);
+ System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
+ } else if ("-static".equals(args[i])) {
+ String d = args[++i];
+ String h = d.substring(0,d.indexOf(":"));
+ String p = d.substring(h.length()+1);
+ MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000);
+ staticMembers.add(m);
+ } else if ("-throughput".equals(args[i])) {
+ throughput = true;
+ } else if ("-order".equals(args[i])) {
+ order = true;
+ } else if ("-ordersize".equals(args[i])) {
+ ordersize = Integer.parseInt(args[++i]);
+ System.out.println("Setting OrderInterceptor.maxQueue="+ordersize);
+ } else if ("-frag".equals(args[i])) {
+ frag = true;
+ } else if ("-fragsize".equals(args[i])) {
+ fragsize = Integer.parseInt(args[++i]);
+ System.out.println("Setting FragmentationInterceptor.maxSize="+fragsize);
+ } else if ("-ackto".equals(args[i])) {
+ acktimeout = Integer.parseInt(args[++i]);
+ } else if ("-transport".equals(args[i])) {
+ transport = args[++i];
+ } else if (args[i]!=null && args[i].startsWith("transport.")) {
+ String key = args[i];
+ String val = args[++i];
+ transportProperties.setProperty(key,val);
+ } else if ("-receiver".equals(args[i])) {
+ receiver = args[++i];
+ } else if ("-maddr".equals(args[i])) {
+ mcastaddr = args[++i];
+ } else if ("-mport".equals(args[i])) {
+ mcastport = Integer.parseInt(args[++i]);
+ } else if ("-mfreq".equals(args[i])) {
+ mcastfreq = Long.parseLong(args[++i]);
+ } else if ("-mdrop".equals(args[i])) {
+ mcastdrop = Long.parseLong(args[++i]);
+ } else if ("-mbind".equals(args[i])) {
+ mbind = args[++i];
+ }
+ }
+
+ System.out.println("Creating receiver class="+receiver);
+ Class cl = Class.forName(receiver,true,ChannelCreator.class.getClassLoader());
+ ReceiverBase rx = (ReceiverBase)cl.newInstance();
+ rx.setTcpListenAddress(bind);
+ rx.setTcpListenPort(port);
+ rx.setTcpSelectorTimeout(tcpseltimeout);
+ rx.setTcpThreadCount(tcpthreadcount);
+ rx.getBind();
+ rx.setRxBufSize(43800);
+ rx.setTxBufSize(25188);
+ rx.setAutoBind(autoBind);
+
+
+ ReplicationTransmitter ps = new ReplicationTransmitter();
+ System.out.println("Creating transport class="+transport);
+ MultiPointSender sender = (MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance();
+ sender.setTimeout(acktimeout);
+ sender.setMaxRetryAttempts(2);
+ sender.setRxBufSize(43800);
+ sender.setTxBufSize(25188);
+
+ Iterator i = transportProperties.keySet().iterator();
+ while ( i.hasNext() ) {
+ String key = (String)i.next();
+ IntrospectionUtils.setProperty(sender,key,transportProperties.getProperty(key));
+ }
+ ps.setTransport(sender);
+
+ McastService service = new McastService();
+ service.setMcastAddr(mcastaddr);
+ if (mbind != null) service.setMcastBindAddress(mbind);
+ service.setMcastFrequency(mcastfreq);
+ service.setMcastDropTime(mcastdrop);
+ service.setMcastPort(mcastport);
+
+ ManagedChannel channel = new GroupChannel();
+ channel.setChannelReceiver(rx);
+ channel.setChannelSender(ps);
+ channel.setMembershipService(service);
+
+ if ( throughput ) channel.addInterceptor(new ThroughputInterceptor());
+ if (gzip) channel.addInterceptor(new GzipInterceptor());
+ if ( frag ) {
+ FragmentationInterceptor fi = new FragmentationInterceptor();
+ fi.setMaxSize(fragsize);
+ channel.addInterceptor(fi);
+ }
+ if (order) {
+ OrderInterceptor oi = new OrderInterceptor();
+ oi.setMaxQueue(ordersize);
+ channel.addInterceptor(oi);
+ }
+
+ if ( async ) {
+ MessageDispatchInterceptor mi = new MessageDispatch15Interceptor();
+ mi.setMaxQueueSize(asyncsize);
+ channel.addInterceptor(mi);
+ System.out.println("Added MessageDispatchInterceptor");
+ }
+
+ if ( failuredetect ) {
+ TcpFailureDetector tcpfi = new TcpFailureDetector();
+ channel.addInterceptor(tcpfi);
+ }
+ if ( staticMembers.size() > 0 ) {
+ StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+ for (int x=0; x<staticMembers.size(); x++ ) {
+ smi.addStaticMember((Member)staticMembers.get(x));
+ }
+ channel.addInterceptor(smi);
+ }
+
+
+ byte[] domain = new byte[] {1,2,3,4,5,6,7,8,9,0};
+ ((McastService)channel.getMembershipService()).setDomain(domain);
+ DomainFilterInterceptor filter = new DomainFilterInterceptor();
+ filter.setDomain(domain);
+ channel.addInterceptor(filter);
+ return channel;
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.apache.catalina.tribes.demos;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.util.Arrays;
+
+
+
+public class CoordinationDemo {
+ static int CHANNEL_COUNT = 5;
+ static int SCREEN_WIDTH = 120;
+ static long SLEEP_TIME = 10;
+ static int CLEAR_SCREEN = 30;
+ static boolean MULTI_THREAD = false;
+ static boolean[] VIEW_EVENTS = new boolean[255];
+ StringBuffer statusLine = new StringBuffer();
+ Status[] status = null;
+ BufferedReader reader = null;
+ /**
+ * Construct and show the application.
+ */
+ public CoordinationDemo() {
+ }
+
+ public void init() {
+ reader = new BufferedReader(new InputStreamReader(System.in));
+ status = new Status[CHANNEL_COUNT];
+ }
+
+
+ public void clearScreen() {
+ StringBuffer buf = new StringBuffer(700);
+ for (int i=0; i<CLEAR_SCREEN; i++ ) buf.append("\n");
+ System.out.println(buf);
+ }
+
+ public void printMenuOptions() {
+ System.out.println("Commands:");
+ System.out.println("\tstart [member id]");
+ System.out.println("\tstop [member id]");
+ System.out.println("\tprint (refresh)");
+ System.out.println("\tquit");
+ System.out.print("Enter command:");
+ }
+
+ public synchronized void printScreen() {
+ clearScreen();
+ System.out.println(" ###."+getHeader());
+ for ( int i=0; i<status.length; i++ ) {
+ System.out.print(leftfill(String.valueOf(i+1)+".",5," "));
+ if ( status[i] != null ) System.out.print(status[i].getStatusLine());
+ }
+ System.out.println("\n\n");
+ System.out.println("Overall status:"+statusLine);
+ printMenuOptions();
+
+ }
+
+ public String getHeader() {
+ //member - 30
+ //running- 10
+ //coord - 30
+ //view-id - 24
+ //view count - 8
+
+ StringBuffer buf = new StringBuffer();
+ buf.append(leftfill("Member",30," "));
+ buf.append(leftfill("Running",10," "));
+ buf.append(leftfill("Coord",30," "));
+ buf.append(leftfill("View-id(short)",24," "));
+ buf.append(leftfill("Count",8," "));
+ buf.append("\n");
+
+ buf.append(rightfill("==="+new java.sql.Timestamp(System.currentTimeMillis()).toString(),SCREEN_WIDTH,"="));
+ buf.append("\n");
+ return buf.toString();
+ }
+
+ public String[] tokenize(String line) {
+ StringTokenizer tz = new StringTokenizer(line," ");
+ String[] result = new String[tz.countTokens()];
+ for (int i=0; i<result.length; i++ ) result[i] = tz.nextToken();
+ return result;
+ }
+
+ public void waitForInput() throws IOException {
+ for ( int i=0; i<status.length; i++ ) status[i] = new Status(this);
+ printScreen();
+ String l = reader.readLine();
+ String[] args = tokenize(l);
+ while ( args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) {
+ if ("start".equalsIgnoreCase(args[0])) {
+ cmdStart(args);
+ } else if ("stop".equalsIgnoreCase(args[0])) {
+ cmdStop(args);
+
+ }
+ printScreen();
+ l = reader.readLine();
+ args = tokenize(l);
+ }
+ for ( int i=0; i<status.length; i++ ) status[i].stop();
+ }
+
+ private void cmdStop(String[] args) {
+ if ( args.length == 1 ) {
+ setSystemStatus("System shutting down...");
+ Thread[] t = new Thread[CHANNEL_COUNT];
+ for (int i = 0; i < status.length; i++) {
+ final int j = i;
+ t[j] = new Thread() {
+ public void run() {
+ status[j].stop();
+ }
+ };
+ }
+ for (int i = 0; i < status.length; i++) if (MULTI_THREAD ) t[i].start(); else t[i].run();
+ setSystemStatus("System stopped.");
+ } else {
+ int index = -1;
+ try { index = Integer.parseInt(args[1])-1;}catch ( Exception x ) {setSystemStatus("Invalid index:"+args[1]);}
+ if ( index >= 0 ) {
+ setSystemStatus("Stopping member:"+(index+1));
+ status[index].stop();
+ setSystemStatus("Member stopped:"+(index+1));
+ }
+ }
+ }
+
+ private void cmdStart(String[] args) {
+ if ( args.length == 1 ) {
+ setSystemStatus("System starting up...");
+ Thread[] t = new Thread[CHANNEL_COUNT];
+ for (int i = 0; i < status.length; i++) {
+ final int j = i;
+ t[j] = new Thread() {
+ public void run() {
+ status[j].start();
+ }
+ };
+ }
+ for (int i = 0; i < status.length; i++) if (MULTI_THREAD ) t[i].start(); else t[i].run();
+ setSystemStatus("System started.");
+ } else {
+ int index = -1;
+ try { index = Integer.parseInt(args[1])-1;}catch ( Exception x ) {setSystemStatus("Invalid index:"+args[1]);}
+ if ( index >= 0 ) {
+ setSystemStatus("Starting member:"+(index+1));
+ status[index].start();
+ setSystemStatus("Member started:"+(index+1));
+ }
+ }
+ }
+
+ public void setSystemStatus(String status) {
+ statusLine.delete(0,statusLine.length());
+ statusLine.append(status);
+ }
+
+
+
+ public static void setEvents(String events) {
+ java.util.Arrays.fill(VIEW_EVENTS,false);
+ StringTokenizer t = new StringTokenizer(events,",");
+ while (t.hasMoreTokens() ) {
+ int idx = Integer.parseInt(t.nextToken());
+ VIEW_EVENTS[idx] = true;
+ }
+ }
+
+ public static void run(String[] args,CoordinationDemo demo) throws Exception {
+ usage();
+ java.util.Arrays.fill(VIEW_EVENTS,true);
+
+ for (int i=0; i<args.length; i++ ) {
+ if ( "-c".equals(args[i]) )
+ CHANNEL_COUNT = Integer.parseInt(args[++i]);
+ else if ( "-t".equals(args[i]) )
+ MULTI_THREAD = Boolean.parseBoolean(args[++i]);
+ else if ( "-s".equals(args[i]) )
+ SLEEP_TIME = Long.parseLong(args[++i]);
+ else if ( "-sc".equals(args[i]) )
+ CLEAR_SCREEN = Integer.parseInt(args[++i]);
+ else if ( "-p".equals(args[i]) )
+ setEvents(args[++i]);
+ else if ( "-h".equals(args[i]) ) System.exit(0);
+ }
+ demo.init();
+ demo.waitForInput();
+ }
+
+ private static void usage() {
+ System.out.println("Usage:");
+ System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
+ System.out.println("Example:");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
+ System.out.println();
+ }
+ public static void main(String[] args) throws Exception {
+ CoordinationDemo demo = new CoordinationDemo();
+ run(args,demo);
+ }
+
+ public static String leftfill(String value, int length, String ch) {
+ return fill(value,length,ch,true);
+ }
+
+ public static String rightfill(String value, int length, String ch) {
+ return fill(value,length,ch,false);
+ }
+
+ public static String fill(String value, int length, String ch, boolean left) {
+ StringBuffer buf = new StringBuffer();
+ if ( !left ) buf.append(value.trim());
+ for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
+ if ( left ) buf.append(value.trim());
+ return buf.toString();
+ }
+
+
+ public static class Status {
+ public CoordinationDemo parent;
+ public GroupChannel channel;
+ NonBlockingCoordinator interceptor = null;
+ public String status;
+ public Exception error;
+ public String startstatus = "new";
+
+ public Status(CoordinationDemo parent) {
+ this.parent = parent;
+ }
+
+ public String getStatusLine() {
+ //member - 30
+ //running- 10
+ //coord - 30
+ //view-id - 24
+ //view count - 8
+ StringBuffer buf = new StringBuffer();
+ String local = "";
+ String coord = "";
+ String viewId = "";
+ String count = "0";
+ if ( channel != null ) {
+ Member lm = channel.getLocalMember(false);
+ local = lm!=null?lm.getName():"";
+ coord = interceptor!=null && interceptor.getCoordinator()!=null?interceptor.getCoordinator().getName():"";
+ viewId = getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new byte[0]);
+ count = String.valueOf(interceptor.getView().length);
+ }
+ buf.append(leftfill(local,30," "));
+ buf.append(leftfill(startstatus, 10, " "));
+ buf.append(leftfill(coord, 30, " "));
+ buf.append(leftfill(viewId, 24, " "));
+ buf.append(leftfill(count, 8, " "));
+ buf.append("\n");
+ buf.append("Status:"+status);
+ buf.append("\n");
+ return buf.toString();
+ }
+
+ public String getByteString(byte[] b) {
+ if ( b == null ) return "{}";
+ return Arrays.toString(b,0,Math.min(b.length,4));
+ }
+
+ public void start() {
+ try {
+ if ( channel == null ) {
+ channel = createChannel();
+ startstatus = "starting";
+ channel.start(channel.DEFAULT);
+ startstatus = "running";
+ } else {
+ status = "Channel already started.";
+ }
+ } catch ( Exception x ) {
+ synchronized (System.err) {
+ System.err.println("Start failed:");
+ StackTraceElement[] els = x.getStackTrace();
+ for (int i = 0; i < els.length; i++) System.err.println(els[i].toString());
+ }
+ status = "Start failed:"+x.getMessage();
+ error = x;
+ startstatus = "failed";
+ try { channel.stop(GroupChannel.DEFAULT);}catch(Exception ignore){}
+ channel = null;
+ interceptor = null;
+ }
+ }
+
+ public void stop() {
+ try {
+ if ( channel != null ) {
+ channel.stop(channel.DEFAULT);
+ status = "Channel Stopped";
+ } else {
+ status = "Channel Already Stopped";
+ }
+ }catch ( Exception x ) {
+ synchronized (System.err) {
+ System.err.println("Stop failed:");
+ StackTraceElement[] els = x.getStackTrace();
+ for (int i = 0; i < els.length; i++) System.err.println(els[i].toString());
+ }
+
+ status = "Stop failed:"+x.getMessage();
+ error = x;
+ }finally {
+ startstatus = "stopped";
+ channel = null;
+ interceptor = null;
+ }
+ }
+
+ public GroupChannel createChannel() {
+ channel = new GroupChannel();
+ ((ReceiverBase)channel.getChannelReceiver()).setAutoBind(100);
+ interceptor = new NonBlockingCoordinator() {
+ public void fireInterceptorEvent(InterceptorEvent event) {
+ status = event.getEventTypeDesc();
+ int type = event.getEventType();
+ boolean display = VIEW_EVENTS[type];
+ if ( display ) parent.printScreen();
+ try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
+ }
+ };
+ channel.addInterceptor(interceptor);
+ channel.addInterceptor(new TcpFailureDetector());
+ channel.addInterceptor(new MessageDispatch15Interceptor());
+ return channel;
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.Response;
+
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class EchoRpcTest implements RpcCallback, Runnable {
+
+ Channel channel;
+ int count;
+ String message;
+ long pause;
+ RpcChannel rpc;
+ int options;
+ long timeout;
+ String name;
+
+ public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) {
+ this.channel = channel;
+ this.count = count;
+ this.message = message;
+ this.pause = pause;
+ this.options = options;
+ this.rpc = new RpcChannel(name.getBytes(),channel,this);
+ this.timeout = timeout;
+ this.name = name;
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread, the rpc
+ * callback can handle any data that comes in after the fact.
+ *
+ * @param msg Serializable
+ * @param sender Member
+ * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+ * method
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]");
+ }
+
+ /**
+ *
+ * @param msg Serializable
+ * @param sender Member
+ * @return Serializable - null if no reply should be sent
+ * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+ * method
+ */
+ public Serializable replyRequest(Serializable msg, Member sender) {
+ System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]");
+ return "Reply("+name+"):"+msg;
+ }
+
+ public void run() {
+ long counter = 0;
+ while (counter<count) {
+ String msg = message + " cnt="+(++counter);
+ try {
+ System.out.println("Sending ["+msg+"]");
+ long start = System.currentTimeMillis();
+ Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout);
+ System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
+ for ( int i=0; i<resp.length; i++ ) {
+ System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");
+ }
+ Thread.sleep(pause);
+ }catch(Exception x){}
+ }
+ }
+
+ public static void usage() {
+ System.out.println("Tribes RPC tester.");
+ System.out.println("Usage:\n\t"+
+ "java EchoRpcTest [options]\n\t"+
+ "Options:\n\t\t"+
+ "[-mode all|first|majority] \n\t\t"+
+ "[-debug] \n\t\t"+
+ "[-count messagecount] \n\t\t"+
+ "[-timeout timeoutinms] \n\t\t"+
+ "[-stats statinterval] \n\t\t"+
+ "[-pause nrofsecondstopausebetweensends] \n\t\t"+
+ "[-message message] \n\t\t"+
+ "[-name rpcname] \n\t\t"+
+ "[-break (halts execution on exception)]\n"+
+ "\tChannel options:"+
+ ChannelCreator.usage()+"\n\n"+
+ "Example:\n\t"+
+ "java EchoRpcTest -port 4004\n\t"+
+ "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+
+ "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+ }
+
+ public static void main(String[] args) throws Exception {
+ boolean send = true;
+ boolean debug = false;
+ long pause = 3000;
+ int count = 1000000;
+ int stats = 10000;
+ String name = "EchoRpcId";
+ boolean breakOnEx = false;
+ int threads = 1;
+ int options = RpcChannel.ALL_REPLY;
+ long timeout = 15000;
+ String message = "EchoRpcMessage";
+ if ( args.length == 0 ) {
+ args = new String[] {"-help"};
+ }
+ for (int i = 0; i < args.length; i++) {
+ if ("-threads".equals(args[i])) {
+ threads = Integer.parseInt(args[++i]);
+ } else if ("-count".equals(args[i])) {
+ count = Integer.parseInt(args[++i]);
+ System.out.println("Sending "+count+" messages.");
+ } else if ("-pause".equals(args[i])) {
+ pause = Long.parseLong(args[++i])*1000;
+ } else if ("-break".equals(args[i])) {
+ breakOnEx = true;
+ } else if ("-stats".equals(args[i])) {
+ stats = Integer.parseInt(args[++i]);
+ System.out.println("Stats every "+stats+" message");
+ } else if ("-timeout".equals(args[i])) {
+ timeout = Long.parseLong(args[++i]);
+ } else if ("-message".equals(args[i])) {
+ message = args[++i];
+ } else if ("-name".equals(args[i])) {
+ name = args[++i];
+ } else if ("-mode".equals(args[i])) {
+ if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY;
+ else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY;
+ else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY;
+ } else if ("-debug".equals(args[i])) {
+ debug = true;
+ } else if ("-help".equals(args[i]))
+ {
+ usage();
+ System.exit(1);
+ }
+ }
+
+
+ ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+ EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout);
+ channel.start(channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ test.run();
+
+ System.out.println("System test complete, sleeping to let threads finish.");
+ Thread.sleep(60*1000*60);
+ }
+
+ public static class Shutdown extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ SystemExit exit = new SystemExit(5000);
+ exit.setDaemon(true);
+ exit.start();
+ try {
+ channel.stop(channel.DEFAULT);
+
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
+ public static class SystemExit extends Thread {
+ private long delay;
+ public SystemExit(long delay) {
+ this.delay = delay;
+ }
+ public void run () {
+ try {
+ Thread.sleep(delay);
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.exit(0);
+
+ }
+ }}
\ No newline at end of file
--- /dev/null
+/*\r
+ * Copyright 1999-2004 The Apache Software Foundation\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+\r
+package org.apache.catalina.tribes.demos;\r
+\r
+import java.io.File;\r
+import java.io.FilenameFilter;\r
+import java.io.IOException;\r
+import java.lang.reflect.InvocationTargetException;\r
+import java.lang.reflect.Method;\r
+import java.net.InetAddress;\r
+import java.net.MalformedURLException;\r
+import java.net.URL;\r
+import java.net.UnknownHostException;\r
+import java.util.Hashtable;\r
+import java.util.StringTokenizer;\r
+import java.util.Vector;\r
+\r
+// Depends: JDK1.1\r
+\r
+/**\r
+ * Utils for introspection and reflection\r
+ */\r
+public final class IntrospectionUtils {\r
+\r
+\r
+ private static org.apache.commons.logging.Log log=\r
+ org.apache.commons.logging.LogFactory.getLog( IntrospectionUtils.class );\r
+\r
+ /**\r
+ * Call execute() - any ant-like task should work\r
+ */\r
+ public static void execute(Object proxy, String method) throws Exception {\r
+ Method executeM = null;\r
+ Class c = proxy.getClass();\r
+ Class params[] = new Class[0];\r
+ // params[0]=args.getClass();\r
+ executeM = findMethod(c, method, params);\r
+ if (executeM == null) {\r
+ throw new RuntimeException("No execute in " + proxy.getClass());\r
+ }\r
+ executeM.invoke(proxy, (Object[]) null);//new Object[] { args });\r
+ }\r
+\r
+ /**\r
+ * Call void setAttribute( String ,Object )\r
+ */\r
+ public static void setAttribute(Object proxy, String n, Object v)\r
+ throws Exception {\r
+ if (proxy instanceof AttributeHolder) {\r
+ ((AttributeHolder) proxy).setAttribute(n, v);\r
+ return;\r
+ }\r
+\r
+ Method executeM = null;\r
+ Class c = proxy.getClass();\r
+ Class params[] = new Class[2];\r
+ params[0] = String.class;\r
+ params[1] = Object.class;\r
+ executeM = findMethod(c, "setAttribute", params);\r
+ if (executeM == null) {\r
+ if (log.isDebugEnabled())\r
+ log.debug("No setAttribute in " + proxy.getClass());\r
+ return;\r
+ }\r
+ if (false)\r
+ if (log.isDebugEnabled())\r
+ log.debug("Setting " + n + "=" + v + " in " + proxy);\r
+ executeM.invoke(proxy, new Object[] { n, v });\r
+ return;\r
+ }\r
+\r
+ /**\r
+ * Call void getAttribute( String )\r
+ */\r
+ public static Object getAttribute(Object proxy, String n) throws Exception {\r
+ Method executeM = null;\r
+ Class c = proxy.getClass();\r
+ Class params[] = new Class[1];\r
+ params[0] = String.class;\r
+ executeM = findMethod(c, "getAttribute", params);\r
+ if (executeM == null) {\r
+ if (log.isDebugEnabled())\r
+ log.debug("No getAttribute in " + proxy.getClass());\r
+ return null;\r
+ }\r
+ return executeM.invoke(proxy, new Object[] { n });\r
+ }\r
+\r
+ /**\r
+ * Construct a URLClassLoader. Will compile and work in JDK1.1 too.\r
+ */\r
+ public static ClassLoader getURLClassLoader(URL urls[], ClassLoader parent) {\r
+ try {\r
+ Class urlCL = Class.forName("java.net.URLClassLoader");\r
+ Class paramT[] = new Class[2];\r
+ paramT[0] = urls.getClass();\r
+ paramT[1] = ClassLoader.class;\r
+ Method m = findMethod(urlCL, "newInstance", paramT);\r
+ if (m == null)\r
+ return null;\r
+\r
+ ClassLoader cl = (ClassLoader) m.invoke(urlCL, new Object[] { urls,\r
+ parent });\r
+ return cl;\r
+ } catch (ClassNotFoundException ex) {\r
+ // jdk1.1\r
+ return null;\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ return null;\r
+ }\r
+ }\r
+\r
+ public static String guessInstall(String installSysProp,\r
+ String homeSysProp, String jarName) {\r
+ return guessInstall(installSysProp, homeSysProp, jarName, null);\r
+ }\r
+\r
+ /**\r
+ * Guess a product install/home by analyzing the class path. It works for\r
+ * product using the pattern: lib/executable.jar or if executable.jar is\r
+ * included in classpath by a shell script. ( java -jar also works )\r
+ *\r
+ * Insures both "install" and "home" System properties are set. If either or\r
+ * both System properties are unset, "install" and "home" will be set to the\r
+ * same value. This value will be the other System property that is set, or\r
+ * the guessed value if neither is set.\r
+ */\r
+ public static String guessInstall(String installSysProp,\r
+ String homeSysProp, String jarName, String classFile) {\r
+ String install = null;\r
+ String home = null;\r
+\r
+ if (installSysProp != null)\r
+ install = System.getProperty(installSysProp);\r
+\r
+ if (homeSysProp != null)\r
+ home = System.getProperty(homeSysProp);\r
+\r
+ if (install != null) {\r
+ if (home == null)\r
+ System.getProperties().put(homeSysProp, install);\r
+ return install;\r
+ }\r
+\r
+ // Find the directory where jarName.jar is located\r
+\r
+ String cpath = System.getProperty("java.class.path");\r
+ String pathSep = System.getProperty("path.separator");\r
+ StringTokenizer st = new StringTokenizer(cpath, pathSep);\r
+ while (st.hasMoreTokens()) {\r
+ String path = st.nextToken();\r
+ // log( "path " + path );\r
+ if (path.endsWith(jarName)) {\r
+ home = path.substring(0, path.length() - jarName.length());\r
+ try {\r
+ if ("".equals(home)) {\r
+ home = new File("./").getCanonicalPath();\r
+ } else if (home.endsWith(File.separator)) {\r
+ home = home.substring(0, home.length() - 1);\r
+ }\r
+ File f = new File(home);\r
+ String parentDir = f.getParent();\r
+ if (parentDir == null)\r
+ parentDir = home; // unix style\r
+ File f1 = new File(parentDir);\r
+ install = f1.getCanonicalPath();\r
+ if (installSysProp != null)\r
+ System.getProperties().put(installSysProp, install);\r
+ if (home == null && homeSysProp != null)\r
+ System.getProperties().put(homeSysProp, install);\r
+ return install;\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ }\r
+ } else {\r
+ String fname = path + (path.endsWith("/") ? "" : "/")\r
+ + classFile;\r
+ if (new File(fname).exists()) {\r
+ try {\r
+ File f = new File(path);\r
+ String parentDir = f.getParent();\r
+ if (parentDir == null)\r
+ parentDir = path; // unix style\r
+ File f1 = new File(parentDir);\r
+ install = f1.getCanonicalPath();\r
+ if (installSysProp != null)\r
+ System.getProperties().put(installSysProp, install);\r
+ if (home == null && homeSysProp != null)\r
+ System.getProperties().put(homeSysProp, install);\r
+ return install;\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ // if install directory can't be found, use home as the default\r
+ if (home != null) {\r
+ System.getProperties().put(installSysProp, home);\r
+ return home;\r
+ }\r
+\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Debug method, display the classpath\r
+ */\r
+ public static void displayClassPath(String msg, URL[] cp) {\r
+ if (log.isDebugEnabled()) {\r
+ log.debug(msg);\r
+ for (int i = 0; i < cp.length; i++) {\r
+ log.debug(cp[i].getFile());\r
+ }\r
+ }\r
+ }\r
+\r
+ public static String PATH_SEPARATOR = System.getProperty("path.separator");\r
+\r
+ /**\r
+ * Adds classpath entries from a vector of URL's to the "tc_path_add" System\r
+ * property. This System property lists the classpath entries common to web\r
+ * applications. This System property is currently used by Jasper when its\r
+ * JSP servlet compiles the Java file for a JSP.\r
+ */\r
+ public static String classPathAdd(URL urls[], String cp) {\r
+ if (urls == null)\r
+ return cp;\r
+\r
+ for (int i = 0; i < urls.length; i++) {\r
+ if (cp != null)\r
+ cp += PATH_SEPARATOR + urls[i].getFile();\r
+ else\r
+ cp = urls[i].getFile();\r
+ }\r
+ return cp;\r
+ }\r
+\r
+ /**\r
+ * Find a method with the right name If found, call the method ( if param is\r
+ * int or boolean we'll convert value to the right type before) - that means\r
+ * you can have setDebug(1).\r
+ */\r
+ public static void setProperty(Object o, String name, String value) {\r
+ if (dbg > 1)\r
+ d("setProperty(" + o.getClass() + " " + name + "=" + value + ")");\r
+\r
+ String setter = "set" + capitalize(name);\r
+\r
+ try {\r
+ Method methods[] = findMethods(o.getClass());\r
+ Method setPropertyMethod = null;\r
+\r
+ // First, the ideal case - a setFoo( String ) method\r
+ for (int i = 0; i < methods.length; i++) {\r
+ Class paramT[] = methods[i].getParameterTypes();\r
+ if (setter.equals(methods[i].getName()) && paramT.length == 1\r
+ && "java.lang.String".equals(paramT[0].getName())) {\r
+\r
+ methods[i].invoke(o, new Object[] { value });\r
+ return;\r
+ }\r
+ }\r
+\r
+ // Try a setFoo ( int ) or ( boolean )\r
+ for (int i = 0; i < methods.length; i++) {\r
+ boolean ok = true;\r
+ if (setter.equals(methods[i].getName())\r
+ && methods[i].getParameterTypes().length == 1) {\r
+\r
+ // match - find the type and invoke it\r
+ Class paramType = methods[i].getParameterTypes()[0];\r
+ Object params[] = new Object[1];\r
+\r
+ // Try a setFoo ( int )\r
+ if ("java.lang.Integer".equals(paramType.getName())\r
+ || "int".equals(paramType.getName())) {\r
+ try {\r
+ params[0] = new Integer(value);\r
+ } catch (NumberFormatException ex) {\r
+ ok = false;\r
+ }\r
+ // Try a setFoo ( long )\r
+ }else if ("java.lang.Long".equals(paramType.getName())\r
+ || "long".equals(paramType.getName())) {\r
+ try {\r
+ params[0] = new Long(value);\r
+ } catch (NumberFormatException ex) {\r
+ ok = false;\r
+ }\r
+\r
+ // Try a setFoo ( boolean )\r
+ } else if ("java.lang.Boolean".equals(paramType.getName())\r
+ || "boolean".equals(paramType.getName())) {\r
+ params[0] = new Boolean(value);\r
+\r
+ // Try a setFoo ( InetAddress )\r
+ } else if ("java.net.InetAddress".equals(paramType\r
+ .getName())) {\r
+ try {\r
+ params[0] = InetAddress.getByName(value);\r
+ } catch (UnknownHostException exc) {\r
+ d("Unable to resolve host name:" + value);\r
+ ok = false;\r
+ }\r
+\r
+ // Unknown type\r
+ } else {\r
+ d("Unknown type " + paramType.getName());\r
+ }\r
+\r
+ if (ok) {\r
+ methods[i].invoke(o, params);\r
+ return;\r
+ }\r
+ }\r
+\r
+ // save "setProperty" for later\r
+ if ("setProperty".equals(methods[i].getName())) {\r
+ setPropertyMethod = methods[i];\r
+ }\r
+ }\r
+\r
+ // Ok, no setXXX found, try a setProperty("name", "value")\r
+ if (setPropertyMethod != null) {\r
+ Object params[] = new Object[2];\r
+ params[0] = name;\r
+ params[1] = value;\r
+ setPropertyMethod.invoke(o, params);\r
+ }\r
+\r
+ } catch (IllegalArgumentException ex2) {\r
+ log.warn("IAE " + o + " " + name + " " + value, ex2);\r
+ } catch (SecurityException ex1) {\r
+ if (dbg > 0)\r
+ d("SecurityException for " + o.getClass() + " " + name + "="\r
+ + value + ")");\r
+ if (dbg > 1)\r
+ ex1.printStackTrace();\r
+ } catch (IllegalAccessException iae) {\r
+ if (dbg > 0)\r
+ d("IllegalAccessException for " + o.getClass() + " " + name\r
+ + "=" + value + ")");\r
+ if (dbg > 1)\r
+ iae.printStackTrace();\r
+ } catch (InvocationTargetException ie) {\r
+ if (dbg > 0)\r
+ d("InvocationTargetException for " + o.getClass() + " " + name\r
+ + "=" + value + ")");\r
+ if (dbg > 1)\r
+ ie.printStackTrace();\r
+ }\r
+ }\r
+\r
+ public static Object getProperty(Object o, String name) {\r
+ String getter = "get" + capitalize(name);\r
+ String isGetter = "is" + capitalize(name);\r
+\r
+ try {\r
+ Method methods[] = findMethods(o.getClass());\r
+ Method getPropertyMethod = null;\r
+\r
+ // First, the ideal case - a getFoo() method\r
+ for (int i = 0; i < methods.length; i++) {\r
+ Class paramT[] = methods[i].getParameterTypes();\r
+ if (getter.equals(methods[i].getName()) && paramT.length == 0) {\r
+ return methods[i].invoke(o, (Object[]) null);\r
+ }\r
+ if (isGetter.equals(methods[i].getName()) && paramT.length == 0) {\r
+ return methods[i].invoke(o, (Object[]) null);\r
+ }\r
+\r
+ if ("getProperty".equals(methods[i].getName())) {\r
+ getPropertyMethod = methods[i];\r
+ }\r
+ }\r
+\r
+ // Ok, no setXXX found, try a getProperty("name")\r
+ if (getPropertyMethod != null) {\r
+ Object params[] = new Object[1];\r
+ params[0] = name;\r
+ return getPropertyMethod.invoke(o, params);\r
+ }\r
+\r
+ } catch (IllegalArgumentException ex2) {\r
+ log.warn("IAE " + o + " " + name, ex2);\r
+ } catch (SecurityException ex1) {\r
+ if (dbg > 0)\r
+ d("SecurityException for " + o.getClass() + " " + name + ")");\r
+ if (dbg > 1)\r
+ ex1.printStackTrace();\r
+ } catch (IllegalAccessException iae) {\r
+ if (dbg > 0)\r
+ d("IllegalAccessException for " + o.getClass() + " " + name\r
+ + ")");\r
+ if (dbg > 1)\r
+ iae.printStackTrace();\r
+ } catch (InvocationTargetException ie) {\r
+ if (dbg > 0)\r
+ d("InvocationTargetException for " + o.getClass() + " " + name\r
+ + ")");\r
+ if (dbg > 1)\r
+ ie.printStackTrace();\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ */\r
+ public static void setProperty(Object o, String name) {\r
+ String setter = "set" + capitalize(name);\r
+ try {\r
+ Method methods[] = findMethods(o.getClass());\r
+ Method setPropertyMethod = null;\r
+ // find setFoo() method\r
+ for (int i = 0; i < methods.length; i++) {\r
+ Class paramT[] = methods[i].getParameterTypes();\r
+ if (setter.equals(methods[i].getName()) && paramT.length == 0) {\r
+ methods[i].invoke(o, new Object[] {});\r
+ return;\r
+ }\r
+ }\r
+ } catch (Exception ex1) {\r
+ if (dbg > 0)\r
+ d("Exception for " + o.getClass() + " " + name);\r
+ if (dbg > 1)\r
+ ex1.printStackTrace();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Replace ${NAME} with the property value\r
+ *\r
+ * @deprecated Use the explicit method\r
+ */\r
+ public static String replaceProperties(String value, Object getter) {\r
+ if (getter instanceof Hashtable)\r
+ return replaceProperties(value, (Hashtable) getter, null);\r
+\r
+ if (getter instanceof PropertySource) {\r
+ PropertySource src[] = new PropertySource[] { (PropertySource) getter };\r
+ return replaceProperties(value, null, src);\r
+ }\r
+ return value;\r
+ }\r
+\r
+ /**\r
+ * Replace ${NAME} with the property value\r
+ */\r
+ public static String replaceProperties(String value, Hashtable staticProp,\r
+ PropertySource dynamicProp[]) {\r
+ StringBuffer sb = new StringBuffer();\r
+ int prev = 0;\r
+ // assert value!=nil\r
+ int pos;\r
+ while ((pos = value.indexOf("$", prev)) >= 0) {\r
+ if (pos > 0) {\r
+ sb.append(value.substring(prev, pos));\r
+ }\r
+ if (pos == (value.length() - 1)) {\r
+ sb.append('$');\r
+ prev = pos + 1;\r
+ } else if (value.charAt(pos + 1) != '{') {\r
+ sb.append('$');\r
+ prev = pos + 1; // XXX\r
+ } else {\r
+ int endName = value.indexOf('}', pos);\r
+ if (endName < 0) {\r
+ sb.append(value.substring(pos));\r
+ prev = value.length();\r
+ continue;\r
+ }\r
+ String n = value.substring(pos + 2, endName);\r
+ String v = null;\r
+ if (staticProp != null) {\r
+ v = (String) ((Hashtable) staticProp).get(n);\r
+ }\r
+ if (v == null && dynamicProp != null) {\r
+ for (int i = 0; i < dynamicProp.length; i++) {\r
+ v = dynamicProp[i].getProperty(n);\r
+ if (v != null) {\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ if (v == null)\r
+ v = "${" + n + "}";\r
+\r
+ sb.append(v);\r
+ prev = endName + 1;\r
+ }\r
+ }\r
+ if (prev < value.length())\r
+ sb.append(value.substring(prev));\r
+ return sb.toString();\r
+ }\r
+\r
+ /**\r
+ * Reverse of Introspector.decapitalize\r
+ */\r
+ public static String capitalize(String name) {\r
+ if (name == null || name.length() == 0) {\r
+ return name;\r
+ }\r
+ char chars[] = name.toCharArray();\r
+ chars[0] = Character.toUpperCase(chars[0]);\r
+ return new String(chars);\r
+ }\r
+\r
+ public static String unCapitalize(String name) {\r
+ if (name == null || name.length() == 0) {\r
+ return name;\r
+ }\r
+ char chars[] = name.toCharArray();\r
+ chars[0] = Character.toLowerCase(chars[0]);\r
+ return new String(chars);\r
+ }\r
+\r
+ // -------------------- Class path tools --------------------\r
+\r
+ /**\r
+ * Add all the jar files in a dir to the classpath, represented as a Vector\r
+ * of URLs.\r
+ */\r
+ public static void addToClassPath(Vector cpV, String dir) {\r
+ try {\r
+ String cpComp[] = getFilesByExt(dir, ".jar");\r
+ if (cpComp != null) {\r
+ int jarCount = cpComp.length;\r
+ for (int i = 0; i < jarCount; i++) {\r
+ URL url = getURL(dir, cpComp[i]);\r
+ if (url != null)\r
+ cpV.addElement(url);\r
+ }\r
+ }\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ }\r
+ }\r
+\r
+ public static void addToolsJar(Vector v) {\r
+ try {\r
+ // Add tools.jar in any case\r
+ File f = new File(System.getProperty("java.home")\r
+ + "/../lib/tools.jar");\r
+\r
+ if (!f.exists()) {\r
+ // On some systems java.home gets set to the root of jdk.\r
+ // That's a bug, but we can work around and be nice.\r
+ f = new File(System.getProperty("java.home") + "/lib/tools.jar");\r
+ if (f.exists()) {\r
+ if (log.isDebugEnabled())\r
+ log.debug("Detected strange java.home value "\r
+ + System.getProperty("java.home")\r
+ + ", it should point to jre");\r
+ }\r
+ }\r
+ URL url = new URL("file", "", f.getAbsolutePath());\r
+\r
+ v.addElement(url);\r
+ } catch (MalformedURLException ex) {\r
+ ex.printStackTrace();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Return all files with a given extension in a dir\r
+ */\r
+ public static String[] getFilesByExt(String ld, String ext) {\r
+ File dir = new File(ld);\r
+ String[] names = null;\r
+ final String lext = ext;\r
+ if (dir.isDirectory()) {\r
+ names = dir.list(new FilenameFilter() {\r
+ public boolean accept(File d, String name) {\r
+ if (name.endsWith(lext)) {\r
+ return true;\r
+ }\r
+ return false;\r
+ }\r
+ });\r
+ }\r
+ return names;\r
+ }\r
+\r
+ /**\r
+ * Construct a file url from a file, using a base dir\r
+ */\r
+ public static URL getURL(String base, String file) {\r
+ try {\r
+ File baseF = new File(base);\r
+ File f = new File(baseF, file);\r
+ String path = f.getCanonicalPath();\r
+ if (f.isDirectory()) {\r
+ path += "/";\r
+ }\r
+ if (!f.exists())\r
+ return null;\r
+ return new URL("file", "", path);\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ return null;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Add elements from the classpath <i>cp </i> to a Vector <i>jars </i> as\r
+ * file URLs (We use Vector for JDK 1.1 compat).\r
+ * <p>\r
+ *\r
+ * @param jars The jar list\r
+ * @param cp a String classpath of directory or jar file elements\r
+ * separated by path.separator delimiters.\r
+ * @throws IOException If an I/O error occurs\r
+ * @throws MalformedURLException Doh ;)\r
+ */\r
+ public static void addJarsFromClassPath(Vector jars, String cp)\r
+ throws IOException, MalformedURLException {\r
+ String sep = System.getProperty("path.separator");\r
+ String token;\r
+ StringTokenizer st;\r
+ if (cp != null) {\r
+ st = new StringTokenizer(cp, sep);\r
+ while (st.hasMoreTokens()) {\r
+ File f = new File(st.nextToken());\r
+ String path = f.getCanonicalPath();\r
+ if (f.isDirectory()) {\r
+ path += "/";\r
+ }\r
+ URL url = new URL("file", "", path);\r
+ if (!jars.contains(url)) {\r
+ jars.addElement(url);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Return a URL[] that can be used to construct a class loader\r
+ */\r
+ public static URL[] getClassPath(Vector v) {\r
+ URL[] urls = new URL[v.size()];\r
+ for (int i = 0; i < v.size(); i++) {\r
+ urls[i] = (URL) v.elementAt(i);\r
+ }\r
+ return urls;\r
+ }\r
+\r
+ /**\r
+ * Construct a URL classpath from files in a directory, a cpath property,\r
+ * and tools.jar.\r
+ */\r
+ public static URL[] getClassPath(String dir, String cpath,\r
+ String cpathProp, boolean addTools) throws IOException,\r
+ MalformedURLException {\r
+ Vector jarsV = new Vector();\r
+ if (dir != null) {\r
+ // Add dir/classes first, if it exists\r
+ URL url = getURL(dir, "classes");\r
+ if (url != null)\r
+ jarsV.addElement(url);\r
+ addToClassPath(jarsV, dir);\r
+ }\r
+\r
+ if (cpath != null)\r
+ addJarsFromClassPath(jarsV, cpath);\r
+\r
+ if (cpathProp != null) {\r
+ String cpath1 = System.getProperty(cpathProp);\r
+ addJarsFromClassPath(jarsV, cpath1);\r
+ }\r
+\r
+ if (addTools)\r
+ addToolsJar(jarsV);\r
+\r
+ return getClassPath(jarsV);\r
+ }\r
+\r
+ // -------------------- Mapping command line params to setters\r
+\r
+ public static boolean processArgs(Object proxy, String args[])\r
+ throws Exception {\r
+ String args0[] = null;\r
+ if (null != findMethod(proxy.getClass(), "getOptions1", new Class[] {})) {\r
+ args0 = (String[]) callMethod0(proxy, "getOptions1");\r
+ }\r
+\r
+ if (args0 == null) {\r
+ //args0=findVoidSetters(proxy.getClass());\r
+ args0 = findBooleanSetters(proxy.getClass());\r
+ }\r
+ Hashtable h = null;\r
+ if (null != findMethod(proxy.getClass(), "getOptionAliases",\r
+ new Class[] {})) {\r
+ h = (Hashtable) callMethod0(proxy, "getOptionAliases");\r
+ }\r
+ return processArgs(proxy, args, args0, null, h);\r
+ }\r
+\r
+ public static boolean processArgs(Object proxy, String args[],\r
+ String args0[], String args1[], Hashtable aliases) throws Exception {\r
+ for (int i = 0; i < args.length; i++) {\r
+ String arg = args[i];\r
+ if (arg.startsWith("-"))\r
+ arg = arg.substring(1);\r
+ if (aliases != null && aliases.get(arg) != null)\r
+ arg = (String) aliases.get(arg);\r
+\r
+ if (args0 != null) {\r
+ boolean set = false;\r
+ for (int j = 0; j < args0.length; j++) {\r
+ if (args0[j].equalsIgnoreCase(arg)) {\r
+ setProperty(proxy, args0[j], "true");\r
+ set = true;\r
+ break;\r
+ }\r
+ }\r
+ if (set)\r
+ continue;\r
+ }\r
+ if (args1 != null) {\r
+ for (int j = 0; j < args1.length; j++) {\r
+ if (args1[j].equalsIgnoreCase(arg)) {\r
+ i++;\r
+ if (i >= args.length)\r
+ return false;\r
+ setProperty(proxy, arg, args[i]);\r
+ break;\r
+ }\r
+ }\r
+ } else {\r
+ // if args1 is not specified,assume all other options have param\r
+ i++;\r
+ if (i >= args.length)\r
+ return false;\r
+ setProperty(proxy, arg, args[i]);\r
+ }\r
+\r
+ }\r
+ return true;\r
+ }\r
+\r
+ // -------------------- other utils --------------------\r
+ public static void clear() {\r
+ objectMethods.clear();\r
+ }\r
+\r
+ public static String[] findVoidSetters(Class c) {\r
+ Method m[] = findMethods(c);\r
+ if (m == null)\r
+ return null;\r
+ Vector v = new Vector();\r
+ for (int i = 0; i < m.length; i++) {\r
+ if (m[i].getName().startsWith("set")\r
+ && m[i].getParameterTypes().length == 0) {\r
+ String arg = m[i].getName().substring(3);\r
+ v.addElement(unCapitalize(arg));\r
+ }\r
+ }\r
+ String s[] = new String[v.size()];\r
+ for (int i = 0; i < s.length; i++) {\r
+ s[i] = (String) v.elementAt(i);\r
+ }\r
+ return s;\r
+ }\r
+\r
+ public static String[] findBooleanSetters(Class c) {\r
+ Method m[] = findMethods(c);\r
+ if (m == null)\r
+ return null;\r
+ Vector v = new Vector();\r
+ for (int i = 0; i < m.length; i++) {\r
+ if (m[i].getName().startsWith("set")\r
+ && m[i].getParameterTypes().length == 1\r
+ && "boolean".equalsIgnoreCase(m[i].getParameterTypes()[0]\r
+ .getName())) {\r
+ String arg = m[i].getName().substring(3);\r
+ v.addElement(unCapitalize(arg));\r
+ }\r
+ }\r
+ String s[] = new String[v.size()];\r
+ for (int i = 0; i < s.length; i++) {\r
+ s[i] = (String) v.elementAt(i);\r
+ }\r
+ return s;\r
+ }\r
+\r
+ static Hashtable objectMethods = new Hashtable();\r
+\r
+ public static Method[] findMethods(Class c) {\r
+ Method methods[] = (Method[]) objectMethods.get(c);\r
+ if (methods != null)\r
+ return methods;\r
+\r
+ methods = c.getMethods();\r
+ objectMethods.put(c, methods);\r
+ return methods;\r
+ }\r
+\r
+ public static Method findMethod(Class c, String name, Class params[]) {\r
+ Method methods[] = findMethods(c);\r
+ if (methods == null)\r
+ return null;\r
+ for (int i = 0; i < methods.length; i++) {\r
+ if (methods[i].getName().equals(name)) {\r
+ Class methodParams[] = methods[i].getParameterTypes();\r
+ if (methodParams == null)\r
+ if (params == null || params.length == 0)\r
+ return methods[i];\r
+ if (params == null)\r
+ if (methodParams == null || methodParams.length == 0)\r
+ return methods[i];\r
+ if (params.length != methodParams.length)\r
+ continue;\r
+ boolean found = true;\r
+ for (int j = 0; j < params.length; j++) {\r
+ if (params[j] != methodParams[j]) {\r
+ found = false;\r
+ break;\r
+ }\r
+ }\r
+ if (found)\r
+ return methods[i];\r
+ }\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /** Test if the object implements a particular\r
+ * method\r
+ */\r
+ public static boolean hasHook(Object obj, String methodN) {\r
+ try {\r
+ Method myMethods[] = findMethods(obj.getClass());\r
+ for (int i = 0; i < myMethods.length; i++) {\r
+ if (methodN.equals(myMethods[i].getName())) {\r
+ // check if it's overriden\r
+ Class declaring = myMethods[i].getDeclaringClass();\r
+ Class parentOfDeclaring = declaring.getSuperclass();\r
+ // this works only if the base class doesn't extend\r
+ // another class.\r
+\r
+ // if the method is declared in a top level class\r
+ // like BaseInterceptor parent is Object, otherwise\r
+ // parent is BaseInterceptor or an intermediate class\r
+ if (!"java.lang.Object".equals(parentOfDeclaring.getName())) {\r
+ return true;\r
+ }\r
+ }\r
+ }\r
+ } catch (Exception ex) {\r
+ ex.printStackTrace();\r
+ }\r
+ return false;\r
+ }\r
+\r
+ public static void callMain(Class c, String args[]) throws Exception {\r
+ Class p[] = new Class[1];\r
+ p[0] = args.getClass();\r
+ Method m = c.getMethod("main", p);\r
+ m.invoke(c, new Object[] { args });\r
+ }\r
+\r
+ public static Object callMethod1(Object target, String methodN,\r
+ Object param1, String typeParam1, ClassLoader cl) throws Exception {\r
+ if (target == null || param1 == null) {\r
+ d("Assert: Illegal params " + target + " " + param1);\r
+ }\r
+ if (dbg > 0)\r
+ d("callMethod1 " + target.getClass().getName() + " "\r
+ + param1.getClass().getName() + " " + typeParam1);\r
+\r
+ Class params[] = new Class[1];\r
+ if (typeParam1 == null)\r
+ params[0] = param1.getClass();\r
+ else\r
+ params[0] = cl.loadClass(typeParam1);\r
+ Method m = findMethod(target.getClass(), methodN, params);\r
+ if (m == null)\r
+ throw new NoSuchMethodException(target.getClass().getName() + " "\r
+ + methodN);\r
+ return m.invoke(target, new Object[] { param1 });\r
+ }\r
+\r
+ public static Object callMethod0(Object target, String methodN)\r
+ throws Exception {\r
+ if (target == null) {\r
+ d("Assert: Illegal params " + target);\r
+ return null;\r
+ }\r
+ if (dbg > 0)\r
+ d("callMethod0 " + target.getClass().getName() + "." + methodN);\r
+\r
+ Class params[] = new Class[0];\r
+ Method m = findMethod(target.getClass(), methodN, params);\r
+ if (m == null)\r
+ throw new NoSuchMethodException(target.getClass().getName() + " "\r
+ + methodN);\r
+ return m.invoke(target, emptyArray);\r
+ }\r
+\r
+ static Object[] emptyArray = new Object[] {};\r
+\r
+ public static Object callMethodN(Object target, String methodN,\r
+ Object params[], Class typeParams[]) throws Exception {\r
+ Method m = null;\r
+ m = findMethod(target.getClass(), methodN, typeParams);\r
+ if (m == null) {\r
+ d("Can't find method " + methodN + " in " + target + " CLASS "\r
+ + target.getClass());\r
+ return null;\r
+ }\r
+ Object o = m.invoke(target, params);\r
+\r
+ if (dbg > 0) {\r
+ // debug\r
+ StringBuffer sb = new StringBuffer();\r
+ sb.append("" + target.getClass().getName() + "." + methodN + "( ");\r
+ for (int i = 0; i < params.length; i++) {\r
+ if (i > 0)\r
+ sb.append(", ");\r
+ sb.append(params[i]);\r
+ }\r
+ sb.append(")");\r
+ d(sb.toString());\r
+ }\r
+ return o;\r
+ }\r
+\r
+ public static Object convert(String object, Class paramType) {\r
+ Object result = null;\r
+ if ("java.lang.String".equals(paramType.getName())) {\r
+ result = object;\r
+ } else if ("java.lang.Integer".equals(paramType.getName())\r
+ || "int".equals(paramType.getName())) {\r
+ try {\r
+ result = new Integer(object);\r
+ } catch (NumberFormatException ex) {\r
+ }\r
+ // Try a setFoo ( boolean )\r
+ } else if ("java.lang.Boolean".equals(paramType.getName())\r
+ || "boolean".equals(paramType.getName())) {\r
+ result = new Boolean(object);\r
+\r
+ // Try a setFoo ( InetAddress )\r
+ } else if ("java.net.InetAddress".equals(paramType\r
+ .getName())) {\r
+ try {\r
+ result = InetAddress.getByName(object);\r
+ } catch (UnknownHostException exc) {\r
+ d("Unable to resolve host name:" + object);\r
+ }\r
+\r
+ // Unknown type\r
+ } else {\r
+ d("Unknown type " + paramType.getName());\r
+ }\r
+ if (result == null) {\r
+ throw new IllegalArgumentException("Can't convert argument: " + object);\r
+ }\r
+ return result;\r
+ }\r
+\r
+ // -------------------- Get property --------------------\r
+ // This provides a layer of abstraction\r
+\r
+ public static interface PropertySource {\r
+\r
+ public String getProperty(String key);\r
+\r
+ }\r
+\r
+ public static interface AttributeHolder {\r
+\r
+ public void setAttribute(String key, Object o);\r
+\r
+ }\r
+\r
+ // debug --------------------\r
+ static final int dbg = 0;\r
+\r
+ static void d(String s) {\r
+ if (log.isDebugEnabled())\r
+ log.debug("IntrospectionUtils: " + s);\r
+ }\r
+}\r
--- /dev/null
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+import java.util.Random;
+
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Channel;
+import java.io.Externalizable;
+
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class LoadTest implements MembershipListener,ChannelListener, Runnable {
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
+ public static int size = 24000;
+ public static Object mutex = new Object();
+ public boolean doRun = true;
+
+ public long bytesReceived = 0;
+ public float mBytesReceived = 0;
+ public int messagesReceived = 0;
+ public boolean send = true;
+ public boolean debug = false;
+ public int msgCount = 100;
+ ManagedChannel channel=null;
+ public int statsInterval = 10000;
+ public long pause = 0;
+ public boolean breakonChannelException = false;
+ public boolean async = false;
+ public long receiveStart = 0;
+ public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
+
+ static int messageSize = 0;
+
+ public static long messagesSent = 0;
+ public static long messageStartSendTime = 0;
+ public static long messageEndSendTime = 0;
+ public static int threadCount = 0;
+
+ public static synchronized void startTest() {
+ threadCount++;
+ if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis();
+ }
+
+ public static synchronized void endTest() {
+ threadCount--;
+ if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis();
+ }
+
+
+ public static synchronized long addSendStats(long count) {
+ messagesSent+=count;
+ return 0l;
+ }
+
+ private static void printSendStats(long counter, int messageSize) {
+ float cnt = (float)counter;
+ float size = (float)messageSize;
+ float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
+ log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
+ "\n\tMessage count:"+counter+
+ "\n\tTotal bytes :"+(long)(size*cnt)+
+ "\n\tTotal seconds:"+(time)+
+ "\n\tBytes/second :"+(size*cnt/time)+
+ "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
+ }
+
+
+
+ public LoadTest(ManagedChannel channel,
+ boolean send,
+ int msgCount,
+ boolean debug,
+ long pause,
+ int stats,
+ boolean breakOnEx) {
+ this.channel = channel;
+ this.send = send;
+ this.msgCount = msgCount;
+ this.debug = debug;
+ this.pause = pause;
+ this.statsInterval = stats;
+ this.breakonChannelException = breakOnEx;
+ }
+
+
+
+ public void run() {
+
+ long counter = 0;
+ long total = 0;
+ LoadMessage msg = new LoadMessage();
+ int messageSize = LoadTest.messageSize;
+
+ try {
+ startTest();
+ while (total < msgCount) {
+ if (channel.getMembers().length == 0 || (!send)) {
+ synchronized (mutex) {
+ try {
+ mutex.wait();
+ } catch (InterruptedException x) {
+ log.info("Thread interrupted from wait");
+ }
+ }
+ } else {
+ try {
+ //msg.setMsgNr((int)++total);
+ counter++;
+ if (debug) {
+ printArray(msg.getMessage());
+ }
+ channel.send(channel.getMembers(), msg, channelOptions);
+ if ( pause > 0 ) {
+ if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
+ Thread.sleep(pause);
+ }
+ } catch (ChannelException x) {
+ if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
+ log.error("Unable to send message:"+x.getMessage());
+ ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
+ for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
+ --counter;
+ if ( this.breakonChannelException ) throw x;
+ }
+ }
+ if ( (counter % statsInterval) == 0 && (counter > 0)) {
+ //add to the global counter
+ counter = addSendStats(counter);
+ //print from the global counter
+ //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
+ printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
+
+ }
+
+ }
+ }catch ( Exception x ) {
+ log.error("Captured error while sending:"+x.getMessage());
+ if ( debug ) log.error("",x);
+ printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
+ }
+ endTest();
+ }
+
+
+
+ /**
+ * memberAdded
+ *
+ * @param member Member
+ * @todo Implement this org.apache.catalina.tribes.MembershipListener
+ * method
+ */
+ public void memberAdded(Member member) {
+ log.info("Member added:"+member);
+ synchronized (mutex) {
+ mutex.notifyAll();
+ }
+ }
+
+ /**
+ * memberDisappeared
+ *
+ * @param member Member
+ * @todo Implement this org.apache.catalina.tribes.MembershipListener
+ * method
+ */
+ public void memberDisappeared(Member member) {
+ log.info("Member disappeared:"+member);
+ }
+
+ public boolean accept(Serializable msg, Member mbr){
+ return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
+ }
+
+ public void messageReceived(Serializable msg, Member mbr){
+ if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
+ if ( debug ) {
+ if ( msg instanceof LoadMessage ) {
+ printArray(((LoadMessage)msg).getMessage());
+ }
+ }
+
+ if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
+ LoadMessage tmp = new LoadMessage();
+ tmp.setMessage(((ByteMessage)msg).getMessage());
+ msg = tmp;
+ tmp = null;
+ }
+
+
+ bytesReceived+=((LoadMessage)msg).getMessage().length;
+ mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f;
+ messagesReceived++;
+ if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
+ float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived);
+ float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
+ log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
+ "\n\tMessage count :"+(long)messagesReceived+
+ "\n\tTotal bytes :"+(long)bytes+
+ "\n\tTotal mbytes :"+(long)mBytesReceived+
+ "\n\tTime since 1st:"+seconds+" seconds"+
+ "\n\tBytes/second :"+(bytes/seconds)+
+ "\n\tMBytes/second :"+(mBytesReceived/seconds)+"\n");
+
+ }
+ }
+
+
+ public static void printArray(byte[] data) {
+ System.out.print("{");
+ for (int i=0; i<data.length; i++ ) {
+ System.out.print(data[i]);
+ System.out.print(",");
+ }
+ System.out.println("} size:"+data.length);
+ }
+
+
+
+ //public static class LoadMessage implements Serializable {
+ public static class LoadMessage extends ByteMessage implements Serializable {
+
+ public static byte[] outdata = new byte[size];
+ public static Random r = new Random(System.currentTimeMillis());
+ public static int getMessageSize (LoadMessage msg) {
+ int messageSize = msg.getMessage().length;
+ if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
+ try {
+ messageSize = XByteBuffer.serialize(new LoadMessage()).length;
+ log.info("Average message size:" + messageSize + " bytes");
+ } catch (Exception x) {
+ log.error("Unable to calculate test message size.", x);
+ }
+ return messageSize;
+ }
+ static {
+ r.nextBytes(outdata);
+ }
+
+ protected byte[] message = getMessage();
+
+ public LoadMessage() {
+ }
+
+ public byte[] getMessage() {
+ if ( message == null ) {
+ message = outdata;
+ }
+ return message;
+ }
+
+ public void setMessage(byte[] data) {
+ this.message = data;
+ }
+ }
+
+ public static void usage() {
+ System.out.println("Tribes Load tester.");
+ System.out.println("The load tester can be used in sender or received mode or both");
+ System.out.println("Usage:\n\t"+
+ "java LoadTest [options]\n\t"+
+ "Options:\n\t\t"+
+ "[-mode receive|send|both] \n\t\t"+
+ "[-startoptions startflags (default is Channel.DEFAULT) ] \n\t\t"+
+ "[-debug] \n\t\t"+
+ "[-count messagecount] \n\t\t"+
+ "[-stats statinterval] \n\t\t"+
+ "[-pause nrofsecondstopausebetweensends] \n\t\t"+
+ "[-threads numberofsenderthreads] \n\t\t"+
+ "[-size messagesize] \n\t\t"+
+ "[-sendoptions channeloptions] \n\t\t"+
+ "[-break (halts execution on exception)]\n"+
+ "[-shutdown (issues a channel.stop() command after send is completed)]\n"+
+ "\tChannel options:"+
+ ChannelCreator.usage()+"\n\n"+
+ "Example:\n\t"+
+ "java LoadTest -port 4004\n\t"+
+ "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
+ "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+ }
+
+ public static void main(String[] args) throws Exception {
+ boolean send = true;
+ boolean debug = false;
+ long pause = 0;
+ int count = 1000000;
+ int stats = 10000;
+ boolean breakOnEx = false;
+ int threads = 1;
+ boolean shutdown = false;
+ int startoptions = Channel.DEFAULT;
+ int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
+ if ( args.length == 0 ) {
+ args = new String[] {"-help"};
+ }
+ for (int i = 0; i < args.length; i++) {
+ if ("-threads".equals(args[i])) {
+ threads = Integer.parseInt(args[++i]);
+ } else if ("-count".equals(args[i])) {
+ count = Integer.parseInt(args[++i]);
+ System.out.println("Sending "+count+" messages.");
+ } else if ("-pause".equals(args[i])) {
+ pause = Long.parseLong(args[++i])*1000;
+ } else if ("-break".equals(args[i])) {
+ breakOnEx = true;
+ } else if ("-shutdown".equals(args[i])) {
+ shutdown = true;
+ } else if ("-stats".equals(args[i])) {
+ stats = Integer.parseInt(args[++i]);
+ System.out.println("Stats every "+stats+" message");
+ } else if ("-sendoptions".equals(args[i])) {
+ channelOptions = Integer.parseInt(args[++i]);
+ System.out.println("Setting send options to "+channelOptions);
+ } else if ("-startoptions".equals(args[i])) {
+ startoptions = Integer.parseInt(args[++i]);
+ System.out.println("Setting start options to "+startoptions);
+ } else if ("-size".equals(args[i])) {
+ size = Integer.parseInt(args[++i])-4;
+ System.out.println("Message size will be:"+(size+4)+" bytes");
+ } else if ("-mode".equals(args[i])) {
+ if ( "receive".equals(args[++i]) ) send = false;
+ } else if ("-debug".equals(args[i])) {
+ debug = true;
+ } else if ("-help".equals(args[i]))
+ {
+ usage();
+ System.exit(1);
+ }
+ }
+
+ ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+
+ LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+ test.channelOptions = channelOptions;
+ LoadMessage msg = new LoadMessage();
+
+ messageSize = LoadMessage.getMessageSize(msg);
+ channel.addChannelListener(test);
+ channel.addMembershipListener(test);
+ channel.start(startoptions);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ while ( threads > 1 ) {
+ Thread t = new Thread(test);
+ t.setDaemon(true);
+ t.start();
+ threads--;
+ test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+ test.channelOptions = channelOptions;
+ }
+ test.run();
+ if ( shutdown && send ) channel.stop(channel.DEFAULT);
+ System.out.println("System test complete, sleeping to let threads finish.");
+ Thread.sleep(60*1000*60);
+ }
+
+ public static class Shutdown extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ SystemExit exit = new SystemExit(5000);
+ exit.setDaemon(true);
+ exit.start();
+ try {
+ channel.stop(channel.DEFAULT);
+
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
+ public static class SystemExit extends Thread {
+ private long delay;
+ public SystemExit(long delay) {
+ this.delay = delay;
+ }
+ public void run () {
+ try {
+ Thread.sleep(delay);
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.exit(0);
+
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import java.awt.ComponentOrientation;
+import java.awt.Dimension;
+import java.awt.event.ActionEvent;
+import java.awt.event.ActionListener;
+import java.awt.event.MouseAdapter;
+import java.awt.event.MouseEvent;
+import javax.swing.BoxLayout;
+import javax.swing.JButton;
+import javax.swing.JFrame;
+import javax.swing.JPanel;
+import javax.swing.JScrollPane;
+import javax.swing.JTable;
+import javax.swing.JTextField;
+import javax.swing.table.AbstractTableModel;
+import javax.swing.table.TableModel;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.tipis.AbstractReplicatedMap;
+import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+import javax.swing.table.DefaultTableCellRenderer;
+import java.awt.Color;
+import java.awt.Component;
+import javax.swing.table.TableColumn;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.util.Arrays;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class MapDemo implements ChannelListener, MembershipListener{
+
+ protected LazyReplicatedMap map;
+ protected SimpleTableDemo table;
+
+ public MapDemo(Channel channel, String mapName ) {
+ map = new LazyReplicatedMap(null,channel,5000, mapName,null);
+ table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
+ channel.addChannelListener(this);
+ channel.addMembershipListener(this);
+// for ( int i=0; i<1000; i++ ) {
+// map.put("MyKey-"+i,"My String Value-"+i);
+// }
+ this.messageReceived(null,null);
+ }
+
+ public boolean accept(Serializable msg, Member source) {
+ table.dataModel.getValueAt(-1,-1);
+ return false;
+ }
+
+ public void messageReceived(Serializable msg, Member source) {
+
+ }
+
+ public void memberAdded(Member member) {
+ }
+ public void memberDisappeared(Member member) {
+ table.dataModel.getValueAt(-1,-1);
+ }
+
+ public static void usage() {
+ System.out.println("Tribes MapDemo.");
+ System.out.println("Usage:\n\t" +
+ "java MapDemo [channel options] mapName\n\t" +
+ "\tChannel options:" +
+ ChannelCreator.usage());
+ }
+
+ public static void main(String[] args) throws Exception {
+ long start = System.currentTimeMillis();
+ ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args);
+ String mapName = "MapDemo";
+ if ( args.length > 0 && (!args[args.length-1].startsWith("-"))) {
+ mapName = args[args.length-1];
+ }
+ channel.start(channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ MapDemo demo = new MapDemo(channel,mapName);
+
+ System.out.println("System test complete, time to start="+(System.currentTimeMillis()-start)+" ms. Sleeping to let threads finish.");
+ Thread.sleep(60 * 1000 * 60);
+ }
+
+ public static class Shutdown
+ extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ SystemExit exit = new SystemExit(5000);
+ exit.setDaemon(true);
+ exit.start();
+ try {
+ channel.stop(channel.DEFAULT);
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
+
+ public static class SystemExit
+ extends Thread {
+ private long delay;
+ public SystemExit(long delay) {
+ this.delay = delay;
+ }
+
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.exit(0);
+
+ }
+ }
+
+ public static class SimpleTableDemo
+ extends JPanel implements ActionListener{
+ private static int WIDTH = 550;
+
+ private LazyReplicatedMap map;
+ private boolean DEBUG = false;
+ AbstractTableModel dataModel = new AbstractTableModel() {
+
+
+ String[] columnNames = {
+ "Key",
+ "Value",
+ "Backup Node",
+ "isPrimary",
+ "isProxy",
+ "isBackup"};
+
+ public int getColumnCount() { return columnNames.length; }
+
+ public int getRowCount() {return map.sizeFull() +1; }
+
+ public StringBuffer getMemberNames(Member[] members){
+ StringBuffer buf = new StringBuffer();
+ if ( members!=null ) {
+ for (int i=0;i<members.length; i++ ) {
+ buf.append(members[i].getName());
+ buf.append("; ");
+ }
+ }
+ return buf;
+ }
+
+ public Object getValueAt(int row, int col) {
+ if ( row==-1 ) {
+ update();
+ return "";
+ }
+ if ( row == 0 ) return columnNames[col];
+ Object[] entries = map.entrySetFull().toArray();
+ Map.Entry e = (Map.Entry)entries [row-1];
+ LazyReplicatedMap.MapEntry entry = (LazyReplicatedMap.MapEntry)e.getValue();
+ switch (col) {
+ case 0: return entry.getKey();
+ case 1: return entry.getValue();
+ case 2: return getMemberNames(entry.getBackupNodes());
+ case 3: return new Boolean(entry.isPrimary());
+ case 4: return new Boolean(entry.isProxy());
+ case 5: return new Boolean(entry.isBackup());
+ default: return "";
+ }
+
+ }
+
+ public void update() {
+ fireTableDataChanged();
+ }
+ };
+
+ JTextField txtAddKey = new JTextField(20);
+ JTextField txtAddValue = new JTextField(20);
+ JTextField txtRemoveKey = new JTextField(20);
+ JTextField txtChangeKey = new JTextField(20);
+ JTextField txtChangeValue = new JTextField(20);
+
+ JTable table = null;
+ public SimpleTableDemo(LazyReplicatedMap map) {
+ super();
+ this.map = map;
+
+ this.setComponentOrientation(ComponentOrientation.LEFT_TO_RIGHT);
+
+ //final JTable table = new JTable(data, columnNames);
+ table = new JTable(dataModel);
+
+ table.setPreferredScrollableViewportSize(new Dimension(WIDTH, 150));
+ for ( int i=0; i<table.getColumnCount(); i++ ) {
+ TableColumn tm = table.getColumnModel().getColumn(i);
+ tm.setCellRenderer(new ColorRenderer());
+ }
+
+
+ if (DEBUG) {
+ table.addMouseListener(new MouseAdapter() {
+ public void mouseClicked(MouseEvent e) {
+ printDebugData(table);
+ }
+ });
+ }
+
+ //setLayout(new GridLayout(5, 0));
+ setLayout(new BoxLayout(this, BoxLayout.Y_AXIS));
+
+ //Create the scroll pane and add the table to it.
+ JScrollPane scrollPane = new JScrollPane(table);
+
+ //Add the scroll pane to this panel.
+ add(scrollPane);
+
+ //create a add value button
+ JPanel addpanel = new JPanel();
+ addpanel.setPreferredSize(new Dimension(WIDTH,30));
+ addpanel.add(createButton("Add","add"));
+ addpanel.add(txtAddKey);
+ addpanel.add(txtAddValue);
+ addpanel.setMaximumSize(new Dimension(WIDTH,30));
+ add(addpanel);
+
+ //create a remove value button
+ JPanel removepanel = new JPanel( );
+ removepanel.setPreferredSize(new Dimension(WIDTH,30));
+ removepanel.add(createButton("Remove","remove"));
+ removepanel.add(txtRemoveKey);
+ removepanel.setMaximumSize(new Dimension(WIDTH,30));
+ add(removepanel);
+
+ //create a change value button
+ JPanel changepanel = new JPanel( );
+ changepanel.add(createButton("Change","change"));
+ changepanel.add(txtChangeKey);
+ changepanel.add(txtChangeValue);
+ changepanel.setPreferredSize(new Dimension(WIDTH,30));
+ changepanel.setMaximumSize(new Dimension(WIDTH,30));
+ add(changepanel);
+
+
+ //create sync button
+ JPanel syncpanel = new JPanel( );
+ syncpanel.add(createButton("Synchronize","sync"));
+ syncpanel.add(createButton("Replicate","replicate"));
+ syncpanel.add(createButton("Random","random"));
+ syncpanel.setPreferredSize(new Dimension(WIDTH,30));
+ syncpanel.setMaximumSize(new Dimension(WIDTH,30));
+ add(syncpanel);
+
+
+ }
+
+ public JButton createButton(String text, String command) {
+ JButton button = new JButton(text);
+ button.setActionCommand(command);
+ button.addActionListener(this);
+ return button;
+ }
+
+ public void actionPerformed(ActionEvent e) {
+ System.out.println(e.getActionCommand());
+ if ( "add".equals(e.getActionCommand()) ) {
+ System.out.println("Add key:"+txtAddKey.getText()+" value:"+txtAddValue.getText());
+ map.put(txtAddKey.getText(),new StringBuffer(txtAddValue.getText()));
+ }
+ if ( "change".equals(e.getActionCommand()) ) {
+ System.out.println("Change key:"+txtChangeKey.getText()+" value:"+txtChangeValue.getText());
+ StringBuffer buf = (StringBuffer)map.get(txtChangeKey.getText());
+ if ( buf!=null ) {
+ buf.delete(0,buf.length());
+ buf.append(txtChangeValue.getText());
+ map.replicate(txtChangeKey.getText(),true);
+ } else {
+ buf = new StringBuffer();
+ buf.append(txtChangeValue.getText());
+ map.put(txtChangeKey.getText(),buf);
+ }
+ }
+ if ( "remove".equals(e.getActionCommand()) ) {
+ System.out.println("Remove key:"+txtRemoveKey.getText());
+ map.remove(txtRemoveKey.getText());
+ }
+ if ( "sync".equals(e.getActionCommand()) ) {
+ System.out.println("Syncing from another node.");
+ map.transferState();
+ }
+ if ( "random".equals(e.getActionCommand()) ) {
+ Thread t = new Thread() {
+ public void run() {
+ for (int i = 0; i < 100; i++) {
+ String key = Arrays.toString(UUIDGenerator.randomUUID(false));
+ map.put(key, key);
+ dataModel.fireTableDataChanged();
+ table.paint(table.getGraphics());
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException x) {
+ Thread.currentThread().interrupted();
+ }
+ }
+ }
+ };
+ t.start();
+ }
+
+ if ( "replicate".equals(e.getActionCommand()) ) {
+ System.out.println("Replicating out to the other nodes.");
+ map.replicate(true);
+ }
+ dataModel.getValueAt(-1,-1);
+ }
+
+ private void printDebugData(JTable table) {
+ int numRows = table.getRowCount();
+ int numCols = table.getColumnCount();
+ javax.swing.table.TableModel model = table.getModel();
+
+ System.out.println("Value of data: ");
+ for (int i = 0; i < numRows; i++) {
+ System.out.print(" row " + i + ":");
+ for (int j = 0; j < numCols; j++) {
+ System.out.print(" " + model.getValueAt(i, j));
+ }
+ System.out.println();
+ }
+ System.out.println("--------------------------");
+ }
+
+ /**
+ * Create the GUI and show it. For thread safety,
+ * this method should be invoked from the
+ * event-dispatching thread.
+ */
+ public static SimpleTableDemo createAndShowGUI(LazyReplicatedMap map, String title) {
+ //Make sure we have nice window decorations.
+ JFrame.setDefaultLookAndFeelDecorated(true);
+
+ //Create and set up the window.
+ JFrame frame = new JFrame("SimpleTableDemo - "+title);
+ frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+
+ //Create and set up the content pane.
+ SimpleTableDemo newContentPane = new SimpleTableDemo(map);
+ newContentPane.setOpaque(true); //content panes must be opaque
+ frame.setContentPane(newContentPane);
+
+ //Display the window.
+ frame.setSize(450,250);
+ newContentPane.setSize(450,300);
+ frame.pack();
+ frame.setVisible(true);
+ return newContentPane;
+ }
+ }
+
+ static class ColorRenderer extends DefaultTableCellRenderer {
+
+ public ColorRenderer() {
+ super();
+ }
+
+ public Component getTableCellRendererComponent
+ (JTable table, Object value, boolean isSelected,
+ boolean hasFocus, int row, int column) {
+ Component cell = super.getTableCellRendererComponent
+ (table, value, isSelected, hasFocus, row, column);
+ cell.setBackground(Color.WHITE);
+ if ( row > 0 ) {
+ Color color = null;
+ boolean primary = ( (Boolean) table.getValueAt(row, 3)).booleanValue();
+ boolean proxy = ( (Boolean) table.getValueAt(row, 4)).booleanValue();
+ boolean backup = ( (Boolean) table.getValueAt(row, 5)).booleanValue();
+ if (primary) color = Color.GREEN;
+ else if (proxy) color = Color.RED;
+ else if (backup) color = Color.BLUE;
+ if ( color != null ) cell.setBackground(color);
+ }
+// System.out.println("Row:"+row+" Column:"+column+" Color:"+cell.getBackground());
+// cell.setBackground(bkgndColor);
+// cell.setForeground(fgndColor);
+
+ return cell;
+ }
+
+
+ }
+
+
+}