From 2feaf9a9ee9993f89bbe1b63b9830f99622e9c88 Mon Sep 17 00:00:00 2001
From: fhanik
Date: Fri, 8 Sep 2006 15:42:08 +0000
Subject: [PATCH] Adding in unit tests, yell at me if the location should be
elsewhere
git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@441543 13f79535-47bb-0310-9956-ffa450edef68
---
.../apache/catalina/tribes/ChannelReceiver.java | 6 +
java/org/apache/catalina/tribes/Member.java | 11 +-
.../catalina/tribes/membership/MemberImpl.java | 1166 ++++++++++----------
.../catalina/tribes/transport/ReceiverBase.java | 9 +
.../apache/catalina/tribes/test/TestNioSender.java | 106 ++
.../catalina/tribes/test/TribesTestSuite.java | 24 +
.../tribes/test/channel/ChannelStartStop.java | 118 ++
.../tribes/test/channel/TestChannelOptionFlag.java | 76 ++
.../tribes/test/channel/TestDataIntegrity.java | 173 +++
.../test/channel/TestRemoteProcessException.java | 123 +++
.../interceptors/TestNonBlockingCoordinator.java | 95 ++
.../test/interceptors/TestTwoPhaseCommit.java | 42 +
.../tribes/test/io/TestSenderConnections.java | 112 ++
.../catalina/tribes/test/io/TestSerialization.java | 24 +
.../test/membership/MemberSerialization.java | 100 ++
.../tribes/test/membership/TestDomainFilter.java | 104 ++
.../tribes/test/membership/TestMemberArrival.java | 100 ++
.../test/membership/TestTcpFailureDetector.java | 151 +++
.../tribes/test/transport/SocketNioReceive.java | 76 ++
.../tribes/test/transport/SocketNioSend.java | 91 ++
.../test/transport/SocketNioValidateSend.java | 90 ++
.../tribes/test/transport/SocketReceive.java | 62 ++
.../catalina/tribes/test/transport/SocketSend.java | 53 +
.../tribes/test/transport/SocketTribesReceive.java | 71 ++
.../test/transport/SocketValidateReceive.java | 91 ++
25 files changed, 2503 insertions(+), 571 deletions(-)
create mode 100644 test/org/apache/catalina/tribes/test/TestNioSender.java
create mode 100644 test/org/apache/catalina/tribes/test/TribesTestSuite.java
create mode 100644 test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
create mode 100644 test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
create mode 100644 test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
create mode 100644 test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
create mode 100644 test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
create mode 100644 test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java
create mode 100644 test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
create mode 100644 test/org/apache/catalina/tribes/test/io/TestSerialization.java
create mode 100644 test/org/apache/catalina/tribes/test/membership/MemberSerialization.java
create mode 100644 test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java
create mode 100644 test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
create mode 100644 test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketNioSend.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketReceive.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketSend.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java
create mode 100644 test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java
diff --git a/java/org/apache/catalina/tribes/ChannelReceiver.java b/java/org/apache/catalina/tribes/ChannelReceiver.java
index 346092ea9..2c7eea8fe 100644
--- a/java/org/apache/catalina/tribes/ChannelReceiver.java
+++ b/java/org/apache/catalina/tribes/ChannelReceiver.java
@@ -52,6 +52,12 @@ public interface ChannelReceiver extends Heartbeat {
public int getPort();
/**
+ * Returns the secure listening port
+ * @return port, -1 if a secure port is not activated
+ */
+ public int getSecurePort();
+
+ /**
* Sets the message listener to receive notification of incoming
* @param listener MessageListener
* @see MessageListener
diff --git a/java/org/apache/catalina/tribes/Member.java b/java/org/apache/catalina/tribes/Member.java
index 622bab842..566493bdd 100644
--- a/java/org/apache/catalina/tribes/Member.java
+++ b/java/org/apache/catalina/tribes/Member.java
@@ -52,10 +52,19 @@ public interface Member {
/**
* Returns the listen port for the ChannelReceiver implementation
- * @return IPv4 or IPv6 representation of the host address this member listens to incoming data
+ * @return the listen port for this member, -1 if its not listening on an unsecure port
* @see ChannelReceiver
*/
public int getPort();
+
+ /**
+ * Returns the secure listen port for the ChannelReceiver implementation.
+ * Returns -1 if its not listening to a secure port.
+ * @return the listen port for this member, -1 if its not listening on a secure port
+ * @see ChannelReceiver
+ */
+ public int getSecurePort();
+
/**
* Contains information on how long this member has been online.
diff --git a/java/org/apache/catalina/tribes/membership/MemberImpl.java b/java/org/apache/catalina/tribes/membership/MemberImpl.java
index cd9f00fd7..eeae00040 100644
--- a/java/org/apache/catalina/tribes/membership/MemberImpl.java
+++ b/java/org/apache/catalina/tribes/membership/MemberImpl.java
@@ -1,570 +1,596 @@
-/*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.membership;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.transport.SenderState;
-
-/**
- * A membership implementation using simple multicast.
- * This is the representation of a multicast member.
- * Carries the host, and port of the this or other cluster nodes.
- *
- * @author Filip Hanik
- * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
- */
-public class MemberImpl implements Member, java.io.Externalizable {
-
- /**
- * Public properties specific to this implementation
- */
- public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
- public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
- public static final transient String MEMBER_NAME = "memberName";
-
- public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
- public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
-
- /**
- * The listen host for this member
- */
- protected byte[] host;
- protected transient String hostname;
- /**
- * The tcp listen port for this member
- */
- protected int port;
-
- /**
- * Counter for how many broadcast messages have been sent from this member
- */
- protected int msgCount = 0;
- /**
- * The number of milliseconds since this members was
- * created, is kept track of using the start time
- */
- protected long memberAliveTime = 0;
-
- /**
- * For the local member only
- */
- protected transient long serviceStartTime;
-
- /**
- * To avoid serialization over and over again, once the local dataPkg
- * has been set, we use that to transmit data
- */
- protected transient byte[] dataPkg = null;
-
- /**
- * Unique session Id for this member
- */
- protected byte[] uniqueId = new byte[16];
-
- /**
- * Custom payload that an app framework can broadcast
- * Also used to transport stop command.
- */
- protected byte[] payload = new byte[0];
-
- /**
- * Command, so that the custom payload doesn't have to be used
- * This is for internal tribes use, such as SHUTDOWN_COMMAND
- */
- protected byte[] command = new byte[0];
-
- /**
- * Domain if we want to filter based on domain.
- */
- protected byte[] domain = new byte[0];
-
- /**
- * Empty constructor for serialization
- */
- public MemberImpl() {
-
- }
-
- /**
- * Construct a new member object
- * @param name - the name of this member, cluster unique
- * @param domain - the cluster domain name of this member
- * @param host - the tcp listen host
- * @param port - the tcp listen port
- */
- public MemberImpl(String host,
- int port,
- long aliveTime) throws IOException {
- setHostname(host);
- this.port = port;
- this.memberAliveTime=aliveTime;
- }
-
- public MemberImpl(String host,
- int port,
- long aliveTime,
- byte[] payload) throws IOException {
- this(host,port,aliveTime);
- setPayload(payload);
- }
-
- public boolean isReady() {
- return SenderState.getSenderState(this).isReady();
- }
- public boolean isSuspect() {
- return SenderState.getSenderState(this).isSuspect();
- }
- public boolean isFailing() {
- return SenderState.getSenderState(this).isFailing();
- }
-
- /**
- * Increment the message count.
- */
- protected void inc() {
- msgCount++;
- }
-
- /**
- * Create a data package to send over the wire representing this member.
- * This is faster than serialization.
- * @return - the bytes for this member deserialized
- * @throws Exception
- */
- public byte[] getData() {
- return getData(true);
- }
- /**
- * Highly optimized version of serializing a member into a byte array
- * Returns a cached byte[] reference, do not modify this data
- * @param getalive boolean
- * @return byte[]
- */
- public byte[] getData(boolean getalive) {
- return getData(getalive,false);
- }
-
-
- public int getDataLength() {
- return TRIBES_MBR_BEGIN.length+ //start pkg
- 4+ //data length
- 8+ //alive time
- 4+ //port
- 1+ //host length
- host.length+ //host
- 4+ //command length
- command.length+ //command
- 4+ //domain length
- domain.length+ //domain
- 16+ //unique id
- 4+ //payload length
- payload.length+ //payload
- TRIBES_MBR_END.length; //end pkg
- }
-
- /**
- *
- * @param getalive boolean - calculate memberAlive time
- * @param reset boolean - reset the cached data package, and create a new one
- * @return byte[]
- */
- public byte[] getData(boolean getalive, boolean reset) {
- if ( reset ) dataPkg = null;
- //look in cache first
- if ( dataPkg!=null ) {
- if ( getalive ) {
- //you'd be surprised, but System.currentTimeMillis
- //shows up on the profiler
- long alive=System.currentTimeMillis()-getServiceStartTime();
- XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
- }
- return dataPkg;
- }
-
- //package looks like
- //start package TRIBES_MBR_BEGIN.length
- //package length - 4 bytes
- //alive - 8 bytes
- //port - 4 bytes
- //host length - 1 byte
- //host - hl bytes
- //clen - 4 bytes
- //command - clen bytes
- //dlen - 4 bytes
- //domain - dlen bytes
- //uniqueId - 16 bytes
- //payload length - 4 bytes
- //payload plen bytes
- //end package TRIBES_MBR_END.length
- byte[] addr = host;
- long alive=System.currentTimeMillis()-getServiceStartTime();
- byte hl = (byte)addr.length;
- byte[] data = new byte[getDataLength()];
-
- int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
-
- int pos = 0;
-
- //TRIBES_MBR_BEGIN
- System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
- pos += TRIBES_MBR_BEGIN.length;
-
- //body length
- XByteBuffer.toBytes(bodylength,data,pos);
- pos += 4;
-
- //alive data
- XByteBuffer.toBytes((long)alive,data,pos);
- pos += 8;
- //port
- XByteBuffer.toBytes(port,data,pos);
- pos += 4;
- //host length
- data[pos++] = hl;
- //host
- System.arraycopy(addr,0,data,pos,addr.length);
- pos+=addr.length;
- //clen - 4 bytes
- XByteBuffer.toBytes(command.length,data,pos);
- pos+=4;
- //command - clen bytes
- System.arraycopy(command,0,data,pos,command.length);
- pos+=command.length;
- //dlen - 4 bytes
- XByteBuffer.toBytes(domain.length,data,pos);
- pos+=4;
- //domain - dlen bytes
- System.arraycopy(domain,0,data,pos,domain.length);
- pos+=domain.length;
- //unique Id
- System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
- pos+=uniqueId.length;
- //payload
- XByteBuffer.toBytes(payload.length,data,pos);
- pos+=4;
- System.arraycopy(payload,0,data,pos,payload.length);
- pos+=payload.length;
-
- //TRIBES_MBR_END
- System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
- pos += TRIBES_MBR_END.length;
-
- //create local data
- dataPkg = data;
- return data;
- }
- /**
- * Deserializes a member from data sent over the wire
- * @param data - the bytes received
- * @return a member object.
- */
- public static MemberImpl getMember(byte[] data, MemberImpl member) {
- return getMember(data,0,data.length,member);
- }
-
- public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
- //package looks like
- //start package TRIBES_MBR_BEGIN.length
- //package length - 4 bytes
- //alive - 8 bytes
- //port - 4 bytes
- //host length - 1 byte
- //host - hl bytes
- //clen - 4 bytes
- //command - clen bytes
- //dlen - 4 bytes
- //domain - dlen bytes
- //uniqueId - 16 bytes
- //payload length - 4 bytes
- //payload plen bytes
- //end package TRIBES_MBR_END.length
-
- int pos = offset;
-
- if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) {
- throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN));
- }
-
- if ( length < (TRIBES_MBR_BEGIN.length+4) ) {
- throw new ArrayIndexOutOfBoundsException("Member package to small to validate.");
- }
-
- pos += TRIBES_MBR_BEGIN.length;
-
- int bodylength = XByteBuffer.toInt(data,pos);
- pos += 4;
-
- if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) {
- throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package.");
- }
-
- int endpos = pos+bodylength;
- if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) {
- throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END));
- }
-
-
- byte[] alived = new byte[8];
- System.arraycopy(data, pos, alived, 0, 8);
- pos += 8;
- byte[] portd = new byte[4];
- System.arraycopy(data, pos, portd, 0, 4);
- pos += 4;
-
- byte hl = data[pos++];
- byte[] addr = new byte[hl];
- System.arraycopy(data, pos, addr, 0, hl);
- pos += hl;
-
- int cl = XByteBuffer.toInt(data, pos);
- pos += 4;
-
- byte[] command = new byte[cl];
- System.arraycopy(data, pos, command, 0, command.length);
- pos += command.length;
-
- int dl = XByteBuffer.toInt(data, pos);
- pos += 4;
-
- byte[] domain = new byte[dl];
- System.arraycopy(data, pos, domain, 0, domain.length);
- pos += domain.length;
-
- byte[] uniqueId = new byte[16];
- System.arraycopy(data, pos, uniqueId, 0, 16);
- pos += 16;
-
- int pl = XByteBuffer.toInt(data, pos);
- pos += 4;
-
- byte[] payload = new byte[pl];
- System.arraycopy(data, pos, payload, 0, payload.length);
- pos += payload.length;
-
- member.setHost(addr);
- member.setPort(XByteBuffer.toInt(portd, 0));
- member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
- member.setUniqueId(uniqueId);
- member.payload = payload;
- member.domain = domain;
- member.command = command;
-
- member.dataPkg = new byte[length];
- System.arraycopy(data, offset, member.dataPkg, 0, length);
-
- return member;
- }
-
- public static MemberImpl getMember(byte[] data) {
- return getMember(data,new MemberImpl());
- }
-
- /**
- * Return the name of this object
- * @return a unique name to the cluster
- */
- public String getName() {
- return "tcp://"+getHostname()+":"+getPort();
- }
-
- /**
- * Return the listen port of this member
- * @return - tcp listen port
- */
- public int getPort() {
- return this.port;
- }
-
- /**
- * Return the TCP listen host for this member
- * @return IP address or host name
- */
- public byte[] getHost() {
- return host;
- }
-
- public String getHostname() {
- if ( this.hostname != null ) return hostname;
- else {
- try {
- this.hostname = java.net.InetAddress.getByAddress(host).getHostName();
- return this.hostname;
- }catch ( IOException x ) {
- throw new RuntimeException("Unable to parse hostname.",x);
- }
- }
- }
-
- /**
- * Contains information on how long this member has been online.
- * The result is the number of milli seconds this member has been
- * broadcasting its membership to the cluster.
- * @return nr of milliseconds since this member started.
- */
- public long getMemberAliveTime() {
- return memberAliveTime;
- }
-
- public long getServiceStartTime() {
- return serviceStartTime;
- }
-
- public byte[] getUniqueId() {
- return uniqueId;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public byte[] getCommand() {
- return command;
- }
-
- public byte[] getDomain() {
- return domain;
- }
-
- public void setMemberAliveTime(long time) {
- memberAliveTime=time;
- }
-
-
-
- /**
- * String representation of this object
- */
- public String toString() {
- StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl[");
- buf.append(getName()).append(",");
- buf.append(getHostname()).append(",");
- buf.append(port).append(", alive=");
- buf.append(memberAliveTime).append(",");
- buf.append("id=").append(bToS(this.uniqueId)).append(", ");
- buf.append("payload=").append(bToS(this.payload,8)).append(", ");
- buf.append("command=").append(bToS(this.command,8)).append(", ");
- buf.append("domain=").append(bToS(this.domain,8)).append(", ");
- buf.append("]");
- return buf.toString();
- }
- public static String bToS(byte[] data) {
- return bToS(data,data.length);
- }
- public static String bToS(byte[] data, int max) {
- StringBuffer buf = new StringBuffer(4*16);
- buf.append("{");
- for (int i=0; data!=null && i McastServiceImpl.MAX_PACKET_SIZE ) {
- this.payload = oldpayload;
- throw new IllegalArgumentException("Payload is to large for tribes to handle.");
- }
-
- }
-
- public void setCommand(byte[] command) {
- this.command = command!=null?command:new byte[0];
- getData(true,true);
- }
-
- public void setDomain(byte[] domain) {
- this.domain = domain!=null?domain:new byte[0];
- getData(true,true);
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int length = in.readInt();
- byte[] message = new byte[length];
- in.read(message);
- getMember(message,this);
-
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- byte[] data = this.getData();
- out.writeInt(data.length);
- out.write(data);
- }
-
-}
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.transport.SenderState;
+
+/**
+ * A membership implementation using simple multicast.
+ * This is the representation of a multicast member.
+ * Carries the host, and port of the this or other cluster nodes.
+ *
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class MemberImpl implements Member, java.io.Externalizable {
+
+ /**
+ * Public properties specific to this implementation
+ */
+ public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
+ public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
+ public static final transient String MEMBER_NAME = "memberName";
+
+ public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
+ public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
+
+ /**
+ * The listen host for this member
+ */
+ protected byte[] host;
+ protected transient String hostname;
+ /**
+ * The tcp listen port for this member
+ */
+ protected int port;
+
+ /**
+ * The tcp/SSL listen port for this member
+ */
+ protected int securePort = -1;
+
+ /**
+ * Counter for how many broadcast messages have been sent from this member
+ */
+ protected int msgCount = 0;
+ /**
+ * The number of milliseconds since this members was
+ * created, is kept track of using the start time
+ */
+ protected long memberAliveTime = 0;
+
+ /**
+ * For the local member only
+ */
+ protected transient long serviceStartTime;
+
+ /**
+ * To avoid serialization over and over again, once the local dataPkg
+ * has been set, we use that to transmit data
+ */
+ protected transient byte[] dataPkg = null;
+
+ /**
+ * Unique session Id for this member
+ */
+ protected byte[] uniqueId = new byte[16];
+
+ /**
+ * Custom payload that an app framework can broadcast
+ * Also used to transport stop command.
+ */
+ protected byte[] payload = new byte[0];
+
+ /**
+ * Command, so that the custom payload doesn't have to be used
+ * This is for internal tribes use, such as SHUTDOWN_COMMAND
+ */
+ protected byte[] command = new byte[0];
+
+ /**
+ * Domain if we want to filter based on domain.
+ */
+ protected byte[] domain = new byte[0];
+
+ /**
+ * Empty constructor for serialization
+ */
+ public MemberImpl() {
+
+ }
+
+ /**
+ * Construct a new member object
+ * @param name - the name of this member, cluster unique
+ * @param domain - the cluster domain name of this member
+ * @param host - the tcp listen host
+ * @param port - the tcp listen port
+ */
+ public MemberImpl(String host,
+ int port,
+ long aliveTime) throws IOException {
+ setHostname(host);
+ this.port = port;
+ this.memberAliveTime=aliveTime;
+ }
+
+ public MemberImpl(String host,
+ int port,
+ long aliveTime,
+ byte[] payload) throws IOException {
+ this(host,port,aliveTime);
+ setPayload(payload);
+ }
+
+ public boolean isReady() {
+ return SenderState.getSenderState(this).isReady();
+ }
+ public boolean isSuspect() {
+ return SenderState.getSenderState(this).isSuspect();
+ }
+ public boolean isFailing() {
+ return SenderState.getSenderState(this).isFailing();
+ }
+
+ /**
+ * Increment the message count.
+ */
+ protected void inc() {
+ msgCount++;
+ }
+
+ /**
+ * Create a data package to send over the wire representing this member.
+ * This is faster than serialization.
+ * @return - the bytes for this member deserialized
+ * @throws Exception
+ */
+ public byte[] getData() {
+ return getData(true);
+ }
+ /**
+ * Highly optimized version of serializing a member into a byte array
+ * Returns a cached byte[] reference, do not modify this data
+ * @param getalive boolean
+ * @return byte[]
+ */
+ public byte[] getData(boolean getalive) {
+ return getData(getalive,false);
+ }
+
+
+ public int getDataLength() {
+ return TRIBES_MBR_BEGIN.length+ //start pkg
+ 4+ //data length
+ 8+ //alive time
+ 4+ //port
+ 4+ //secure port
+ 1+ //host length
+ host.length+ //host
+ 4+ //command length
+ command.length+ //command
+ 4+ //domain length
+ domain.length+ //domain
+ 16+ //unique id
+ 4+ //payload length
+ payload.length+ //payload
+ TRIBES_MBR_END.length; //end pkg
+ }
+
+ /**
+ *
+ * @param getalive boolean - calculate memberAlive time
+ * @param reset boolean - reset the cached data package, and create a new one
+ * @return byte[]
+ */
+ public byte[] getData(boolean getalive, boolean reset) {
+ if ( reset ) dataPkg = null;
+ //look in cache first
+ if ( dataPkg!=null ) {
+ if ( getalive ) {
+ //you'd be surprised, but System.currentTimeMillis
+ //shows up on the profiler
+ long alive=System.currentTimeMillis()-getServiceStartTime();
+ XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
+ }
+ return dataPkg;
+ }
+
+ //package looks like
+ //start package TRIBES_MBR_BEGIN.length
+ //package length - 4 bytes
+ //alive - 8 bytes
+ //port - 4 bytes
+ //secure port - 4 bytes
+ //host length - 1 byte
+ //host - hl bytes
+ //clen - 4 bytes
+ //command - clen bytes
+ //dlen - 4 bytes
+ //domain - dlen bytes
+ //uniqueId - 16 bytes
+ //payload length - 4 bytes
+ //payload plen bytes
+ //end package TRIBES_MBR_END.length
+ byte[] addr = host;
+ long alive=System.currentTimeMillis()-getServiceStartTime();
+ byte hl = (byte)addr.length;
+ byte[] data = new byte[getDataLength()];
+
+ int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
+
+ int pos = 0;
+
+ //TRIBES_MBR_BEGIN
+ System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
+ pos += TRIBES_MBR_BEGIN.length;
+
+ //body length
+ XByteBuffer.toBytes(bodylength,data,pos);
+ pos += 4;
+
+ //alive data
+ XByteBuffer.toBytes((long)alive,data,pos);
+ pos += 8;
+ //port
+ XByteBuffer.toBytes(port,data,pos);
+ pos += 4;
+ //secure port
+ XByteBuffer.toBytes(securePort,data,pos);
+ pos += 4;
+ //host length
+ data[pos++] = hl;
+ //host
+ System.arraycopy(addr,0,data,pos,addr.length);
+ pos+=addr.length;
+ //clen - 4 bytes
+ XByteBuffer.toBytes(command.length,data,pos);
+ pos+=4;
+ //command - clen bytes
+ System.arraycopy(command,0,data,pos,command.length);
+ pos+=command.length;
+ //dlen - 4 bytes
+ XByteBuffer.toBytes(domain.length,data,pos);
+ pos+=4;
+ //domain - dlen bytes
+ System.arraycopy(domain,0,data,pos,domain.length);
+ pos+=domain.length;
+ //unique Id
+ System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
+ pos+=uniqueId.length;
+ //payload
+ XByteBuffer.toBytes(payload.length,data,pos);
+ pos+=4;
+ System.arraycopy(payload,0,data,pos,payload.length);
+ pos+=payload.length;
+
+ //TRIBES_MBR_END
+ System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
+ pos += TRIBES_MBR_END.length;
+
+ //create local data
+ dataPkg = data;
+ return data;
+ }
+ /**
+ * Deserializes a member from data sent over the wire
+ * @param data - the bytes received
+ * @return a member object.
+ */
+ public static MemberImpl getMember(byte[] data, MemberImpl member) {
+ return getMember(data,0,data.length,member);
+ }
+
+ public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
+ //package looks like
+ //start package TRIBES_MBR_BEGIN.length
+ //package length - 4 bytes
+ //alive - 8 bytes
+ //port - 4 bytes
+ //secure port - 4 bytes
+ //host length - 1 byte
+ //host - hl bytes
+ //clen - 4 bytes
+ //command - clen bytes
+ //dlen - 4 bytes
+ //domain - dlen bytes
+ //uniqueId - 16 bytes
+ //payload length - 4 bytes
+ //payload plen bytes
+ //end package TRIBES_MBR_END.length
+
+ int pos = offset;
+
+ if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) {
+ throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN));
+ }
+
+ if ( length < (TRIBES_MBR_BEGIN.length+4) ) {
+ throw new ArrayIndexOutOfBoundsException("Member package to small to validate.");
+ }
+
+ pos += TRIBES_MBR_BEGIN.length;
+
+ int bodylength = XByteBuffer.toInt(data,pos);
+ pos += 4;
+
+ if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) {
+ throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package.");
+ }
+
+ int endpos = pos+bodylength;
+ if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) {
+ throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END));
+ }
+
+
+ byte[] alived = new byte[8];
+ System.arraycopy(data, pos, alived, 0, 8);
+ pos += 8;
+ byte[] portd = new byte[4];
+ System.arraycopy(data, pos, portd, 0, 4);
+ pos += 4;
+
+ byte[] sportd = new byte[4];
+ System.arraycopy(data, pos, sportd, 0, 4);
+ pos += 4;
+
+
+
+ byte hl = data[pos++];
+ byte[] addr = new byte[hl];
+ System.arraycopy(data, pos, addr, 0, hl);
+ pos += hl;
+
+ int cl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] command = new byte[cl];
+ System.arraycopy(data, pos, command, 0, command.length);
+ pos += command.length;
+
+ int dl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] domain = new byte[dl];
+ System.arraycopy(data, pos, domain, 0, domain.length);
+ pos += domain.length;
+
+ byte[] uniqueId = new byte[16];
+ System.arraycopy(data, pos, uniqueId, 0, 16);
+ pos += 16;
+
+ int pl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] payload = new byte[pl];
+ System.arraycopy(data, pos, payload, 0, payload.length);
+ pos += payload.length;
+
+ member.setHost(addr);
+ member.setPort(XByteBuffer.toInt(portd, 0));
+ member.setSecurePort(XByteBuffer.toInt(sportd, 0));
+ member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
+ member.setUniqueId(uniqueId);
+ member.payload = payload;
+ member.domain = domain;
+ member.command = command;
+
+ member.dataPkg = new byte[length];
+ System.arraycopy(data, offset, member.dataPkg, 0, length);
+
+ return member;
+ }
+
+ public static MemberImpl getMember(byte[] data) {
+ return getMember(data,new MemberImpl());
+ }
+
+ /**
+ * Return the name of this object
+ * @return a unique name to the cluster
+ */
+ public String getName() {
+ return "tcp://"+getHostname()+":"+getPort();
+ }
+
+ /**
+ * Return the listen port of this member
+ * @return - tcp listen port
+ */
+ public int getPort() {
+ return this.port;
+ }
+
+ /**
+ * Return the TCP listen host for this member
+ * @return IP address or host name
+ */
+ public byte[] getHost() {
+ return host;
+ }
+
+ public String getHostname() {
+ if ( this.hostname != null ) return hostname;
+ else {
+ try {
+ this.hostname = java.net.InetAddress.getByAddress(host).getHostName();
+ return this.hostname;
+ }catch ( IOException x ) {
+ throw new RuntimeException("Unable to parse hostname.",x);
+ }
+ }
+ }
+
+ /**
+ * Contains information on how long this member has been online.
+ * The result is the number of milli seconds this member has been
+ * broadcasting its membership to the cluster.
+ * @return nr of milliseconds since this member started.
+ */
+ public long getMemberAliveTime() {
+ return memberAliveTime;
+ }
+
+ public long getServiceStartTime() {
+ return serviceStartTime;
+ }
+
+ public byte[] getUniqueId() {
+ return uniqueId;
+ }
+
+ public byte[] getPayload() {
+ return payload;
+ }
+
+ public byte[] getCommand() {
+ return command;
+ }
+
+ public byte[] getDomain() {
+ return domain;
+ }
+
+ public int getSecurePort() {
+ return securePort;
+ }
+
+ public void setMemberAliveTime(long time) {
+ memberAliveTime=time;
+ }
+
+
+
+ /**
+ * String representation of this object
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl[");
+ buf.append(getName()).append(",");
+ buf.append(getHostname()).append(",");
+ buf.append(port).append(", alive=");
+ buf.append(memberAliveTime).append(",");
+ buf.append("id=").append(bToS(this.uniqueId)).append(", ");
+ buf.append("payload=").append(bToS(this.payload,8)).append(", ");
+ buf.append("command=").append(bToS(this.command,8)).append(", ");
+ buf.append("domain=").append(bToS(this.domain,8)).append(", ");
+ buf.append("]");
+ return buf.toString();
+ }
+ public static String bToS(byte[] data) {
+ return bToS(data,data.length);
+ }
+ public static String bToS(byte[] data, int max) {
+ StringBuffer buf = new StringBuffer(4*16);
+ buf.append("{");
+ for (int i=0; data!=null && i McastServiceImpl.MAX_PACKET_SIZE ) {
+ this.payload = oldpayload;
+ throw new IllegalArgumentException("Payload is to large for tribes to handle.");
+ }
+
+ }
+
+ public void setCommand(byte[] command) {
+ this.command = command!=null?command:new byte[0];
+ getData(true,true);
+ }
+
+ public void setDomain(byte[] domain) {
+ this.domain = domain!=null?domain:new byte[0];
+ getData(true,true);
+ }
+
+ public void setSecurePort(int securePort) {
+ this.securePort = securePort;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ int length = in.readInt();
+ byte[] message = new byte[length];
+ in.read(message);
+ getMember(message,this);
+
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ byte[] data = this.getData();
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+}
diff --git a/java/org/apache/catalina/tribes/transport/ReceiverBase.java b/java/org/apache/catalina/tribes/transport/ReceiverBase.java
index f09b956e9..091bb9320 100644
--- a/java/org/apache/catalina/tribes/transport/ReceiverBase.java
+++ b/java/org/apache/catalina/tribes/transport/ReceiverBase.java
@@ -49,6 +49,7 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
private String host = "auto";
private InetAddress bind;
private int port = 4000;
+ private int securePort = -1;
private int rxBufSize = 43800;
private int txBufSize = 25188;
private boolean listen = false;
@@ -299,6 +300,10 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
return useBufferPool;
}
+ public int getSecurePort() {
+ return securePort;
+ }
+
public void setTcpSelectorTimeout(long selTimeout) {
tcpSelectorTimeout = selTimeout;
}
@@ -377,6 +382,10 @@ public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, T
this.useBufferPool = useBufferPool;
}
+ public void setSecurePort(int securePort) {
+ this.securePort = securePort;
+ }
+
public void heartbeat() {
//empty operation
}
diff --git a/test/org/apache/catalina/tribes/test/TestNioSender.java b/test/org/apache/catalina/tribes/test/TestNioSender.java
new file mode 100644
index 000000000..9cd0ed0d8
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/TestNioSender.java
@@ -0,0 +1,106 @@
+package org.apache.catalina.tribes.test;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import java.nio.channels.Selector;
+import org.apache.catalina.tribes.transport.nio.NioSender;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
+
+/**
+ * Title:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestNioSender {
+ private Selector selector = null;
+ private int counter = 0;
+ MemberImpl mbr;
+ private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
+ public TestNioSender() {
+
+ }
+
+ public synchronized int inc() {
+ return ++counter;
+ }
+
+ public synchronized ChannelData getMessage(Member mbr) {
+ String msg = new String("Thread-"+Thread.currentThread().getName()+" Message:"+inc());
+ ChannelData data = new ChannelData(true);
+ data.setMessage(new XByteBuffer(msg.getBytes(),false));
+ data.setAddress(mbr);
+
+ return data;
+ }
+
+ public void init() throws Exception {
+ selector = Selector.open();
+ mbr = new MemberImpl("localhost",4444,0);
+ NioSender sender = new NioSender();
+ sender.setDestination(mbr);
+ sender.setDirectBuffer(true);
+ sender.setSelector(selector);
+ sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+ sender.connect();
+ }
+
+ public void run() {
+ while (true) {
+
+ int selectedKeys = 0;
+ try {
+ selectedKeys = selector.select(100);
+ // if ( selectedKeys == 0 ) {
+ // System.out.println("No registered interests. Sleeping for a second.");
+ // Thread.sleep(100);
+ } catch (Exception e) {
+ e.printStackTrace();
+ continue;
+ }
+
+ if (selectedKeys == 0) {
+ continue;
+ }
+
+ Iterator it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey sk = (SelectionKey) it.next();
+ it.remove();
+ try {
+ int readyOps = sk.readyOps();
+ sk.interestOps(sk.interestOps() & ~readyOps);
+ NioSender sender = (NioSender) sk.attachment();
+ if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) {
+ System.out.println("Message completed for handler:"+sender);
+ Thread.currentThread().sleep(2000);
+ sender.reset();
+ sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+ }
+
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ return;
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestNioSender sender = new TestNioSender();
+ sender.init();
+ sender.run();
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/TribesTestSuite.java b/test/org/apache/catalina/tribes/test/TribesTestSuite.java
new file mode 100644
index 000000000..9b846f052
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/TribesTestSuite.java
@@ -0,0 +1,24 @@
+package org.apache.catalina.tribes.test;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TribesTestSuite
+ extends TestCase {
+
+ public TribesTestSuite(String s) {
+ super(s);
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(org.apache.catalina.tribes.test.channel.ChannelStartStop.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestChannelOptionFlag.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class);
+ return suite;
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
new file mode 100644
index 000000000..9e65c7cd0
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ChannelStartStop extends TestCase {
+ GroupChannel channel = null;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel = new GroupChannel();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
+ }
+
+ public void testDoubleFullStart() throws Exception {
+ int count = 0;
+ try {
+ channel.start(channel.DEFAULT);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(channel.DEFAULT);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ assertEquals(count,2);
+ channel.stop(channel.DEFAULT);
+ }
+
+ public void testDoublePartialStart() throws Exception {
+ //try to double start the RX
+ int count = 0;
+ try {
+ channel.start(channel.SND_RX_SEQ);
+ channel.start(channel.MBR_RX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(channel.MBR_RX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(channel.DEFAULT);
+ //double the membership sender
+ count = 0;
+ try {
+ channel.start(channel.SND_RX_SEQ);
+ channel.start(channel.MBR_TX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(channel.MBR_TX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(channel.DEFAULT);
+
+ count = 0;
+ try {
+ channel.start(channel.SND_RX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(channel.SND_RX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(channel.DEFAULT);
+
+ count = 0;
+ try {
+ channel.start(channel.SND_TX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(channel.SND_TX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(channel.DEFAULT);
+ }
+
+ public void testFalseOption() throws Exception {
+ int flag = 0xFFF0;//should get ignored by the underlying components
+ int count = 0;
+ try {
+ channel.start(flag);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(flag);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,2);
+ channel.stop(channel.DEFAULT);
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java b/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
new file mode 100644
index 000000000..c6119e5c1
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
@@ -0,0 +1,76 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.*;
+import org.apache.catalina.tribes.group.*;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelException;
+
+/**
+ * Title:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestChannelOptionFlag extends TestCase {
+ GroupChannel channel = null;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel = new GroupChannel();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if ( channel != null ) try {channel.stop(channel.DEFAULT);}catch ( Exception ignore) {}
+ channel = null;
+ }
+
+
+ public void testOptionConflict() throws Exception {
+ boolean error = false;
+ channel.setOptionCheck(true);
+ ChannelInterceptor i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ try {
+ channel.start(channel.DEFAULT);
+ }catch ( ChannelException x ) {
+ if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+ }
+ assertEquals(true,error);
+ }
+
+ public void testOptionNoConflict() throws Exception {
+ boolean error = false;
+ channel.setOptionCheck(true);
+ ChannelInterceptor i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(64);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(256);
+ channel.addInterceptor(i);
+ try {
+ channel.start(channel.DEFAULT);
+ }catch ( ChannelException x ) {
+ if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+ }
+ assertEquals(false,error);
+ }
+
+ public static class TestInterceptor extends ChannelInterceptorBase {
+
+ }
+
+
+}
diff --git a/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
new file mode 100644
index 000000000..47b85c13a
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
@@ -0,0 +1,173 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+
+/**
+ * Title:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestDataIntegrity extends TestCase {
+ int msgCount = 500;
+ int threadCount = 20;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ int threadCounter = 0;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel1.addInterceptor(new MessageDispatch15Interceptor());
+ channel2 = new GroupChannel();
+ channel2.addInterceptor(new MessageDispatch15Interceptor());
+ listener1 = new Listener();
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testDataSendNO_ACK() throws Exception {
+ System.err.println("Starting NO_ACK");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; xTitle:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestRemoteProcessException extends TestCase {
+ int msgCount = 10000;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel2 = new GroupChannel();
+ listener1 = new Listener();
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testDataSendSYNCACK() throws Exception {
+ System.err.println("Starting SYNC_ACK");
+ int errC=0, nerrC=0;
+ for (int i=0; iTitle:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestTwoPhaseCommit extends TestCase {
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
new file mode 100644
index 000000000..d0fd33990
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
@@ -0,0 +1,112 @@
+package org.apache.catalina.tribes.test.io;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+import org.apache.catalina.tribes.ChannelListener;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.HashMap;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+
+public class TestSenderConnections extends TestCase {
+ private static int count = 2;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMsgListener[] listeners = new TestMsgListener[count];
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
+ channels[i].addChannelListener(listeners[i]);
+ channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
+
+ }
+ }
+
+ public void clear() {
+ }
+
+ public void sendMessages(long delay, long sleep) throws Exception {
+ Member local = channels[0].getLocalMember(true);
+ Member dest = channels[1].getLocalMember(true);
+ int n = 3;
+ System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]");
+ for (int i = 0; i < n; i++) {
+ channels[0].send(new Member[] {dest}, new TestMsg(), 0);
+ if ( delay > 0 ) Thread.sleep(delay);
+ }
+ System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections");
+ if ( sleep > 0 ) Thread.sleep(sleep);
+
+ }
+
+ public void testConnectionLinger() throws Exception {
+ sendMessages(0,15000);
+ }
+
+ public void testKeepAliveCount() throws Exception {
+ System.out.println("Setting keep alive count to 0");
+ for (int i = 0; i < channels.length; i++) {
+ ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+ t.getTransport().setKeepAliveCount(0);
+ }
+ sendMessages(1000,15000);
+ }
+
+ public void testKeepAliveTime() throws Exception {
+ System.out.println("Setting keep alive count to 1 second");
+ for (int i = 0; i < channels.length; i++) {
+ ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+ t.getTransport().setKeepAliveTime(1000);
+ }
+ sendMessages(2000,15000);
+ }
+
+ protected void tearDown() throws Exception {
+ for (int i = 0; i < channels.length; i++) {
+ channels[i].stop(Channel.DEFAULT);
+ }
+
+ }
+
+ public static class TestMsg implements Serializable {
+ static Random r = new Random(System.currentTimeMillis());
+ HashMap map = new HashMap();
+ public TestMsg() {
+ int size = Math.abs(r.nextInt() % 200);
+ for (int i=0; iTitle:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class MemberSerialization extends TestCase {
+ MemberImpl m1, m2, p1,p2;
+ byte[] payload = null;
+ protected void setUp() throws Exception {
+ super.setUp();
+ payload = new byte[333];
+ Arrays.fill(payload,(byte)1);
+ m1 = new MemberImpl("localhost",3333,1,payload);
+ m2 = new MemberImpl("localhost",3333,1);
+ payload = new byte[333];
+ Arrays.fill(payload,(byte)2);
+ p1 = new MemberImpl("127.0.0.1",3333,1,payload);
+ p2 = new MemberImpl("localhost",3331,1,payload);
+ m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+ m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+ m1.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+ m2.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+ }
+
+ public void testCompare() throws Exception {
+ assertTrue(m1.equals(m2));
+ assertTrue(m2.equals(m1));
+ assertTrue(p1.equals(m2));
+ assertFalse(m1.equals(p2));
+ assertFalse(m1.equals(p2));
+ assertFalse(m2.equals(p2));
+ assertFalse(p1.equals(p2));
+ }
+
+ public void testSerializationOne() throws Exception {
+ MemberImpl m = m1;
+ byte[] md1 = m.getData(false,true);
+ byte[] mda1 = m.getData(false,false);
+ assertTrue(Arrays.equals(md1,mda1));
+ assertTrue(md1==mda1);
+ mda1 = m.getData(true,true);
+ MemberImpl ma1 = MemberImpl.getMember(mda1);
+ assertTrue(compareMembers(m,ma1));
+ mda1 = p1.getData(false);
+ assertFalse(Arrays.equals(md1,mda1));
+ ma1 = MemberImpl.getMember(mda1);
+ assertTrue(compareMembers(p1,ma1));
+
+ md1 = m.getData(true,true);
+ Thread.sleep(50);
+ mda1 = m.getData(true,true);
+ MemberImpl a1 = MemberImpl.getMember(md1);
+ MemberImpl a2 = MemberImpl.getMember(mda1);
+ assertTrue(a1.equals(a2));
+ assertFalse(Arrays.equals(md1,mda1));
+
+
+ }
+
+ public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
+ boolean result = true;
+ result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
+ result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload());
+ result = result && Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId());
+ result = result && impl1.getPort() == impl2.getPort();
+ return result;
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java b/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java
new file mode 100644
index 000000000..ca3c75240
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java
@@ -0,0 +1,104 @@
+package org.apache.catalina.tribes.test.membership;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+public class TestDomainFilter
+ extends TestCase {
+ private static int count = 10;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMbrListener[] listeners = new TestMbrListener[count];
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+ channels[i].addMembershipListener(listeners[i]);
+ DomainFilterInterceptor filter = new DomainFilterInterceptor();
+ filter.setDomain(UUIDGenerator.randomUUID(false));
+ channels[i].addInterceptor(filter);
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < channels.length; i++) {
+ listeners[i].members.clear();
+ }
+ }
+
+ public void testMemberArrival() throws Exception {
+ //purpose of this test is to make sure that we have received all the members
+ //that we can expect before the start method returns
+ Thread[] threads = new Thread[channels.length];
+ for (int i=0; i=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size());
+ }
+
+ protected void tearDown() throws Exception {
+
+ for (int i = 0; i < channels.length; i++) {
+ try {
+ channels[i].stop(Channel.DEFAULT);
+ } catch (Exception ignore) {}
+ }
+ super.tearDown();
+ }
+
+ public class TestMbrListener
+ implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+
+ public ArrayList members = new ArrayList();
+ public void memberAdded(Member member) {
+ if (!members.contains(member)) {
+ members.add(member);
+ try {
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ public void memberDisappeared(Member member) {
+ if (members.contains(member)) {
+ members.remove(member);
+ try {
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java b/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
new file mode 100644
index 000000000..0c82c7106
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
@@ -0,0 +1,100 @@
+package org.apache.catalina.tribes.test.membership;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+
+public class TestMemberArrival
+ extends TestCase {
+ private static int count = 10;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMbrListener[] listeners = new TestMbrListener[count];
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+ channels[i].addMembershipListener(listeners[i]);
+
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < channels.length; i++) {
+ listeners[i].members.clear();
+ }
+ }
+
+ public void testMemberArrival() throws Exception {
+ //purpose of this test is to make sure that we have received all the members
+ //that we can expect before the start method returns
+ Thread[] threads = new Thread[channels.length];
+ for (int i=0; i=0; i-- ) assertEquals("Checking member arrival length",channels.length-1,listeners[i].members.size());
+ }
+
+ protected void tearDown() throws Exception {
+
+ for (int i = 0; i < channels.length; i++) {
+ try {
+ channels[i].stop(Channel.DEFAULT);
+ } catch (Exception ignore) {}
+ }
+ super.tearDown();
+ }
+
+ public class TestMbrListener
+ implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+
+ public ArrayList members = new ArrayList();
+ public void memberAdded(Member member) {
+ if (!members.contains(member)) {
+ members.add(member);
+ try {
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ public void memberDisappeared(Member member) {
+ if (members.contains(member)) {
+ members.remove(member);
+ try {
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java b/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
new file mode 100644
index 000000000..458558faa
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
@@ -0,0 +1,151 @@
+package org.apache.catalina.tribes.test.membership;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import junit.framework.TestCase;
+
+/**
+ * Title:
+ *
+ * Description:
+ *
+ * Copyright: Copyright (c) 2005
+ *
+ * Company:
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestTcpFailureDetector extends TestCase {
+ private TcpFailureDetector tcpFailureDetector1 = null;
+ private TcpFailureDetector tcpFailureDetector2 = null;
+ private ManagedChannel channel1 = null;
+ private ManagedChannel channel2 = null;
+ private TestMbrListener mbrlist1 = null;
+ private TestMbrListener mbrlist2 = null;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel2 = new GroupChannel();
+ channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
+ channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
+ mbrlist1 = new TestMbrListener("Channel-1");
+ mbrlist2 = new TestMbrListener("Channel-2");
+ tcpFailureDetector1 = new TcpFailureDetector();
+ tcpFailureDetector2 = new TcpFailureDetector();
+ channel1.addInterceptor(tcpFailureDetector1);
+ channel2.addInterceptor(tcpFailureDetector2);
+ channel1.addMembershipListener(mbrlist1);
+ channel2.addMembershipListener(mbrlist2);
+ }
+
+ public void clear() {
+ mbrlist1.members.clear();
+ mbrlist2.members.clear();
+ }
+
+ public void testTcpSendFailureMemberDrop() throws Exception {
+ System.out.println("testTcpSendFailureMemberDrop()");
+ clear();
+ channel1.start(channel1.DEFAULT);
+ channel2.start(channel2.DEFAULT);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel2.stop(channel2.SND_RX_SEQ);
+ ByteMessage msg = new ByteMessage(new byte[1024]);
+ try {
+ channel1.send(channel1.getMembers(), msg, 0);
+ assertEquals("Message send should have failed.",true,false);
+ } catch ( ChannelException x ) {
+
+ }
+ assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+ public void testTcpFailureMemberAdd() throws Exception {
+ System.out.println("testTcpFailureMemberAdd()");
+ clear();
+ channel1.start(channel1.DEFAULT);
+ channel2.start(channel2.SND_RX_SEQ);
+ channel2.start(channel2.SND_TX_SEQ);
+ channel2.start(channel2.MBR_RX_SEQ);
+ channel2.stop(channel2.SND_RX_SEQ);
+ channel2.start(channel2.MBR_TX_SEQ);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+ public void testTcpMcastFail() throws Exception {
+ System.out.println("testTcpMcastFail()");
+ clear();
+ channel1.start(channel1.DEFAULT);
+ channel2.start(channel2.DEFAULT);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel2.stop(channel2.MBR_TX_SEQ);
+ ByteMessage msg = new ByteMessage(new byte[1024]);
+ try {
+ Thread.sleep(5000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel1.send(channel1.getMembers(), msg, 0);
+ } catch ( ChannelException x ) {
+ assertEquals("Message send should have succeeded.",true,false);
+ }
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+
+ protected void tearDown() throws Exception {
+ tcpFailureDetector1 = null;
+ tcpFailureDetector2 = null;
+ try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){}
+ channel1 = null;
+ try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){}
+ channel2 = null;
+ super.tearDown();
+ }
+
+ public class TestMbrListener implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+ public ArrayList members = new ArrayList();
+ public void memberAdded(Member member) {
+ if ( !members.contains(member) ) {
+ members.add(member);
+ try{
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]");
+ }catch ( Exception x ) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ public void memberDisappeared(Member member) {
+ if ( members.contains(member) ) {
+ members.remove(member);
+ try{
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]");
+ }catch ( Exception x ) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java
new file mode 100644
index 000000000..eb4f9cd86
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java
@@ -0,0 +1,76 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.text.DecimalFormat;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MessageListener;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.transport.nio.NioReceiver;
+
+public class SocketNioReceive {
+ static int count = 0;
+ static int accept = 0;
+ static long start = 0;
+ static double mb = 0;
+ static int len = 0;
+ static DecimalFormat df = new DecimalFormat("##.00");
+ static double seconds = 0;
+
+ protected static Object mutex = new Object();
+ public static void main(String[] args) throws Exception {
+ Member mbr = new MemberImpl("localhost", 9999, 0);
+ ChannelData data = new ChannelData();
+ data.setAddress(mbr);
+ byte[] buf = new byte[8192 * 4];
+ data.setMessage(new XByteBuffer(buf, false));
+ buf = XByteBuffer.createDataPackage(data);
+ len = buf.length;
+ NioReceiver receiver = new NioReceiver();
+ receiver.setPort(9999);
+ receiver.setHost("localhost");
+ MyList list = new MyList();
+ receiver.setMessageListener(list);
+ receiver.start();
+ System.out.println("Listening on 9999");
+ while (true) {
+ try {
+ synchronized (mutex) {
+ mutex.wait(5000);
+ if ( start != 0 ) {
+ System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, messages "+count+" accepts "+accept+", total "+mb+" MB.");
+ }
+ }
+ }catch (Throwable x) {
+ x.printStackTrace();
+ }
+ }
+ }
+
+ public static class MyList implements MessageListener {
+ boolean first = true;
+
+
+ public void messageReceived(ChannelMessage msg) {
+ if (first) {
+ first = false;
+ start = System.currentTimeMillis();
+ }
+ mb += ( (double) len) / 1024 / 1024;
+ synchronized (this) {count++;}
+ if ( ( (count) % 10000) == 0) {
+ long time = System.currentTimeMillis();
+ seconds = ( (double) (time - start)) / 1000;
+ System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, messages "+count+", total "+mb+" MB.");
+ }
+ }
+
+ public boolean accept(ChannelMessage msg) {
+ synchronized (this) {accept++;}
+ return true;
+ }
+
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java b/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java
new file mode 100644
index 000000000..d5ec12197
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java
@@ -0,0 +1,91 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.io.OutputStream;
+import java.net.Socket;
+import java.text.DecimalFormat;
+import org.apache.catalina.tribes.transport.nio.NioSender;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import java.nio.channels.Selector;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.io.ChannelData;
+import java.math.BigDecimal;
+
+public class SocketNioSend {
+
+ public static void main(String[] args) throws Exception {
+ Selector selector = Selector.open();
+ Member mbr = new MemberImpl("localhost", 9999, 0);
+ ChannelData data = new ChannelData();
+ data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
+ data.setAddress(mbr);
+ byte[] buf = new byte[8192 * 4];
+ data.setMessage(new XByteBuffer(buf,false));
+ buf = XByteBuffer.createDataPackage(data);
+ int len = buf.length;
+ BigDecimal total = new BigDecimal((double)0);
+ BigDecimal bytes = new BigDecimal((double)len);
+ NioSender sender = new NioSender();
+ sender.setDestination(mbr);
+ sender.setDirectBuffer(true);
+ sender.setSelector(selector);
+ sender.setTxBufSize(1024*1024);
+ sender.connect();
+ sender.setMessage(buf);
+ System.out.println("Writing to 9999");
+ long start = 0;
+ double mb = 0;
+ boolean first = true;
+ int count = 0;
+ DecimalFormat df = new DecimalFormat("##.00");
+ while (count<100000) {
+ if (first) {
+ first = false;
+ start = System.currentTimeMillis();
+ }
+ sender.setMessage(buf);
+ int selectedKeys = 0;
+ try {
+ selectedKeys = selector.select(0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ continue;
+ }
+
+ if (selectedKeys == 0) {
+ continue;
+ }
+
+ Iterator it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey sk = (SelectionKey) it.next();
+ it.remove();
+ try {
+ int readyOps = sk.readyOps();
+ sk.interestOps(sk.interestOps() & ~readyOps);
+ if (sender.process(sk, false)) {
+ total = total.add(bytes);
+ sender.reset();
+ sender.setMessage(buf);
+ mb += ( (double) len) / 1024 / 1024;
+ if ( ( (++count) % 10000) == 0) {
+ long time = System.currentTimeMillis();
+ double seconds = ( (double) (time - start)) / 1000;
+ System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes.");
+ }
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ return;
+ }
+ }
+ selector.selectedKeys().clear();
+ }
+ System.out.println("Complete, sleeping 15 seconds");
+ Thread.sleep(15000);
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java b/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java
new file mode 100644
index 000000000..f1415a418
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java
@@ -0,0 +1,90 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.io.OutputStream;
+import java.net.Socket;
+import java.text.DecimalFormat;
+import org.apache.catalina.tribes.transport.nio.NioSender;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import java.nio.channels.Selector;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.io.ChannelData;
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+public class SocketNioValidateSend {
+
+ public static void main(String[] args) throws Exception {
+ Selector selector = Selector.open();
+ Member mbr = new MemberImpl("localhost", 9999, 0);
+ byte seq = 0;
+ byte[] buf = new byte[50000];
+ Arrays.fill(buf,seq);
+ int len = buf.length;
+ BigDecimal total = new BigDecimal((double)0);
+ BigDecimal bytes = new BigDecimal((double)len);
+ NioSender sender = new NioSender();
+ sender.setDestination(mbr);
+ sender.setDirectBuffer(true);
+ sender.setSelector(selector);
+ sender.connect();
+ sender.setMessage(buf);
+ System.out.println("Writing to 9999");
+ long start = 0;
+ double mb = 0;
+ boolean first = true;
+ int count = 0;
+
+ DecimalFormat df = new DecimalFormat("##.00");
+ while (count<100000) {
+ if (first) {
+ first = false;
+ start = System.currentTimeMillis();
+ }
+ sender.setMessage(buf);
+ int selectedKeys = 0;
+ try {
+ selectedKeys = selector.select(0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ continue;
+ }
+
+ if (selectedKeys == 0) {
+ continue;
+ }
+
+ Iterator it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey sk = (SelectionKey) it.next();
+ it.remove();
+ try {
+ int readyOps = sk.readyOps();
+ sk.interestOps(sk.interestOps() & ~readyOps);
+ if (sender.process(sk, false)) {
+ total = total.add(bytes);
+ sender.reset();
+ seq++;
+ Arrays.fill(buf,seq);
+ sender.setMessage(buf);
+ mb += ( (double) len) / 1024 / 1024;
+ if ( ( (++count) % 10000) == 0) {
+ long time = System.currentTimeMillis();
+ double seconds = ( (double) (time - start)) / 1000;
+ System.out.println("Throughput " + df.format(mb / seconds) + " MB/seconds, total "+mb+" MB, total "+total+" bytes.");
+ }
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ return;
+ }
+ }
+ }
+ System.out.println("Complete, sleeping 15 seconds");
+ Thread.sleep(15000);
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketReceive.java
new file mode 100644
index 000000000..eabe10b4f
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketReceive.java
@@ -0,0 +1,62 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.io.InputStream;
+import java.text.DecimalFormat;
+import java.math.BigDecimal;
+
+public class SocketReceive {
+ static long start = 0;
+ static double mb = 0;
+ static byte[] buf = new byte[8192 * 4];
+ static boolean first = true;
+ static int count = 0;
+ static DecimalFormat df = new DecimalFormat("##.00");
+ static BigDecimal total = new BigDecimal(0);
+ static BigDecimal bytes = new BigDecimal(32871);
+
+
+ public static void main(String[] args) throws Exception {
+
+ ServerSocket srvSocket = new ServerSocket(9999);
+ System.out.println("Listening on 9999");
+ Socket socket = srvSocket.accept();
+ socket.setReceiveBufferSize(43800);
+ InputStream in = socket.getInputStream();
+ Thread t = new Thread() {
+ public void run() {
+ while ( true ) {
+ try {
+ Thread.sleep(1000);
+ printStats(start, mb, count, df, total);
+ }catch ( Exception x ) {}
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+
+ while ( true ) {
+ if ( first ) { first = false; start = System.currentTimeMillis();}
+ int len = in.read(buf);
+ if ( len == -1 ) {
+ printStats(start, mb, count, df, total);
+ System.exit(1);
+ }
+ if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+ total = total.add(bytes);
+ mb += ( (double) len) / 1024 / 1024;
+ if ( ((++count) % 10000) == 0 ) {
+ printStats(start, mb, count, df, total);
+ }
+ }
+
+ }
+
+ private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) {
+ long time = System.currentTimeMillis();
+ double seconds = ((double)(time-start))/1000;
+ System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes.");
+ }
+}
\ No newline at end of file
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketSend.java b/test/org/apache/catalina/tribes/test/transport/SocketSend.java
new file mode 100644
index 000000000..fff73c116
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketSend.java
@@ -0,0 +1,53 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.io.OutputStream;
+import java.net.Socket;
+import java.text.DecimalFormat;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.Channel;
+import java.math.BigDecimal;
+
+public class SocketSend {
+
+ public static void main(String[] args) throws Exception {
+
+
+ Member mbr = new MemberImpl("localhost", 9999, 0);
+ ChannelData data = new ChannelData();
+ data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
+ data.setAddress(mbr);
+ byte[] buf = new byte[8192 * 4];
+ data.setMessage(new XByteBuffer(buf,false));
+ buf = XByteBuffer.createDataPackage(data);
+ int len = buf.length;
+ System.out.println("Message size:"+len+" bytes");
+ BigDecimal total = new BigDecimal((double)0);
+ BigDecimal bytes = new BigDecimal((double)len);
+ Socket socket = new Socket("localhost",9999);
+ System.out.println("Writing to 9999");
+ OutputStream out = socket.getOutputStream();
+ long start = 0;
+ double mb = 0;
+ boolean first = true;
+ int count = 0;
+ DecimalFormat df = new DecimalFormat("##.00");
+ while ( count<100000 ) {
+ if ( first ) { first = false; start = System.currentTimeMillis();}
+ out.write(buf,0,buf.length);
+ mb += ( (double) buf.length) / 1024 / 1024;
+ total = total.add(bytes);
+ if ( ((++count) % 10000) == 0 ) {
+ long time = System.currentTimeMillis();
+ double seconds = ((double)(time-start))/1000;
+ System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes.");
+ }
+ }
+ out.flush();
+ System.out.println("Complete, sleeping 5 seconds");
+ Thread.sleep(5000);
+
+ }
+}
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java
new file mode 100644
index 000000000..1dad58b64
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java
@@ -0,0 +1,71 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.io.InputStream;
+import java.text.DecimalFormat;
+import java.math.BigDecimal;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+public class SocketTribesReceive {
+ static long start = 0;
+ static double mb = 0;
+ //static byte[] buf = new byte[32871];
+ static byte[] buf = new byte[32871];
+ static boolean first = true;
+ static int count = 0;
+ static DecimalFormat df = new DecimalFormat("##.00");
+ static BigDecimal total = new BigDecimal((double)0);
+ static BigDecimal bytes = new BigDecimal((double)32871);
+
+
+ public static void main(String[] args) throws Exception {
+ int size = 43800;
+ if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception x){}
+ XByteBuffer xbuf = new XByteBuffer(43800,true);
+ ServerSocket srvSocket = new ServerSocket(9999);
+ System.out.println("Listening on 9999");
+ Socket socket = srvSocket.accept();
+ socket.setReceiveBufferSize(size);
+ InputStream in = socket.getInputStream();
+ Thread t = new Thread() {
+ public void run() {
+ while ( true ) {
+ try {
+ Thread.sleep(1000);
+ printStats(start, mb, count, df, total);
+ }catch ( Exception x ) {}
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+
+ while ( true ) {
+ if ( first ) { first = false; start = System.currentTimeMillis();}
+ int len = in.read(buf);
+ if ( len == -1 ) {
+ printStats(start, mb, count, df, total);
+ System.exit(1);
+ }
+ xbuf.append(buf,0,len);
+ if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+ total = total.add(bytes);
+ while ( xbuf.countPackages(true) > 0 ) {
+ xbuf.extractPackage(true);
+ count++;
+ }
+ mb += ( (double) len) / 1024 / 1024;
+ if ( ((count) % 10000) == 0 ) {
+ printStats(start, mb, count, df, total);
+ }
+ }
+
+ }
+
+ private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) {
+ long time = System.currentTimeMillis();
+ double seconds = ((double)(time-start))/1000;
+ System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes.");
+ }
+}
\ No newline at end of file
diff --git a/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java b/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java
new file mode 100644
index 000000000..d4e56d656
--- /dev/null
+++ b/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java
@@ -0,0 +1,91 @@
+package org.apache.catalina.tribes.test.transport;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.io.InputStream;
+import java.text.DecimalFormat;
+import java.math.BigDecimal;
+
+public class SocketValidateReceive {
+ static long start = 0;
+ static double mb = 0;
+ static byte[] buf = new byte[8192 * 4];
+ static boolean first = true;
+ static int count = 0;
+ static DecimalFormat df = new DecimalFormat("##.00");
+ static BigDecimal total = new BigDecimal(0);
+ static BigDecimal bytes = new BigDecimal(32871);
+
+
+ public static void main(String[] args) throws Exception {
+ int size = 43800;
+ if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception x){}
+
+ ServerSocket srvSocket = new ServerSocket(9999);
+ System.out.println("Listening on 9999");
+ Socket socket = srvSocket.accept();
+ socket.setReceiveBufferSize(size);
+ InputStream in = socket.getInputStream();
+ MyDataReader reader = new MyDataReader(50000);
+ Thread t = new Thread() {
+ public void run() {
+ while ( true ) {
+ try {
+ Thread.sleep(1000);
+ printStats(start, mb, count, df, total);
+ }catch ( Exception x ) {}
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+
+ while ( true ) {
+ if ( first ) { first = false; start = System.currentTimeMillis();}
+ int len = in.read(buf);
+ if ( len == -1 ) {
+ printStats(start, mb, count, df, total);
+ System.exit(1);
+ }
+ count += reader.append(buf,0,len);
+
+ if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+ total = total.add(bytes);
+ mb += ( (double) len) / 1024 / 1024;
+ if ( ((count) % 10000) == 0 ) {
+ printStats(start, mb, count, df, total);
+ }
+ }
+
+ }
+
+ private static void printStats(long start, double mb, int count, DecimalFormat df, BigDecimal total) {
+ long time = System.currentTimeMillis();
+ double seconds = ((double)(time-start))/1000;
+ System.out.println("Throughput "+df.format(mb/seconds)+" MB/seconds messages "+count+", total "+mb+" MB, total "+total+" bytes.");
+ }
+
+ public static class MyDataReader {
+ byte[] data = new byte[43800];
+ int length = 10;
+ int cur = 0;
+ byte seq = 0;
+ public MyDataReader(int len) {
+ length = len;
+ }
+
+ public int append(byte[] b, int off, int len) throws Exception {
+ int packages = 0;
+ for ( int i=off; i