From: fhanik Date: Fri, 8 Sep 2006 15:42:08 +0000 (+0000) Subject: Adding in unit tests, yell at me if the location should be elsewhere X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=2feaf9a9ee9993f89bbe1b63b9830f99622e9c88;p=tomcat7.0 Adding in unit tests, yell at me if the location should be elsewhere git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@441543 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/ChannelReceiver.java b/java/org/apache/catalina/tribes/ChannelReceiver.java index 346092ea9..2c7eea8fe 100644 --- a/java/org/apache/catalina/tribes/ChannelReceiver.java +++ b/java/org/apache/catalina/tribes/ChannelReceiver.java @@ -52,6 +52,12 @@ public interface ChannelReceiver extends Heartbeat { public int getPort(); /** + * Returns the secure listening port + * @return port, -1 if a secure port is not activated + */ + public int getSecurePort(); + + /** * Sets the message listener to receive notification of incoming * @param listener MessageListener * @see MessageListener diff --git a/java/org/apache/catalina/tribes/Member.java b/java/org/apache/catalina/tribes/Member.java index 622bab842..566493bdd 100644 --- a/java/org/apache/catalina/tribes/Member.java +++ b/java/org/apache/catalina/tribes/Member.java @@ -52,10 +52,19 @@ public interface Member { /** * Returns the listen port for the ChannelReceiver implementation - * @return IPv4 or IPv6 representation of the host address this member listens to incoming data + * @return the listen port for this member, -1 if its not listening on an unsecure port * @see ChannelReceiver */ public int getPort(); + + /** + * Returns the secure listen port for the ChannelReceiver implementation. + * Returns -1 if its not listening to a secure port. + * @return the listen port for this member, -1 if its not listening on a secure port + * @see ChannelReceiver + */ + public int getSecurePort(); + /** * Contains information on how long this member has been online. diff --git a/java/org/apache/catalina/tribes/membership/MemberImpl.java b/java/org/apache/catalina/tribes/membership/MemberImpl.java index cd9f00fd7..eeae00040 100644 --- a/java/org/apache/catalina/tribes/membership/MemberImpl.java +++ b/java/org/apache/catalina/tribes/membership/MemberImpl.java @@ -1,570 +1,596 @@ -/* - * Copyright 1999,2004-2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.membership; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Arrays; - -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.transport.SenderState; - -/** - * A membership implementation using simple multicast. - * This is the representation of a multicast member. - * Carries the host, and port of the this or other cluster nodes. - * - * @author Filip Hanik - * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ - */ -public class MemberImpl implements Member, java.io.Externalizable { - - /** - * Public properties specific to this implementation - */ - public static final transient String TCP_LISTEN_PORT = "tcpListenPort"; - public static final transient String TCP_LISTEN_HOST = "tcpListenHost"; - public static final transient String MEMBER_NAME = "memberName"; - - public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66}; - public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69}; - - /** - * The listen host for this member - */ - protected byte[] host; - protected transient String hostname; - /** - * The tcp listen port for this member - */ - protected int port; - - /** - * Counter for how many broadcast messages have been sent from this member - */ - protected int msgCount = 0; - /** - * The number of milliseconds since this members was - * created, is kept track of using the start time - */ - protected long memberAliveTime = 0; - - /** - * For the local member only - */ - protected transient long serviceStartTime; - - /** - * To avoid serialization over and over again, once the local dataPkg - * has been set, we use that to transmit data - */ - protected transient byte[] dataPkg = null; - - /** - * Unique session Id for this member - */ - protected byte[] uniqueId = new byte[16]; - - /** - * Custom payload that an app framework can broadcast - * Also used to transport stop command. - */ - protected byte[] payload = new byte[0]; - - /** - * Command, so that the custom payload doesn't have to be used - * This is for internal tribes use, such as SHUTDOWN_COMMAND - */ - protected byte[] command = new byte[0]; - - /** - * Domain if we want to filter based on domain. - */ - protected byte[] domain = new byte[0]; - - /** - * Empty constructor for serialization - */ - public MemberImpl() { - - } - - /** - * Construct a new member object - * @param name - the name of this member, cluster unique - * @param domain - the cluster domain name of this member - * @param host - the tcp listen host - * @param port - the tcp listen port - */ - public MemberImpl(String host, - int port, - long aliveTime) throws IOException { - setHostname(host); - this.port = port; - this.memberAliveTime=aliveTime; - } - - public MemberImpl(String host, - int port, - long aliveTime, - byte[] payload) throws IOException { - this(host,port,aliveTime); - setPayload(payload); - } - - public boolean isReady() { - return SenderState.getSenderState(this).isReady(); - } - public boolean isSuspect() { - return SenderState.getSenderState(this).isSuspect(); - } - public boolean isFailing() { - return SenderState.getSenderState(this).isFailing(); - } - - /** - * Increment the message count. - */ - protected void inc() { - msgCount++; - } - - /** - * Create a data package to send over the wire representing this member. - * This is faster than serialization. - * @return - the bytes for this member deserialized - * @throws Exception - */ - public byte[] getData() { - return getData(true); - } - /** - * Highly optimized version of serializing a member into a byte array - * Returns a cached byte[] reference, do not modify this data - * @param getalive boolean - * @return byte[] - */ - public byte[] getData(boolean getalive) { - return getData(getalive,false); - } - - - public int getDataLength() { - return TRIBES_MBR_BEGIN.length+ //start pkg - 4+ //data length - 8+ //alive time - 4+ //port - 1+ //host length - host.length+ //host - 4+ //command length - command.length+ //command - 4+ //domain length - domain.length+ //domain - 16+ //unique id - 4+ //payload length - payload.length+ //payload - TRIBES_MBR_END.length; //end pkg - } - - /** - * - * @param getalive boolean - calculate memberAlive time - * @param reset boolean - reset the cached data package, and create a new one - * @return byte[] - */ - public byte[] getData(boolean getalive, boolean reset) { - if ( reset ) dataPkg = null; - //look in cache first - if ( dataPkg!=null ) { - if ( getalive ) { - //you'd be surprised, but System.currentTimeMillis - //shows up on the profiler - long alive=System.currentTimeMillis()-getServiceStartTime(); - XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4); - } - return dataPkg; - } - - //package looks like - //start package TRIBES_MBR_BEGIN.length - //package length - 4 bytes - //alive - 8 bytes - //port - 4 bytes - //host length - 1 byte - //host - hl bytes - //clen - 4 bytes - //command - clen bytes - //dlen - 4 bytes - //domain - dlen bytes - //uniqueId - 16 bytes - //payload length - 4 bytes - //payload plen bytes - //end package TRIBES_MBR_END.length - byte[] addr = host; - long alive=System.currentTimeMillis()-getServiceStartTime(); - byte hl = (byte)addr.length; - byte[] data = new byte[getDataLength()]; - - int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); - - int pos = 0; - - //TRIBES_MBR_BEGIN - System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); - pos += TRIBES_MBR_BEGIN.length; - - //body length - XByteBuffer.toBytes(bodylength,data,pos); - pos += 4; - - //alive data - XByteBuffer.toBytes((long)alive,data,pos); - pos += 8; - //port - XByteBuffer.toBytes(port,data,pos); - pos += 4; - //host length - data[pos++] = hl; - //host - System.arraycopy(addr,0,data,pos,addr.length); - pos+=addr.length; - //clen - 4 bytes - XByteBuffer.toBytes(command.length,data,pos); - pos+=4; - //command - clen bytes - System.arraycopy(command,0,data,pos,command.length); - pos+=command.length; - //dlen - 4 bytes - XByteBuffer.toBytes(domain.length,data,pos); - pos+=4; - //domain - dlen bytes - System.arraycopy(domain,0,data,pos,domain.length); - pos+=domain.length; - //unique Id - System.arraycopy(uniqueId,0,data,pos,uniqueId.length); - pos+=uniqueId.length; - //payload - XByteBuffer.toBytes(payload.length,data,pos); - pos+=4; - System.arraycopy(payload,0,data,pos,payload.length); - pos+=payload.length; - - //TRIBES_MBR_END - System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); - pos += TRIBES_MBR_END.length; - - //create local data - dataPkg = data; - return data; - } - /** - * Deserializes a member from data sent over the wire - * @param data - the bytes received - * @return a member object. - */ - public static MemberImpl getMember(byte[] data, MemberImpl member) { - return getMember(data,0,data.length,member); - } - - public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) { - //package looks like - //start package TRIBES_MBR_BEGIN.length - //package length - 4 bytes - //alive - 8 bytes - //port - 4 bytes - //host length - 1 byte - //host - hl bytes - //clen - 4 bytes - //command - clen bytes - //dlen - 4 bytes - //domain - dlen bytes - //uniqueId - 16 bytes - //payload length - 4 bytes - //payload plen bytes - //end package TRIBES_MBR_END.length - - int pos = offset; - - if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { - throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)); - } - - if ( length < (TRIBES_MBR_BEGIN.length+4) ) { - throw new ArrayIndexOutOfBoundsException("Member package to small to validate."); - } - - pos += TRIBES_MBR_BEGIN.length; - - int bodylength = XByteBuffer.toInt(data,pos); - pos += 4; - - if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { - throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package."); - } - - int endpos = pos+bodylength; - if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { - throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)); - } - - - byte[] alived = new byte[8]; - System.arraycopy(data, pos, alived, 0, 8); - pos += 8; - byte[] portd = new byte[4]; - System.arraycopy(data, pos, portd, 0, 4); - pos += 4; - - byte hl = data[pos++]; - byte[] addr = new byte[hl]; - System.arraycopy(data, pos, addr, 0, hl); - pos += hl; - - int cl = XByteBuffer.toInt(data, pos); - pos += 4; - - byte[] command = new byte[cl]; - System.arraycopy(data, pos, command, 0, command.length); - pos += command.length; - - int dl = XByteBuffer.toInt(data, pos); - pos += 4; - - byte[] domain = new byte[dl]; - System.arraycopy(data, pos, domain, 0, domain.length); - pos += domain.length; - - byte[] uniqueId = new byte[16]; - System.arraycopy(data, pos, uniqueId, 0, 16); - pos += 16; - - int pl = XByteBuffer.toInt(data, pos); - pos += 4; - - byte[] payload = new byte[pl]; - System.arraycopy(data, pos, payload, 0, payload.length); - pos += payload.length; - - member.setHost(addr); - member.setPort(XByteBuffer.toInt(portd, 0)); - member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); - member.setUniqueId(uniqueId); - member.payload = payload; - member.domain = domain; - member.command = command; - - member.dataPkg = new byte[length]; - System.arraycopy(data, offset, member.dataPkg, 0, length); - - return member; - } - - public static MemberImpl getMember(byte[] data) { - return getMember(data,new MemberImpl()); - } - - /** - * Return the name of this object - * @return a unique name to the cluster - */ - public String getName() { - return "tcp://"+getHostname()+":"+getPort(); - } - - /** - * Return the listen port of this member - * @return - tcp listen port - */ - public int getPort() { - return this.port; - } - - /** - * Return the TCP listen host for this member - * @return IP address or host name - */ - public byte[] getHost() { - return host; - } - - public String getHostname() { - if ( this.hostname != null ) return hostname; - else { - try { - this.hostname = java.net.InetAddress.getByAddress(host).getHostName(); - return this.hostname; - }catch ( IOException x ) { - throw new RuntimeException("Unable to parse hostname.",x); - } - } - } - - /** - * Contains information on how long this member has been online. - * The result is the number of milli seconds this member has been - * broadcasting its membership to the cluster. - * @return nr of milliseconds since this member started. - */ - public long getMemberAliveTime() { - return memberAliveTime; - } - - public long getServiceStartTime() { - return serviceStartTime; - } - - public byte[] getUniqueId() { - return uniqueId; - } - - public byte[] getPayload() { - return payload; - } - - public byte[] getCommand() { - return command; - } - - public byte[] getDomain() { - return domain; - } - - public void setMemberAliveTime(long time) { - memberAliveTime=time; - } - - - - /** - * String representation of this object - */ - public String toString() { - StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl["); - buf.append(getName()).append(","); - buf.append(getHostname()).append(","); - buf.append(port).append(", alive="); - buf.append(memberAliveTime).append(","); - buf.append("id=").append(bToS(this.uniqueId)).append(", "); - buf.append("payload=").append(bToS(this.payload,8)).append(", "); - buf.append("command=").append(bToS(this.command,8)).append(", "); - buf.append("domain=").append(bToS(this.domain,8)).append(", "); - buf.append("]"); - return buf.toString(); - } - public static String bToS(byte[] data) { - return bToS(data,data.length); - } - public static String bToS(byte[] data, int max) { - StringBuffer buf = new StringBuffer(4*16); - buf.append("{"); - for (int i=0; data!=null && i McastServiceImpl.MAX_PACKET_SIZE ) { - this.payload = oldpayload; - throw new IllegalArgumentException("Payload is to large for tribes to handle."); - } - - } - - public void setCommand(byte[] command) { - this.command = command!=null?command:new byte[0]; - getData(true,true); - } - - public void setDomain(byte[] domain) { - this.domain = domain!=null?domain:new byte[0]; - getData(true,true); - } - - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int length = in.readInt(); - byte[] message = new byte[length]; - in.read(message); - getMember(message,this); - - } - - public void writeExternal(ObjectOutput out) throws IOException { - byte[] data = this.getData(); - out.writeInt(data.length); - out.write(data); - } - -} +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.membership; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; + +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.transport.SenderState; + +/** + * A membership implementation using simple multicast. + * This is the representation of a multicast member. + * Carries the host, and port of the this or other cluster nodes. + * + * @author Filip Hanik + * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ + */ +public class MemberImpl implements Member, java.io.Externalizable { + + /** + * Public properties specific to this implementation + */ + public static final transient String TCP_LISTEN_PORT = "tcpListenPort"; + public static final transient String TCP_LISTEN_HOST = "tcpListenHost"; + public static final transient String MEMBER_NAME = "memberName"; + + public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66}; + public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69}; + + /** + * The listen host for this member + */ + protected byte[] host; + protected transient String hostname; + /** + * The tcp listen port for this member + */ + protected int port; + + /** + * The tcp/SSL listen port for this member + */ + protected int securePort = -1; + + /** + * Counter for how many broadcast messages have been sent from this member + */ + protected int msgCount = 0; + /** + * The number of milliseconds since this members was + * created, is kept track of using the start time + */ + protected long memberAliveTime = 0; + + /** + * For the local member only + */ + protected transient long serviceStartTime; + + /** + * To avoid serialization over and over again, once the local dataPkg + * has been set, we use that to transmit data + */ + protected transient byte[] dataPkg = null; + + /** + * Unique session Id for this member + */ + protected byte[] uniqueId = new byte[16]; + + /** + * Custom payload that an app framework can broadcast + * Also used to transport stop command. + */ + protected byte[] payload = new byte[0]; + + /** + * Command, so that the custom payload doesn't have to be used + * This is for internal tribes use, such as SHUTDOWN_COMMAND + */ + protected byte[] command = new byte[0]; + + /** + * Domain if we want to filter based on domain. + */ + protected byte[] domain = new byte[0]; + + /** + * Empty constructor for serialization + */ + public MemberImpl() { + + } + + /** + * Construct a new member object + * @param name - the name of this member, cluster unique + * @param domain - the cluster domain name of this member + * @param host - the tcp listen host + * @param port - the tcp listen port + */ + public MemberImpl(String host, + int port, + long aliveTime) throws IOException { + setHostname(host); + this.port = port; + this.memberAliveTime=aliveTime; + } + + public MemberImpl(String host, + int port, + long aliveTime, + byte[] payload) throws IOException { + this(host,port,aliveTime); + setPayload(payload); + } + + public boolean isReady() { + return SenderState.getSenderState(this).isReady(); + } + public boolean isSuspect() { + return SenderState.getSenderState(this).isSuspect(); + } + public boolean isFailing() { + return SenderState.getSenderState(this).isFailing(); + } + + /** + * Increment the message count. + */ + protected void inc() { + msgCount++; + } + + /** + * Create a data package to send over the wire representing this member. + * This is faster than serialization. + * @return - the bytes for this member deserialized + * @throws Exception + */ + public byte[] getData() { + return getData(true); + } + /** + * Highly optimized version of serializing a member into a byte array + * Returns a cached byte[] reference, do not modify this data + * @param getalive boolean + * @return byte[] + */ + public byte[] getData(boolean getalive) { + return getData(getalive,false); + } + + + public int getDataLength() { + return TRIBES_MBR_BEGIN.length+ //start pkg + 4+ //data length + 8+ //alive time + 4+ //port + 4+ //secure port + 1+ //host length + host.length+ //host + 4+ //command length + command.length+ //command + 4+ //domain length + domain.length+ //domain + 16+ //unique id + 4+ //payload length + payload.length+ //payload + TRIBES_MBR_END.length; //end pkg + } + + /** + * + * @param getalive boolean - calculate memberAlive time + * @param reset boolean - reset the cached data package, and create a new one + * @return byte[] + */ + public byte[] getData(boolean getalive, boolean reset) { + if ( reset ) dataPkg = null; + //look in cache first + if ( dataPkg!=null ) { + if ( getalive ) { + //you'd be surprised, but System.currentTimeMillis + //shows up on the profiler + long alive=System.currentTimeMillis()-getServiceStartTime(); + XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4); + } + return dataPkg; + } + + //package looks like + //start package TRIBES_MBR_BEGIN.length + //package length - 4 bytes + //alive - 8 bytes + //port - 4 bytes + //secure port - 4 bytes + //host length - 1 byte + //host - hl bytes + //clen - 4 bytes + //command - clen bytes + //dlen - 4 bytes + //domain - dlen bytes + //uniqueId - 16 bytes + //payload length - 4 bytes + //payload plen bytes + //end package TRIBES_MBR_END.length + byte[] addr = host; + long alive=System.currentTimeMillis()-getServiceStartTime(); + byte hl = (byte)addr.length; + byte[] data = new byte[getDataLength()]; + + int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); + + int pos = 0; + + //TRIBES_MBR_BEGIN + System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); + pos += TRIBES_MBR_BEGIN.length; + + //body length + XByteBuffer.toBytes(bodylength,data,pos); + pos += 4; + + //alive data + XByteBuffer.toBytes((long)alive,data,pos); + pos += 8; + //port + XByteBuffer.toBytes(port,data,pos); + pos += 4; + //secure port + XByteBuffer.toBytes(securePort,data,pos); + pos += 4; + //host length + data[pos++] = hl; + //host + System.arraycopy(addr,0,data,pos,addr.length); + pos+=addr.length; + //clen - 4 bytes + XByteBuffer.toBytes(command.length,data,pos); + pos+=4; + //command - clen bytes + System.arraycopy(command,0,data,pos,command.length); + pos+=command.length; + //dlen - 4 bytes + XByteBuffer.toBytes(domain.length,data,pos); + pos+=4; + //domain - dlen bytes + System.arraycopy(domain,0,data,pos,domain.length); + pos+=domain.length; + //unique Id + System.arraycopy(uniqueId,0,data,pos,uniqueId.length); + pos+=uniqueId.length; + //payload + XByteBuffer.toBytes(payload.length,data,pos); + pos+=4; + System.arraycopy(payload,0,data,pos,payload.length); + pos+=payload.length; + + //TRIBES_MBR_END + System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); + pos += TRIBES_MBR_END.length; + + //create local data + dataPkg = data; + return data; + } + /** + * Deserializes a member from data sent over the wire + * @param data - the bytes received + * @return a member object. + */ + public static MemberImpl getMember(byte[] data, MemberImpl member) { + return getMember(data,0,data.length,member); + } + + public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) { + //package looks like + //start package TRIBES_MBR_BEGIN.length + //package length - 4 bytes + //alive - 8 bytes + //port - 4 bytes + //secure port - 4 bytes + //host length - 1 byte + //host - hl bytes + //clen - 4 bytes + //command - clen bytes + //dlen - 4 bytes + //domain - dlen bytes + //uniqueId - 16 bytes + //payload length - 4 bytes + //payload plen bytes + //end package TRIBES_MBR_END.length + + int pos = offset; + + if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { + throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN)); + } + + if ( length < (TRIBES_MBR_BEGIN.length+4) ) { + throw new ArrayIndexOutOfBoundsException("Member package to small to validate."); + } + + pos += TRIBES_MBR_BEGIN.length; + + int bodylength = XByteBuffer.toInt(data,pos); + pos += 4; + + if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { + throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package."); + } + + int endpos = pos+bodylength; + if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { + throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END)); + } + + + byte[] alived = new byte[8]; + System.arraycopy(data, pos, alived, 0, 8); + pos += 8; + byte[] portd = new byte[4]; + System.arraycopy(data, pos, portd, 0, 4); + pos += 4; + + byte[] sportd = new byte[4]; + System.arraycopy(data, pos, sportd, 0, 4); + pos += 4; + + + + byte hl = data[pos++]; + byte[] addr = new byte[hl]; + System.arraycopy(data, pos, addr, 0, hl); + pos += hl; + + int cl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] command = new byte[cl]; + System.arraycopy(data, pos, command, 0, command.length); + pos += command.length; + + int dl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] domain = new byte[dl]; + System.arraycopy(data, pos, domain, 0, domain.length); + pos += domain.length; + + byte[] uniqueId = new byte[16]; + System.arraycopy(data, pos, uniqueId, 0, 16); + pos += 16; + + int pl = XByteBuffer.toInt(data, pos); + pos += 4; + + byte[] payload = new byte[pl]; + System.arraycopy(data, pos, payload, 0, payload.length); + pos += payload.length; + + member.setHost(addr); + member.setPort(XByteBuffer.toInt(portd, 0)); + member.setSecurePort(XByteBuffer.toInt(sportd, 0)); + member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); + member.setUniqueId(uniqueId); + member.payload = payload; + member.domain = domain; + member.command = command; + + member.dataPkg = new byte[length]; + System.arraycopy(data, offset, member.dataPkg, 0, length); + + return member; + } + + public static MemberImpl getMember(byte[] data) { + return getMember(data,new MemberImpl()); + } + + /** + * Return the name of this object + * @return a unique name to the cluster + */ + public String getName() { + return "tcp://"+getHostname()+":"+getPort(); + } + + /** + * Return the listen port of this member + * @return - tcp listen port + */ + public int getPort() { + return this.port; + } + + /** + * Return the TCP listen host for this member + * @return IP address or host name + */ + public byte[] getHost() { + return host; + } + + public String getHostname() { + if ( this.hostname != null ) return hostname; + else { + try { + this.hostname = java.net.InetAddress.getByAddress(host).getHostName(); + return this.hostname; + }catch ( IOException x ) { + throw new RuntimeException("Unable to parse hostname.",x); + } + } + } + + /** + * Contains information on how long this member has been online. + * The result is the number of milli seconds this member has been + * broadcasting its membership to the cluster. + * @return nr of milliseconds since this member started. + */ + public long getMemberAliveTime() { + return memberAliveTime; + } + + public long getServiceStartTime() { + return serviceStartTime; + } + + public byte[] getUniqueId() { + return uniqueId; + } + + public byte[] getPayload() { + return payload; + } + + public byte[] getCommand() { + return command; + } + + public byte[] getDomain() { + return domain; + } + + public int getSecurePort() { + return securePort; + } + + public void setMemberAliveTime(long time) { + memberAliveTime=time; + } + + + + /** + * String representation of this object + */ + public String toString() { + StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl["); + buf.append(getName()).append(","); + buf.append(getHostname()).append(","); + buf.append(port).append(", alive="); + buf.append(memberAliveTime).append(","); + buf.append("id=").append(bToS(this.uniqueId)).append(", "); + buf.append("payload=").append(bToS(this.payload,8)).append(", "); + buf.append("command=").append(bToS(this.command,8)).append(", "); + buf.append("domain=").append(bToS(this.domain,8)).append(", "); + buf.append("]"); + return buf.toString(); + } + public static String bToS(byte[] data) { + return bToS(data,data.length); + } + public static String bToS(byte[] data, int max) { + StringBuffer buf = new StringBuffer(4*16); + buf.append("{"); + for (int i=0; data!=null && i McastServiceImpl.MAX_PACKET_SIZE ) { + this.payload = oldpayload; + throw new IllegalArgumentException("Payload is to large for tribes to handle."); + } + + } + + public void setCommand(byte[] command) { + this.command = command!=null?command:new byte[0]; + getData(true,true); + } + + public void setDomain(byte[] domain) { + this.domain = domain!=null?domain:new byte[0]; + getData(true,true); + } + + public void setSecurePort(int securePort) { + this.securePort = securePort; + } + + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + int length = in.readInt(); + byte[] message = new byte[length]; + in.read(message); + getMember(message,this); + + } + + public void writeExternal(ObjectOutput out) throws IOException { + byte[] data = this.getData(); + out.writeInt(data.length); + out.write(data); + } + +} diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java index f09b956e9..091bb9320 100644 --- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java +++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java @@ -49,6 +49,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T private String host = "auto"; private InetAddress bind; private int port = 4000; + private int securePort = -1; private int rxBufSize = 43800; private int txBufSize = 25188; private boolean listen = false; @@ -299,6 +300,10 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T return useBufferPool; } + public int getSecurePort() { + return securePort; + } + public void setTcpSelectorTimeout(long selTimeout) { tcpSelectorTimeout = selTimeout; } @@ -377,6 +382,10 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T this.useBufferPool = useBufferPool; } + public void setSecurePort(int securePort) { + this.securePort = securePort; + } + public void heartbeat() { //empty operation } diff --git a/test/org/apache/catalina/tribes/test/TestNioSender.java b/test/org/apache/catalina/tribes/test/TestNioSender.java new file mode 100644 index 000000000..9cd0ed0d8 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/TestNioSender.java @@ -0,0 +1,106 @@ +package org.apache.catalina.tribes.test; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.Iterator; +import java.nio.channels.Selector; +import org.apache.catalina.tribes.transport.nio.NioSender; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.Channel; + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestNioSender { + private Selector selector = null; + private int counter = 0; + MemberImpl mbr; + private static int testOptions = Channel.SEND_OPTIONS_DEFAULT; + public TestNioSender() { + + } + + public synchronized int inc() { + return ++counter; + } + + public synchronized ChannelData getMessage(Member mbr) { + String msg = new String("Thread-"+Thread.currentThread().getName()+" Message:"+inc()); + ChannelData data = new ChannelData(true); + data.setMessage(new XByteBuffer(msg.getBytes(),false)); + data.setAddress(mbr); + + return data; + } + + public void init() throws Exception { + selector = Selector.open(); + mbr = new MemberImpl("localhost",4444,0); + NioSender sender = new NioSender(); + sender.setDestination(mbr); + sender.setDirectBuffer(true); + sender.setSelector(selector); + sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr))); + sender.connect(); + } + + public void run() { + while (true) { + + int selectedKeys = 0; + try { + selectedKeys = selector.select(100); + // if ( selectedKeys == 0 ) { + // System.out.println("No registered interests. Sleeping for a second."); + // Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + continue; + } + + if (selectedKeys == 0) { + continue; + } + + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey sk = (SelectionKey) it.next(); + it.remove(); + try { + int readyOps = sk.readyOps(); + sk.interestOps(sk.interestOps() & ~readyOps); + NioSender sender = (NioSender) sk.attachment(); + if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) { + System.out.println("Message completed for handler:"+sender); + Thread.currentThread().sleep(2000); + sender.reset(); + sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr))); + } + + + } catch (Throwable t) { + t.printStackTrace(); + return; + } + } + } + } + + public static void main(String[] args) throws Exception { + TestNioSender sender = new TestNioSender(); + sender.init(); + sender.run(); + } +} diff --git a/test/org/apache/catalina/tribes/test/TribesTestSuite.java b/test/org/apache/catalina/tribes/test/TribesTestSuite.java new file mode 100644 index 000000000..9b846f052 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/TribesTestSuite.java @@ -0,0 +1,24 @@ +package org.apache.catalina.tribes.test; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +public class TribesTestSuite + extends TestCase { + + public TribesTestSuite(String s) { + super(s); + } + + public static Test suite() { + TestSuite suite = new TestSuite(); + suite.addTestSuite(org.apache.catalina.tribes.test.channel.ChannelStartStop.class); + suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestChannelOptionFlag.class); + suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class); + suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class); + suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class); + suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class); + return suite; + } +} diff --git a/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java new file mode 100644 index 000000000..9e65c7cd0 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java @@ -0,0 +1,118 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ +package org.apache.catalina.tribes.test.channel; + +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; + +/** + * @author Filip Hanik + * @version 1.0 + */ +public class ChannelStartStop extends TestCase { + GroupChannel channel = null; + protected void setUp() throws Exception { + super.setUp(); + channel = new GroupChannel(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + try {channel.stop(channel.DEFAULT);}catch (Exception ignore){} + } + + public void testDoubleFullStart() throws Exception { + int count = 0; + try { + channel.start(channel.DEFAULT); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(channel.DEFAULT); + count++; + } catch ( Exception x){x.printStackTrace();} + assertEquals(count,2); + channel.stop(channel.DEFAULT); + } + + public void testDoublePartialStart() throws Exception { + //try to double start the RX + int count = 0; + try { + channel.start(channel.SND_RX_SEQ); + channel.start(channel.MBR_RX_SEQ); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(channel.MBR_RX_SEQ); + count++; + } catch ( Exception x){/*expected*/} + assertEquals(count,1); + channel.stop(channel.DEFAULT); + //double the membership sender + count = 0; + try { + channel.start(channel.SND_RX_SEQ); + channel.start(channel.MBR_TX_SEQ); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(channel.MBR_TX_SEQ); + count++; + } catch ( Exception x){/*expected*/} + assertEquals(count,1); + channel.stop(channel.DEFAULT); + + count = 0; + try { + channel.start(channel.SND_RX_SEQ); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(channel.SND_RX_SEQ); + count++; + } catch ( Exception x){/*expected*/} + assertEquals(count,1); + channel.stop(channel.DEFAULT); + + count = 0; + try { + channel.start(channel.SND_TX_SEQ); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(channel.SND_TX_SEQ); + count++; + } catch ( Exception x){/*expected*/} + assertEquals(count,1); + channel.stop(channel.DEFAULT); + } + + public void testFalseOption() throws Exception { + int flag = 0xFFF0;//should get ignored by the underlying components + int count = 0; + try { + channel.start(flag); + count++; + } catch ( Exception x){x.printStackTrace();} + try { + channel.start(flag); + count++; + } catch ( Exception x){/*expected*/} + assertEquals(count,2); + channel.stop(channel.DEFAULT); + } + +} diff --git a/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java b/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java new file mode 100644 index 000000000..c6119e5c1 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java @@ -0,0 +1,76 @@ +package org.apache.catalina.tribes.test.channel; + +import junit.framework.*; +import org.apache.catalina.tribes.group.*; +import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.ChannelException; + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestChannelOptionFlag extends TestCase { + GroupChannel channel = null; + protected void setUp() throws Exception { + super.setUp(); + channel = new GroupChannel(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if ( channel != null ) try {channel.stop(channel.DEFAULT);}catch ( Exception ignore) {} + channel = null; + } + + + public void testOptionConflict() throws Exception { + boolean error = false; + channel.setOptionCheck(true); + ChannelInterceptor i = new TestInterceptor(); + i.setOptionFlag(128); + channel.addInterceptor(i); + i = new TestInterceptor(); + i.setOptionFlag(128); + channel.addInterceptor(i); + try { + channel.start(channel.DEFAULT); + }catch ( ChannelException x ) { + if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true; + } + assertEquals(true,error); + } + + public void testOptionNoConflict() throws Exception { + boolean error = false; + channel.setOptionCheck(true); + ChannelInterceptor i = new TestInterceptor(); + i.setOptionFlag(128); + channel.addInterceptor(i); + i = new TestInterceptor(); + i.setOptionFlag(64); + channel.addInterceptor(i); + i = new TestInterceptor(); + i.setOptionFlag(256); + channel.addInterceptor(i); + try { + channel.start(channel.DEFAULT); + }catch ( ChannelException x ) { + if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true; + } + assertEquals(false,error); + } + + public static class TestInterceptor extends ChannelInterceptorBase { + + } + + +} diff --git a/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java new file mode 100644 index 000000000..47b85c13a --- /dev/null +++ b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java @@ -0,0 +1,173 @@ +package org.apache.catalina.tribes.test.channel; + +import junit.framework.TestCase; +import java.io.Serializable; +import java.util.Random; +import java.util.Arrays; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener; +import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestDataIntegrity extends TestCase { + int msgCount = 500; + int threadCount = 20; + GroupChannel channel1; + GroupChannel channel2; + Listener listener1; + int threadCounter = 0; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel1.addInterceptor(new MessageDispatch15Interceptor()); + channel2 = new GroupChannel(); + channel2.addInterceptor(new MessageDispatch15Interceptor()); + listener1 = new Listener(); + channel2.addChannelListener(listener1); + channel1.start(GroupChannel.DEFAULT); + channel2.start(GroupChannel.DEFAULT); + } + + protected void tearDown() throws Exception { + super.tearDown(); + channel1.stop(GroupChannel.DEFAULT); + channel2.stop(GroupChannel.DEFAULT); + } + + public void testDataSendNO_ACK() throws Exception { + System.err.println("Starting NO_ACK"); + Thread[] threads = new Thread[threadCount]; + for (int x=0; xTitle:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestRemoteProcessException extends TestCase { + int msgCount = 10000; + GroupChannel channel1; + GroupChannel channel2; + Listener listener1; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel2 = new GroupChannel(); + listener1 = new Listener(); + channel2.addChannelListener(listener1); + channel1.start(GroupChannel.DEFAULT); + channel2.start(GroupChannel.DEFAULT); + } + + protected void tearDown() throws Exception { + super.tearDown(); + channel1.stop(GroupChannel.DEFAULT); + channel2.stop(GroupChannel.DEFAULT); + } + + public void testDataSendSYNCACK() throws Exception { + System.err.println("Starting SYNC_ACK"); + int errC=0, nerrC=0; + for (int i=0; iTitle:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestTwoPhaseCommit extends TestCase { + + protected void setUp() throws Exception { + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + +} diff --git a/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java new file mode 100644 index 000000000..d0fd33990 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java @@ -0,0 +1,112 @@ +package org.apache.catalina.tribes.test.io; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; +import org.apache.catalina.tribes.ChannelListener; +import java.io.Serializable; +import java.util.Random; +import java.util.HashMap; +import org.apache.catalina.tribes.transport.ReplicationTransmitter; + +public class TestSenderConnections extends TestCase { + private static int count = 2; + private ManagedChannel[] channels = new ManagedChannel[count]; + private TestMsgListener[] listeners = new TestMsgListener[count]; + + protected void setUp() throws Exception { + super.setUp(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new GroupChannel(); + channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII")); + listeners[i] = new TestMsgListener( ("Listener-" + (i + 1))); + channels[i].addChannelListener(listeners[i]); + channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ); + + } + } + + public void clear() { + } + + public void sendMessages(long delay, long sleep) throws Exception { + Member local = channels[0].getLocalMember(true); + Member dest = channels[1].getLocalMember(true); + int n = 3; + System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]"); + for (int i = 0; i < n; i++) { + channels[0].send(new Member[] {dest}, new TestMsg(), 0); + if ( delay > 0 ) Thread.sleep(delay); + } + System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections"); + if ( sleep > 0 ) Thread.sleep(sleep); + + } + + public void testConnectionLinger() throws Exception { + sendMessages(0,15000); + } + + public void testKeepAliveCount() throws Exception { + System.out.println("Setting keep alive count to 0"); + for (int i = 0; i < channels.length; i++) { + ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender(); + t.getTransport().setKeepAliveCount(0); + } + sendMessages(1000,15000); + } + + public void testKeepAliveTime() throws Exception { + System.out.println("Setting keep alive count to 1 second"); + for (int i = 0; i < channels.length; i++) { + ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender(); + t.getTransport().setKeepAliveTime(1000); + } + sendMessages(2000,15000); + } + + protected void tearDown() throws Exception { + for (int i = 0; i < channels.length; i++) { + channels[i].stop(Channel.DEFAULT); + } + + } + + public static class TestMsg implements Serializable { + static Random r = new Random(System.currentTimeMillis()); + HashMap map = new HashMap(); + public TestMsg() { + int size = Math.abs(r.nextInt() % 200); + for (int i=0; iTitle:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class MemberSerialization extends TestCase { + MemberImpl m1, m2, p1,p2; + byte[] payload = null; + protected void setUp() throws Exception { + super.setUp(); + payload = new byte[333]; + Arrays.fill(payload,(byte)1); + m1 = new MemberImpl("localhost",3333,1,payload); + m2 = new MemberImpl("localhost",3333,1); + payload = new byte[333]; + Arrays.fill(payload,(byte)2); + p1 = new MemberImpl("127.0.0.1",3333,1,payload); + p2 = new MemberImpl("localhost",3331,1,payload); + m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9}); + m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9}); + m1.setCommand(new byte[] {1,2,4,5,6,7,8,9}); + m2.setCommand(new byte[] {1,2,4,5,6,7,8,9}); + } + + public void testCompare() throws Exception { + assertTrue(m1.equals(m2)); + assertTrue(m2.equals(m1)); + assertTrue(p1.equals(m2)); + assertFalse(m1.equals(p2)); + assertFalse(m1.equals(p2)); + assertFalse(m2.equals(p2)); + assertFalse(p1.equals(p2)); + } + + public void testSerializationOne() throws Exception { + MemberImpl m = m1; + byte[] md1 = m.getData(false,true); + byte[] mda1 = m.getData(false,false); + assertTrue(Arrays.equals(md1,mda1)); + assertTrue(md1==mda1); + mda1 = m.getData(true,true); + MemberImpl ma1 = MemberImpl.getMember(mda1); + assertTrue(compareMembers(m,ma1)); + mda1 = p1.getData(false); + assertFalse(Arrays.equals(md1,mda1)); + ma1 = MemberImpl.getMember(mda1); + assertTrue(compareMembers(p1,ma1)); + + md1 = m.getData(true,true); + Thread.sleep(50); + mda1 = m.getData(true,true); + MemberImpl a1 = MemberImpl.getMember(md1); + MemberImpl a2 = MemberImpl.getMember(mda1); + assertTrue(a1.equals(a2)); + assertFalse(Arrays.equals(md1,mda1)); + + + } + + public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) { + boolean result = true; + result = result && Arrays.equals(impl1.getHost(),impl2.getHost()); + result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload()); + result = result && Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId()); + result = result && impl1.getPort() == impl2.getPort(); + return result; + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + +} diff --git a/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java b/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java new file mode 100644 index 000000000..ca3c75240 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java @@ -0,0 +1,104 @@ +package org.apache.catalina.tribes.test.membership; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; +import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; +import org.apache.catalina.tribes.util.UUIDGenerator; + +public class TestDomainFilter + extends TestCase { + private static int count = 10; + private ManagedChannel[] channels = new ManagedChannel[count]; + private TestMbrListener[] listeners = new TestMbrListener[count]; + + protected void setUp() throws Exception { + super.setUp(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new GroupChannel(); + channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII")); + listeners[i] = new TestMbrListener( ("Listener-" + (i + 1))); + channels[i].addMembershipListener(listeners[i]); + DomainFilterInterceptor filter = new DomainFilterInterceptor(); + filter.setDomain(UUIDGenerator.randomUUID(false)); + channels[i].addInterceptor(filter); + } + } + + public void clear() { + for (int i = 0; i < channels.length; i++) { + listeners[i].members.clear(); + } + } + + public void testMemberArrival() throws Exception { + //purpose of this test is to make sure that we have received all the members + //that we can expect before the start method returns + Thread[] threads = new Thread[channels.length]; + for (int i=0; i=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size()); + } + + protected void tearDown() throws Exception { + + for (int i = 0; i < channels.length; i++) { + try { + channels[i].stop(Channel.DEFAULT); + } catch (Exception ignore) {} + } + super.tearDown(); + } + + public class TestMbrListener + implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } + + public ArrayList members = new ArrayList(); + public void memberAdded(Member member) { + if (!members.contains(member)) { + members.add(member); + try { + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member added[unknown]"); + } + } + } + + public void memberDisappeared(Member member) { + if (members.contains(member)) { + members.remove(member); + try { + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member disappeared[unknown]"); + } + } + } + + } + +} diff --git a/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java b/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java new file mode 100644 index 000000000..0c82c7106 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java @@ -0,0 +1,100 @@ +package org.apache.catalina.tribes.test.membership; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; + +public class TestMemberArrival + extends TestCase { + private static int count = 10; + private ManagedChannel[] channels = new ManagedChannel[count]; + private TestMbrListener[] listeners = new TestMbrListener[count]; + + protected void setUp() throws Exception { + super.setUp(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new GroupChannel(); + channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII")); + listeners[i] = new TestMbrListener( ("Listener-" + (i + 1))); + channels[i].addMembershipListener(listeners[i]); + + } + } + + public void clear() { + for (int i = 0; i < channels.length; i++) { + listeners[i].members.clear(); + } + } + + public void testMemberArrival() throws Exception { + //purpose of this test is to make sure that we have received all the members + //that we can expect before the start method returns + Thread[] threads = new Thread[channels.length]; + for (int i=0; i=0; i-- ) assertEquals("Checking member arrival length",channels.length-1,listeners[i].members.size()); + } + + protected void tearDown() throws Exception { + + for (int i = 0; i < channels.length; i++) { + try { + channels[i].stop(Channel.DEFAULT); + } catch (Exception ignore) {} + } + super.tearDown(); + } + + public class TestMbrListener + implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } + + public ArrayList members = new ArrayList(); + public void memberAdded(Member member) { + if (!members.contains(member)) { + members.add(member); + try { + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member added[unknown]"); + } + } + } + + public void memberDisappeared(Member member) { + if (members.contains(member)) { + members.remove(member); + try { + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member disappeared[unknown]"); + } + } + } + + } + +} diff --git a/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java b/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java new file mode 100644 index 000000000..458558faa --- /dev/null +++ b/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java @@ -0,0 +1,151 @@ +package org.apache.catalina.tribes.test.membership; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.ByteMessage; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import junit.framework.TestCase; + +/** + *

Title:

+ * + *

Description:

+ * + *

Copyright: Copyright (c) 2005

+ * + *

Company:

+ * + * @author not attributable + * @version 1.0 + */ +public class TestTcpFailureDetector extends TestCase { + private TcpFailureDetector tcpFailureDetector1 = null; + private TcpFailureDetector tcpFailureDetector2 = null; + private ManagedChannel channel1 = null; + private ManagedChannel channel2 = null; + private TestMbrListener mbrlist1 = null; + private TestMbrListener mbrlist2 = null; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel2 = new GroupChannel(); + channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII")); + channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII")); + mbrlist1 = new TestMbrListener("Channel-1"); + mbrlist2 = new TestMbrListener("Channel-2"); + tcpFailureDetector1 = new TcpFailureDetector(); + tcpFailureDetector2 = new TcpFailureDetector(); + channel1.addInterceptor(tcpFailureDetector1); + channel2.addInterceptor(tcpFailureDetector2); + channel1.addMembershipListener(mbrlist1); + channel2.addMembershipListener(mbrlist2); + } + + public void clear() { + mbrlist1.members.clear(); + mbrlist2.members.clear(); + } + + public void testTcpSendFailureMemberDrop() throws Exception { + System.out.println("testTcpSendFailureMemberDrop()"); + clear(); + channel1.start(channel1.DEFAULT); + channel2.start(channel2.DEFAULT); + //Thread.sleep(1000); + assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); + channel2.stop(channel2.SND_RX_SEQ); + ByteMessage msg = new ByteMessage(new byte[1024]); + try { + channel1.send(channel1.getMembers(), msg, 0); + assertEquals("Message send should have failed.",true,false); + } catch ( ChannelException x ) { + + } + assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); + channel1.stop(Channel.DEFAULT); + channel2.stop(Channel.DEFAULT); + } + + public void testTcpFailureMemberAdd() throws Exception { + System.out.println("testTcpFailureMemberAdd()"); + clear(); + channel1.start(channel1.DEFAULT); + channel2.start(channel2.SND_RX_SEQ); + channel2.start(channel2.SND_TX_SEQ); + channel2.start(channel2.MBR_RX_SEQ); + channel2.stop(channel2.SND_RX_SEQ); + channel2.start(channel2.MBR_TX_SEQ); + //Thread.sleep(1000); + assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); + channel1.stop(Channel.DEFAULT); + channel2.stop(Channel.DEFAULT); + } + + public void testTcpMcastFail() throws Exception { + System.out.println("testTcpMcastFail()"); + clear(); + channel1.start(channel1.DEFAULT); + channel2.start(channel2.DEFAULT); + //Thread.sleep(1000); + assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); + channel2.stop(channel2.MBR_TX_SEQ); + ByteMessage msg = new ByteMessage(new byte[1024]); + try { + Thread.sleep(5000); + assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); + channel1.send(channel1.getMembers(), msg, 0); + } catch ( ChannelException x ) { + assertEquals("Message send should have succeeded.",true,false); + } + channel1.stop(Channel.DEFAULT); + channel2.stop(Channel.DEFAULT); + } + + + protected void tearDown() throws Exception { + tcpFailureDetector1 = null; + tcpFailureDetector2 = null; + try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){} + channel1 = null; + try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){} + channel2 = null; + super.tearDown(); + } + + public class TestMbrListener implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } + public ArrayList members = new ArrayList(); + public void memberAdded(Member member) { + if ( !members.contains(member) ) { + members.add(member); + try{ + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]"); + }catch ( Exception x ) { + System.out.println(name + ":member added[unknown]"); + } + } + } + + public void memberDisappeared(Member member) { + if ( members.contains(member) ) { + members.remove(member); + try{ + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]"); + }catch ( Exception x ) { + System.out.println(name + ":member disappeared[unknown]"); + } + } + } + + } + +} diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java new file mode 100644 index 000000000..eb4f9cd86 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java @@ -0,0 +1,76 @@ +package org.apache.catalina.tribes.test.transport; + +import java.text.DecimalFormat; + +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.transport.nio.NioReceiver; + +public class SocketNioReceive { + static int count = 0; + static int accept = 0; + static long start = 0; + static double mb = 0; + static int len = 0; + static DecimalFormat df = new DecimalFormat("##.00"); + static double seconds = 0; + + protected static Object mutex = new Object(); + public static void main(String[] args) throws Exception { + Member mbr = new MemberImpl("localhost", 9999, 0); + ChannelData data = new ChannelData(); + data.setAddress(mbr); + byte[] buf = new byte[8192 * 4]; + data.setMessage(new XByteBuffer(buf, false)); + buf = XByteBuffer.createDataPackage(data); + len = buf.length; + NioReceiver receiver = new NioReceiver(); + receiver.setPort(9999); + receiver.setHost("localhost"); + MyList list = new MyList(); + receiver.setMessageListener(list); + receiver.start(); + System.out.println("Listening on 9999"); + while (true) { + try { + synchronized (mutex) { + mutex.wait(5000); + if ( start != 0 ) { + System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, messages "+count+" accepts "+accept+", total "+mb+" MB."); + } + } + }catch (Throwable x) { + x.printStackTrace(); + } + } + } + + public static class MyList implements MessageListener { + boolean first = true; + + + public void messageReceived(ChannelMessage msg) { + if (first) { + first = false; + start = System.currentTimeMillis(); + } + mb += ( (double) len) / 1024 / 1024; + synchronized (this) {count++;} + if ( ( (count) % 10000) == 0) { + long time = System.currentTimeMillis(); + seconds = ( (double) (time - start)) / 1000; + System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, messages "+count+", total "+mb+" MB."); + } + } + + public boolean accept(ChannelMessage msg) { + synchronized (this) {accept++;} + return true; + } + + } +} diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java b/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java new file mode 100644 index 000000000..d5ec12197 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java @@ -0,0 +1,91 @@ +package org.apache.catalina.tribes.test.transport; + +import java.io.OutputStream; +import java.net.Socket; +import java.text.DecimalFormat; +import org.apache.catalina.tribes.transport.nio.NioSender; +import org.apache.catalina.tribes.membership.MemberImpl; +import java.nio.channels.Selector; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Member; +import java.nio.channels.SelectionKey; +import java.util.Iterator; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.io.ChannelData; +import java.math.BigDecimal; + +public class SocketNioSend { + + public static void main(String[] args) throws Exception { + Selector selector = Selector.open(); + Member mbr = new MemberImpl("localhost", 9999, 0); + ChannelData data = new ChannelData(); + data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE); + data.setAddress(mbr); + byte[] buf = new byte[8192 * 4]; + data.setMessage(new XByteBuffer(buf,false)); + buf = XByteBuffer.createDataPackage(data); + int len = buf.length; + BigDecimal total = new BigDecimal((double)0); + BigDecimal bytes = new BigDecimal((double)len); + NioSender sender = new NioSender(); + sender.setDestination(mbr); + sender.setDirectBuffer(true); + sender.setSelector(selector); + sender.setTxBufSize(1024*1024); + sender.connect(); + sender.setMessage(buf); + System.out.println("Writing to 9999"); + long start = 0; + double mb = 0; + boolean first = true; + int count = 0; + DecimalFormat df = new DecimalFormat("##.00"); + while (count<100000) { + if (first) { + first = false; + start = System.currentTimeMillis(); + } + sender.setMessage(buf); + int selectedKeys = 0; + try { + selectedKeys = selector.select(0); + } catch (Exception e) { + e.printStackTrace(); + continue; + } + + if (selectedKeys == 0) { + continue; + } + + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey sk = (SelectionKey) it.next(); + it.remove(); + try { + int readyOps = sk.readyOps(); + sk.interestOps(sk.interestOps() & ~readyOps); + if (sender.process(sk, false)) { + total = total.add(bytes); + sender.reset(); + sender.setMessage(buf); + mb += ( (double) len) / 1024 / 1024; + if ( ( (++count) % 10000) == 0) { + long time = System.currentTimeMillis(); + double seconds = ( (double) (time - start)) / 1000; + System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes."); + } + } + + } catch (Throwable t) { + t.printStackTrace(); + return; + } + } + selector.selectedKeys().clear(); + } + System.out.println("Complete, sleeping 15 seconds"); + Thread.sleep(15000); + } +} diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java b/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java new file mode 100644 index 000000000..f1415a418 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java @@ -0,0 +1,90 @@ +package org.apache.catalina.tribes.test.transport; + +import java.io.OutputStream; +import java.net.Socket; +import java.text.DecimalFormat; +import org.apache.catalina.tribes.transport.nio.NioSender; +import org.apache.catalina.tribes.membership.MemberImpl; +import java.nio.channels.Selector; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Member; +import java.nio.channels.SelectionKey; +import java.util.Iterator; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.io.ChannelData; +import java.math.BigDecimal; +import java.util.Arrays; + +public class SocketNioValidateSend { + + public static void main(String[] args) throws Exception { + Selector selector = Selector.open(); + Member mbr = new MemberImpl("localhost", 9999, 0); + byte seq = 0; + byte[] buf = new byte[50000]; + Arrays.fill(buf,seq); + int len = buf.length; + BigDecimal total = new BigDecimal((double)0); + BigDecimal bytes = new BigDecimal((double)len); + NioSender sender = new NioSender(); + sender.setDestination(mbr); + sender.setDirectBuffer(true); + sender.setSelector(selector); + sender.connect(); + sender.setMessage(buf); + System.out.println("Writing to 9999"); + long start = 0; + double mb = 0; + boolean first = true; + int count = 0; + + DecimalFormat df = new DecimalFormat("##.00"); + while (count<100000) { + if (first) { + first = false; + start = System.currentTimeMillis(); + } + sender.setMessage(buf); + int selectedKeys = 0; + try { + selectedKeys = selector.select(0); + } catch (Exception e) { + e.printStackTrace(); + continue; + } + + if (selectedKeys == 0) { + continue; + } + + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey sk = (SelectionKey) it.next(); + it.remove(); + try { + int readyOps = sk.readyOps(); + sk.interestOps(sk.interestOps() & ~readyOps); + if (sender.process(sk, false)) { + total = total.add(bytes); + sender.reset(); + seq++; + Arrays.fill(buf,seq); + sender.setMessage(buf); + mb += ( (double) len) / 1024 / 1024; + if ( ( (++count) % 10000) == 0) { + long time = System.currentTimeMillis(); + double seconds = ( (double) (time - start)) / 1000; + System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes."); + } + } + + } catch (Throwable t) { + t.printStackTrace(); + return; + } + } + } + System.out.println("Complete, sleeping 15 seconds"); + Thread.sleep(15000); + } +} diff --git a/test/org/apache/catalina/tribes/test/transport/SocketReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketReceive.java new file mode 100644 index 000000000..eabe10b4f --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketReceive.java @@ -0,0 +1,62 @@ +package org.apache.catalina.tribes.test.transport; + +import java.net.ServerSocket; +import java.net.Socket; +import java.io.InputStream; +import java.text.DecimalFormat; +import java.math.BigDecimal; + +public class SocketReceive { + static long start = 0; + static double mb = 0; + static byte[] buf = new byte[8192 * 4]; + static boolean first = true; + static int count = 0; + static DecimalFormat df = new DecimalFormat("##.00"); + static BigDecimal total = new BigDecimal(0); + static BigDecimal bytes = new BigDecimal(32871); + + + public static void main(String[] args) throws Exception { + + ServerSocket srvSocket = new ServerSocket(9999); + System.out.println("Listening on 9999"); + Socket socket = srvSocket.accept(); + socket.setReceiveBufferSize(43800); + InputStream in = socket.getInputStream(); + Thread t = new Thread() { + public void run() { + while ( true ) { + try { + Thread.sleep(1000); + printStats(start, mb, count, df, total); + }catch ( Exception x ) {} + } + } + }; + t.setDaemon(true); + t.start(); + + while ( true ) { + if ( first ) { first = false; start = System.currentTimeMillis();} + int len = in.read(buf); + if ( len == -1 ) { + printStats(start, mb, count, df, total); + System.exit(1); + } + if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len); + total = total.add(bytes); + mb += ( (double) len) / 1024 / 1024; + if ( ((++count) % 10000) == 0 ) { + printStats(start, mb, count, df, total); + } + } + + } + + private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) { + long time = System.currentTimeMillis(); + double seconds = ((double)(time-start))/1000; + System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes."); + } +} \ No newline at end of file diff --git a/test/org/apache/catalina/tribes/test/transport/SocketSend.java b/test/org/apache/catalina/tribes/test/transport/SocketSend.java new file mode 100644 index 000000000..fff73c116 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketSend.java @@ -0,0 +1,53 @@ +package org.apache.catalina.tribes.test.transport; + +import java.io.OutputStream; +import java.net.Socket; +import java.text.DecimalFormat; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.Channel; +import java.math.BigDecimal; + +public class SocketSend { + + public static void main(String[] args) throws Exception { + + + Member mbr = new MemberImpl("localhost", 9999, 0); + ChannelData data = new ChannelData(); + data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE); + data.setAddress(mbr); + byte[] buf = new byte[8192 * 4]; + data.setMessage(new XByteBuffer(buf,false)); + buf = XByteBuffer.createDataPackage(data); + int len = buf.length; + System.out.println("Message size:"+len+" bytes"); + BigDecimal total = new BigDecimal((double)0); + BigDecimal bytes = new BigDecimal((double)len); + Socket socket = new Socket("localhost",9999); + System.out.println("Writing to 9999"); + OutputStream out = socket.getOutputStream(); + long start = 0; + double mb = 0; + boolean first = true; + int count = 0; + DecimalFormat df = new DecimalFormat("##.00"); + while ( count<100000 ) { + if ( first ) { first = false; start = System.currentTimeMillis();} + out.write(buf,0,buf.length); + mb += ( (double) buf.length) / 1024 / 1024; + total = total.add(bytes); + if ( ((++count) % 10000) == 0 ) { + long time = System.currentTimeMillis(); + double seconds = ((double)(time-start))/1000; + System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes."); + } + } + out.flush(); + System.out.println("Complete, sleeping 5 seconds"); + Thread.sleep(5000); + + } +} diff --git a/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java new file mode 100644 index 000000000..1dad58b64 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java @@ -0,0 +1,71 @@ +package org.apache.catalina.tribes.test.transport; + +import java.net.ServerSocket; +import java.net.Socket; +import java.io.InputStream; +import java.text.DecimalFormat; +import java.math.BigDecimal; +import org.apache.catalina.tribes.io.XByteBuffer; + +public class SocketTribesReceive { + static long start = 0; + static double mb = 0; + //static byte[] buf = new byte[32871]; + static byte[] buf = new byte[32871]; + static boolean first = true; + static int count = 0; + static DecimalFormat df = new DecimalFormat("##.00"); + static BigDecimal total = new BigDecimal((double)0); + static BigDecimal bytes = new BigDecimal((double)32871); + + + public static void main(String[] args) throws Exception { + int size = 43800; + if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception x){} + XByteBuffer xbuf = new XByteBuffer(43800,true); + ServerSocket srvSocket = new ServerSocket(9999); + System.out.println("Listening on 9999"); + Socket socket = srvSocket.accept(); + socket.setReceiveBufferSize(size); + InputStream in = socket.getInputStream(); + Thread t = new Thread() { + public void run() { + while ( true ) { + try { + Thread.sleep(1000); + printStats(start, mb, count, df, total); + }catch ( Exception x ) {} + } + } + }; + t.setDaemon(true); + t.start(); + + while ( true ) { + if ( first ) { first = false; start = System.currentTimeMillis();} + int len = in.read(buf); + if ( len == -1 ) { + printStats(start, mb, count, df, total); + System.exit(1); + } + xbuf.append(buf,0,len); + if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len); + total = total.add(bytes); + while ( xbuf.countPackages(true) > 0 ) { + xbuf.extractPackage(true); + count++; + } + mb += ( (double) len) / 1024 / 1024; + if ( ((count) % 10000) == 0 ) { + printStats(start, mb, count, df, total); + } + } + + } + + private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) { + long time = System.currentTimeMillis(); + double seconds = ((double)(time-start))/1000; + System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes."); + } +} \ No newline at end of file diff --git a/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java new file mode 100644 index 000000000..d4e56d656 --- /dev/null +++ b/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java @@ -0,0 +1,91 @@ +package org.apache.catalina.tribes.test.transport; + +import java.net.ServerSocket; +import java.net.Socket; +import java.io.InputStream; +import java.text.DecimalFormat; +import java.math.BigDecimal; + +public class SocketValidateReceive { + static long start = 0; + static double mb = 0; + static byte[] buf = new byte[8192 * 4]; + static boolean first = true; + static int count = 0; + static DecimalFormat df = new DecimalFormat("##.00"); + static BigDecimal total = new BigDecimal(0); + static BigDecimal bytes = new BigDecimal(32871); + + + public static void main(String[] args) throws Exception { + int size = 43800; + if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception x){} + + ServerSocket srvSocket = new ServerSocket(9999); + System.out.println("Listening on 9999"); + Socket socket = srvSocket.accept(); + socket.setReceiveBufferSize(size); + InputStream in = socket.getInputStream(); + MyDataReader reader = new MyDataReader(50000); + Thread t = new Thread() { + public void run() { + while ( true ) { + try { + Thread.sleep(1000); + printStats(start, mb, count, df, total); + }catch ( Exception x ) {} + } + } + }; + t.setDaemon(true); + t.start(); + + while ( true ) { + if ( first ) { first = false; start = System.currentTimeMillis();} + int len = in.read(buf); + if ( len == -1 ) { + printStats(start, mb, count, df, total); + System.exit(1); + } + count += reader.append(buf,0,len); + + if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len); + total = total.add(bytes); + mb += ( (double) len) / 1024 / 1024; + if ( ((count) % 10000) == 0 ) { + printStats(start, mb, count, df, total); + } + } + + } + + private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) { + long time = System.currentTimeMillis(); + double seconds = ((double)(time-start))/1000; + System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes."); + } + + public static class MyDataReader { + byte[] data = new byte[43800]; + int length = 10; + int cur = 0; + byte seq = 0; + public MyDataReader(int len) { + length = len; + } + + public int append(byte[] b, int off, int len) throws Exception { + int packages = 0; + for ( int i=off; i