--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+
+public class TestGroupChannelMemberArrival
+ extends TestCase {
+ private static int count = 10;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMbrListener[] listeners = new TestMbrListener[count];
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+ channels[i].addMembershipListener(listeners[i]);
+
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < channels.length; i++) {
+ listeners[i].members.clear();
+ }
+ }
+
+ public void testMemberArrival() throws Exception {
+ //purpose of this test is to make sure that we have received all the members
+ //that we can expect before the start method returns
+ Thread[] threads = new Thread[channels.length];
+ for (int i=0; i<channels.length; i++ ) {
+ final Channel channel = channels[i];
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ channel.start(Channel.DEFAULT);
+ }catch ( Exception x ) {
+ throw new RuntimeException(x);
+ }
+ }
+ };
+ threads[i] = t;
+ }
+ for (int i=0; i<threads.length; i++ ) threads[i].start();
+ for (int i=0; i<threads.length; i++ ) threads[i].join();
+ Thread.sleep(2000);
+ System.out.println("All channels started.");
+ for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",channels.length-1,listeners[i].members.size());
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ for (int i = 0; i < channels.length; i++) {
+ try {
+ channels[i].stop(Channel.DEFAULT);
+ } catch (Exception ignore) {
+ // Ignore
+ }
+ }
+ super.tearDown();
+ }
+
+ public static class TestMbrListener
+ implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+
+ public ArrayList<Member> members = new ArrayList<Member>();
+ @Override
+ public void memberAdded(Member member) {
+ if (!members.contains(member)) {
+ members.add(member);
+ try {
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ @Override
+ public void memberDisappeared(Member member) {
+ if (members.contains(member)) {
+ members.remove(member);
+ try {
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestGroupChannelOptionFlag extends TestCase {
+ GroupChannel channel = null;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel = new GroupChannel();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if ( channel != null ) try {channel.stop(Channel.DEFAULT);}catch ( Exception ignore) { /* Ignore */ }
+ channel = null;
+ }
+
+
+ public void testOptionConflict() throws Exception {
+ boolean error = false;
+ channel.setOptionCheck(true);
+ ChannelInterceptor i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ try {
+ channel.start(Channel.DEFAULT);
+ }catch ( ChannelException x ) {
+ if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+ }
+ assertEquals(true,error);
+ }
+
+ public void testOptionNoConflict() throws Exception {
+ boolean error = false;
+ channel.setOptionCheck(true);
+ ChannelInterceptor i = new TestInterceptor();
+ i.setOptionFlag(128);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(64);
+ channel.addInterceptor(i);
+ i = new TestInterceptor();
+ i.setOptionFlag(256);
+ channel.addInterceptor(i);
+ try {
+ channel.start(Channel.DEFAULT);
+ }catch ( ChannelException x ) {
+ if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+ }
+ assertEquals(false,error);
+ }
+
+ public static class TestInterceptor extends ChannelInterceptorBase {
+ // Just use base class
+ }
+
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+
+public class TestGroupChannelSenderConnections extends TestCase {
+ private static int count = 2;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMsgListener[] listeners = new TestMsgListener[count];
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
+ channels[i].addChannelListener(listeners[i]);
+ channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
+
+ }
+ }
+
+ public void clear() {
+ // NOOP
+ }
+
+ public void sendMessages(long delay, long sleep) throws Exception {
+ Member local = channels[0].getLocalMember(true);
+ Member dest = channels[1].getLocalMember(true);
+ int n = 3;
+ System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]");
+ for (int i = 0; i < n; i++) {
+ channels[0].send(new Member[] {dest}, new TestMsg(), 0);
+ if ( delay > 0 ) Thread.sleep(delay);
+ }
+ System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections");
+ if ( sleep > 0 ) Thread.sleep(sleep);
+
+ }
+
+ public void testConnectionLinger() throws Exception {
+ sendMessages(0,15000);
+ }
+
+ public void testKeepAliveCount() throws Exception {
+ System.out.println("Setting keep alive count to 0");
+ for (int i = 0; i < channels.length; i++) {
+ ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+ t.getTransport().setKeepAliveCount(0);
+ }
+ sendMessages(1000,15000);
+ }
+
+ public void testKeepAliveTime() throws Exception {
+ System.out.println("Setting keep alive count to 1 second");
+ for (int i = 0; i < channels.length; i++) {
+ ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+ t.getTransport().setKeepAliveTime(1000);
+ }
+ sendMessages(2000,15000);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ for (int i = 0; i < channels.length; i++) {
+ channels[i].stop(Channel.DEFAULT);
+ }
+
+ }
+
+ public static class TestMsg implements Serializable {
+ private static final long serialVersionUID = 1L;
+ static Random r = new Random();
+ HashMap<Integer, ArrayList<Object>> map =
+ new HashMap<Integer, ArrayList<Object>>();
+ public TestMsg() {
+ int size = Math.abs(r.nextInt() % 200);
+ for (int i=0; i<size; i++ ) {
+ int length = Math.abs(r.nextInt() %65000);
+ ArrayList<Object> list = new ArrayList<Object>(length);
+ map.put(Integer.valueOf(i),list);
+ }
+ }
+ }
+
+ public static class TestMsgListener implements ChannelListener {
+ public String name = null;
+ public TestMsgListener(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void messageReceived(Serializable msg, Member sender) {
+ System.out.println("["+name+"] Received message:"+msg+" from " + sender.getName());
+ }
+
+
+ @Override
+ public boolean accept(Serializable msg, Member sender) {
+ return true;
+ }
+
+
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class TestGroupChannelStartStop extends TestCase {
+ GroupChannel channel = null;
+ int udpPort = 45543;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel = new GroupChannel();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ try {channel.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
+ }
+
+ public void testDoubleFullStart() throws Exception {
+ int count = 0;
+ try {
+ channel.start(Channel.DEFAULT);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(Channel.DEFAULT);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ assertEquals(count,2);
+ channel.stop(Channel.DEFAULT);
+ }
+
+ public void testScrap() throws Exception {
+ System.out.println(channel.getChannelReceiver().getClass());
+ ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
+ }
+
+
+ public void testDoublePartialStart() throws Exception {
+ //try to double start the RX
+ int count = 0;
+ try {
+ channel.start(Channel.SND_RX_SEQ);
+ channel.start(Channel.MBR_RX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(Channel.MBR_RX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(Channel.DEFAULT);
+ //double the membership sender
+ count = 0;
+ try {
+ channel.start(Channel.SND_RX_SEQ);
+ channel.start(Channel.MBR_TX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(Channel.MBR_TX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(Channel.DEFAULT);
+
+ count = 0;
+ try {
+ channel.start(Channel.SND_RX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(Channel.SND_RX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(Channel.DEFAULT);
+
+ count = 0;
+ try {
+ channel.start(Channel.SND_TX_SEQ);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(Channel.SND_TX_SEQ);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,1);
+ channel.stop(Channel.DEFAULT);
+ }
+
+ public void testFalseOption() throws Exception {
+ int flag = 0xFFF0;//should get ignored by the underlying components
+ int count = 0;
+ try {
+ channel.start(flag);
+ count++;
+ } catch ( Exception x){x.printStackTrace();}
+ try {
+ channel.start(flag);
+ count++;
+ } catch ( Exception x){/*expected*/}
+ assertEquals(count,2);
+ channel.stop(Channel.DEFAULT);
+ }
+
+ public void testUdpReceiverStart() throws Exception {
+ ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver();
+ rb.setUdpPort(udpPort);
+ channel.start(Channel.DEFAULT);
+ Thread.sleep(1000);
+ channel.stop(Channel.DEFAULT);
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+public class TestDomainFilterInterceptor
+ extends TestCase {
+ private static int count = 10;
+ private ManagedChannel[] channels = new ManagedChannel[count];
+ private TestMbrListener[] listeners = new TestMbrListener[count];
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ for (int i = 0; i < channels.length; i++) {
+ channels[i] = new GroupChannel();
+ channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+ listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+ channels[i].addMembershipListener(listeners[i]);
+ DomainFilterInterceptor filter = new DomainFilterInterceptor();
+ filter.setDomain(UUIDGenerator.randomUUID(false));
+ channels[i].addInterceptor(filter);
+ }
+ }
+
+ public void clear() {
+ for (int i = 0; i < channels.length; i++) {
+ listeners[i].members.clear();
+ }
+ }
+
+ public void testMemberArrival() throws Exception {
+ //purpose of this test is to make sure that we have received all the members
+ //that we can expect before the start method returns
+ Thread[] threads = new Thread[channels.length];
+ for (int i=0; i<channels.length; i++ ) {
+ final Channel channel = channels[i];
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ channel.start(Channel.DEFAULT);
+ }catch ( Exception x ) {
+ throw new RuntimeException(x);
+ }
+ }
+ };
+ threads[i] = t;
+ }
+ for (int i=0; i<threads.length; i++ ) threads[i].start();
+ for (int i=0; i<threads.length; i++ ) threads[i].join();
+ System.out.println("All channels started.");
+ for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size());
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ for (int i = 0; i < channels.length; i++) {
+ try {
+ channels[i].stop(Channel.DEFAULT);
+ } catch (Exception ignore) {
+ // Ignore
+ }
+ }
+ super.tearDown();
+ }
+
+ public static class TestMbrListener
+ implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+
+ public ArrayList<Member> members = new ArrayList<Member>();
+ @Override
+ public void memberAdded(Member member) {
+ if (!members.contains(member)) {
+ members.add(member);
+ try {
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ @Override
+ public void memberDisappeared(Member member) {
+ if (members.contains(member)) {
+ members.remove(member);
+ try {
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
+ } catch (Exception x) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+
+public class TestNonBlockingCoordinator extends TestCase {
+
+ GroupChannel[] channels = null;
+ NonBlockingCoordinator[] coordinators = null;
+ int channelCount = 10;
+ Thread[] threads = null;
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup");
+ super.setUp();
+ channels = new GroupChannel[channelCount];
+ coordinators = new NonBlockingCoordinator[channelCount];
+ threads = new Thread[channelCount];
+ for ( int i=0; i<channelCount; i++ ) {
+ channels[i] = new GroupChannel();
+ coordinators[i] = new NonBlockingCoordinator();
+ channels[i].addInterceptor(coordinators[i]);
+ channels[i].addInterceptor(new TcpFailureDetector());
+ final int j = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ channels[j].start(Channel.DEFAULT);
+ Thread.sleep(50);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+ };
+ }
+ for ( int i=0; i<channelCount; i++ ) threads[i].start();
+ for ( int i=0; i<channelCount; i++ ) threads[i].join();
+ Thread.sleep(1000);
+ }
+
+ public void testCoord1() throws Exception {
+ for (int i=1; i<channelCount; i++ )
+ assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length);
+ Member member = coordinators[0].getCoordinator();
+ int cnt = 0;
+ while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){ /* Ignore */ }
+ for (int i=0; i<channelCount; i++ ) assertEquals(member,coordinators[i].getCoordinator());
+ System.out.println("Coordinator[1] is:"+member);
+
+ }
+
+ public void testCoord2() throws Exception {
+ Member member = coordinators[1].getCoordinator();
+ System.out.println("Coordinator[2a] is:" + member);
+ int index = -1;
+ for ( int i=0; i<channelCount; i++ ) {
+ if ( channels[i].getLocalMember(false).equals(member) ) {
+ System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
+ channels[i].stop(Channel.DEFAULT);
+ index = i;
+ }
+ }
+ int dead = index;
+ Thread.sleep(1000);
+ if ( index == 0 ) index = 1; else index = 0;
+ System.out.println("Member count:"+channels[index].getMembers().length);
+ member = coordinators[index].getCoordinator();
+ for (int i = 1; i < channelCount; i++) if ( i != dead ) assertEquals(member, coordinators[i].getCoordinator());
+ System.out.println("Coordinator[2b] is:" + member);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("tearDown");
+ super.tearDown();
+ for ( int i=0; i<channelCount; i++ ) {
+ channels[i].stop(Channel.DEFAULT);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(TestNonBlockingCoordinator.class);
+ suite.run(new TestResult());
+ }
+
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
+public class TestOrderInterceptor extends TestCase {
+
+ GroupChannel[] channels = null;
+ OrderInterceptor[] orderitcs = null;
+ MangleOrderInterceptor[] mangleitcs = null;
+ TestListener[] test = null;
+ int channelCount = 2;
+ Thread[] threads = null;
+ @Override
+ protected void setUp() throws Exception {
+ System.out.println("Setup");
+ super.setUp();
+ channels = new GroupChannel[channelCount];
+ orderitcs = new OrderInterceptor[channelCount];
+ mangleitcs = new MangleOrderInterceptor[channelCount];
+ test = new TestListener[channelCount];
+ threads = new Thread[channelCount];
+ for ( int i=0; i<channelCount; i++ ) {
+ channels[i] = new GroupChannel();
+
+ orderitcs[i] = new OrderInterceptor();
+ mangleitcs[i] = new MangleOrderInterceptor();
+ orderitcs[i].setExpire(Long.MAX_VALUE);
+ channels[i].addInterceptor(orderitcs[i]);
+ channels[i].addInterceptor(mangleitcs[i]);
+ test[i] = new TestListener(i);
+ channels[i].addChannelListener(test[i]);
+ final int j = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ channels[j].start(Channel.DEFAULT);
+ Thread.sleep(50);
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+ };
+ }
+ for ( int i=0; i<channelCount; i++ ) threads[i].start();
+ for ( int i=0; i<channelCount; i++ ) threads[i].join();
+ Thread.sleep(1000);
+ }
+
+ public void testOrder1() throws Exception {
+ Member[] dest = channels[0].getMembers();
+ final AtomicInteger value = new AtomicInteger(0);
+ for ( int i=0; i<100; i++ ) {
+ channels[0].send(dest,Integer.valueOf(value.getAndAdd(1)),0);
+ }
+ Thread.sleep(5000);
+ for ( int i=0; i<test.length; i++ ) {
+ assertEquals(false,test[i].fail);
+ }
+ }
+
+ public void testOrder2() throws Exception {
+ final Member[] dest = channels[0].getMembers();
+ final AtomicInteger value = new AtomicInteger(0);
+ final Queue<Exception> exceptionQueue = new ConcurrentLinkedQueue<Exception>();
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < 100; i++) {
+ try {
+ synchronized (channels[0]) {
+ channels[0].send(dest, Integer.valueOf(value.getAndAdd(1)), 0);
+ }
+ }catch ( Exception x ) {
+ exceptionQueue.add(x);
+ }
+ }
+ }
+ };
+ Thread[] threads = new Thread[5];
+ for (int i=0;i<threads.length;i++) {
+ threads[i] = new Thread(run);
+ }
+ for (int i=0;i<threads.length;i++) {
+ threads[i].start();
+ }
+ for (int i=0;i<threads.length;i++) {
+ threads[i].join();
+ }
+ if (!exceptionQueue.isEmpty()) {
+ fail("Exception while sending in threads: "
+ + exceptionQueue.remove().toString());
+ }
+ Thread.sleep(5000);
+ for ( int i=0; i<test.length; i++ ) {
+ assertEquals(false,test[i].fail);
+ }
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ System.out.println("tearDown");
+ super.tearDown();
+ for ( int i=0; i<channelCount; i++ ) {
+ channels[i].stop(Channel.DEFAULT);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(TestOrderInterceptor.class);
+ suite.run(new TestResult());
+ }
+
+ public static class TestListener implements ChannelListener {
+ int id = -1;
+ public TestListener(int id) {
+ this.id = id;
+ }
+ int cnt = 0;
+ int total = 0;
+ volatile boolean fail = false;
+ @Override
+ public synchronized void messageReceived(Serializable msg, Member sender) {
+ total++;
+ Integer i = (Integer)msg;
+ if ( i.intValue() != cnt ) fail = true;
+ else cnt++;
+ System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);
+
+ }
+
+ @Override
+ public boolean accept(Serializable msg, Member sender) {
+ return (msg instanceof Integer);
+ }
+ }
+
+ public static class MangleOrderInterceptor extends ChannelInterceptorBase {
+ ChannelMessage hold = null;
+ Member[] dest = null;
+ @Override
+ public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ if ( hold == null ) {
+ //System.out.println("Skipping message:"+msg);
+ hold = (ChannelMessage)msg.deepclone();
+ dest = new Member[destination.length];
+ System.arraycopy(destination,0,dest,0,dest.length);
+ } else {
+ //System.out.println("Sending message:"+msg);
+ super.sendMessage(destination,msg,payload);
+ //System.out.println("Sending message:"+hold);
+ super.sendMessage(dest,hold,null);
+ hold = null;
+ dest = null;
+ }
+ }
+ }
+
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestTcpFailureDetector extends TestCase {
+ private TcpFailureDetector tcpFailureDetector1 = null;
+ private TcpFailureDetector tcpFailureDetector2 = null;
+ private ManagedChannel channel1 = null;
+ private ManagedChannel channel2 = null;
+ private TestMbrListener mbrlist1 = null;
+ private TestMbrListener mbrlist2 = null;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel2 = new GroupChannel();
+ channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
+ channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
+ mbrlist1 = new TestMbrListener("Channel-1");
+ mbrlist2 = new TestMbrListener("Channel-2");
+ tcpFailureDetector1 = new TcpFailureDetector();
+ tcpFailureDetector2 = new TcpFailureDetector();
+ channel1.addInterceptor(tcpFailureDetector1);
+ channel2.addInterceptor(tcpFailureDetector2);
+ channel1.addMembershipListener(mbrlist1);
+ channel2.addMembershipListener(mbrlist2);
+ }
+
+ public void clear() {
+ mbrlist1.members.clear();
+ mbrlist2.members.clear();
+ }
+
+ public void testTcpSendFailureMemberDrop() throws Exception {
+ System.out.println("testTcpSendFailureMemberDrop()");
+ clear();
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.DEFAULT);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel2.stop(Channel.SND_RX_SEQ);
+ ByteMessage msg = new ByteMessage(new byte[1024]);
+ try {
+ channel1.send(channel1.getMembers(), msg, 0);
+ assertEquals("Message send should have failed.",true,false);
+ } catch ( ChannelException x ) {
+ // Ignore
+ }
+ assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+ public void testTcpFailureMemberAdd() throws Exception {
+ System.out.println("testTcpFailureMemberAdd()");
+ clear();
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.SND_RX_SEQ);
+ channel2.start(Channel.SND_TX_SEQ);
+ channel2.start(Channel.MBR_RX_SEQ);
+ channel2.stop(Channel.SND_RX_SEQ);
+ channel2.start(Channel.MBR_TX_SEQ);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+ public void testTcpMcastFail() throws Exception {
+ System.out.println("testTcpMcastFail()");
+ clear();
+ channel1.start(Channel.DEFAULT);
+ channel2.start(Channel.DEFAULT);
+ //Thread.sleep(1000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel2.stop(Channel.MBR_TX_SEQ);
+ ByteMessage msg = new ByteMessage(new byte[1024]);
+ try {
+ Thread.sleep(5000);
+ assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
+ channel1.send(channel1.getMembers(), msg, 0);
+ } catch ( ChannelException x ) {
+ assertEquals("Message send should have succeeded.",true,false);
+ }
+ channel1.stop(Channel.DEFAULT);
+ channel2.stop(Channel.DEFAULT);
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ tcpFailureDetector1 = null;
+ tcpFailureDetector2 = null;
+ try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
+ channel1 = null;
+ try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
+ channel2 = null;
+ super.tearDown();
+ }
+
+ public static class TestMbrListener implements MembershipListener {
+ public String name = null;
+ public TestMbrListener(String name) {
+ this.name = name;
+ }
+ public ArrayList<Member> members = new ArrayList<Member>();
+ @Override
+ public void memberAdded(Member member) {
+ if ( !members.contains(member) ) {
+ members.add(member);
+ try{
+ System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]");
+ }catch ( Exception x ) {
+ System.out.println(name + ":member added[unknown]");
+ }
+ }
+ }
+
+ @Override
+ public void memberDisappeared(Member member) {
+ if ( members.contains(member) ) {
+ members.remove(member);
+ try{
+ System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]");
+ }catch ( Exception x ) {
+ System.out.println(name + ":member disappeared[unknown]");
+ }
+ }
+ }
+
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.io;
+
+import junit.framework.TestCase;
+
+public class TestXByteBuffer extends TestCase {
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ public void testEmptyArray() throws Exception {
+ // TODO
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws Exception {
+ //XByteBuffer.deserialize(new byte[0]);
+ XByteBuffer.deserialize(new byte[] {-84, -19, 0, 5, 115, 114, 0, 17, 106});
+ }
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.membership;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestMemberImplSerialization extends TestCase {
+ MemberImpl m1, m2, p1,p2;
+ byte[] payload = null;
+ int udpPort = 3445;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ payload = new byte[333];
+ Arrays.fill(payload,(byte)1);
+ m1 = new MemberImpl("localhost",3333,1,payload);
+ m2 = new MemberImpl("localhost",3333,1);
+ payload = new byte[333];
+ Arrays.fill(payload,(byte)2);
+ p1 = new MemberImpl("127.0.0.1",3333,1,payload);
+ p2 = new MemberImpl("localhost",3331,1,payload);
+ m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+ m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+ m1.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+ m2.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+ m1.setUdpPort(udpPort);
+ m2.setUdpPort(m1.getUdpPort());
+ }
+
+ public void testCompare() throws Exception {
+ assertTrue(m1.equals(m2));
+ assertTrue(m2.equals(m1));
+ assertTrue(p1.equals(m2));
+ assertFalse(m1.equals(p2));
+ assertFalse(m1.equals(p2));
+ assertFalse(m2.equals(p2));
+ assertFalse(p1.equals(p2));
+ }
+
+ public void testUdpPort() throws Exception {
+ byte[] md1 = m1.getData();
+ byte[] md2 = m2.getData();
+
+ MemberImpl a1 = MemberImpl.getMember(md1);
+ MemberImpl a2 = MemberImpl.getMember(md2);
+
+ assertEquals(true, a1.getUdpPort()==a2.getUdpPort());
+ assertEquals(true,a1.getUdpPort()==udpPort);
+ }
+
+ public void testSerializationOne() throws Exception {
+ MemberImpl m = m1;
+ byte[] md1 = m.getData(false,true);
+ byte[] mda1 = m.getData(false,false);
+ assertTrue(Arrays.equals(md1,mda1));
+ assertTrue(md1==mda1);
+ mda1 = m.getData(true,true);
+ MemberImpl ma1 = MemberImpl.getMember(mda1);
+ assertTrue(compareMembers(m,ma1));
+ mda1 = p1.getData(false);
+ assertFalse(Arrays.equals(md1,mda1));
+ ma1 = MemberImpl.getMember(mda1);
+ assertTrue(compareMembers(p1,ma1));
+
+ md1 = m.getData(true,true);
+ Thread.sleep(50);
+ mda1 = m.getData(true,true);
+ MemberImpl a1 = MemberImpl.getMember(md1);
+ MemberImpl a2 = MemberImpl.getMember(mda1);
+ assertTrue(a1.equals(a2));
+ assertFalse(Arrays.equals(md1,mda1));
+
+
+ }
+
+ public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
+ boolean result = true;
+ result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
+ result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload());
+ result = result && Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId());
+ result = result && impl1.getPort() == impl2.getPort();
+ return result;
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+}
import junit.framework.TestCase;
import junit.framework.TestSuite;
-import org.apache.catalina.tribes.test.channel.TestChannelOptionFlag;
-import org.apache.catalina.tribes.test.channel.TestChannelStartStop;
+import org.apache.catalina.tribes.group.TestGroupChannelMemberArrival;
+import org.apache.catalina.tribes.group.TestGroupChannelOptionFlag;
+import org.apache.catalina.tribes.group.TestGroupChannelSenderConnections;
+import org.apache.catalina.tribes.group.TestGroupChannelStartStop;
+import org.apache.catalina.tribes.group.interceptors.TestDomainFilterInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TestNonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TestOrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TestTcpFailureDetector;
+import org.apache.catalina.tribes.io.TestXByteBuffer;
+import org.apache.catalina.tribes.membership.TestMemberImplSerialization;
import org.apache.catalina.tribes.test.channel.TestDataIntegrity;
import org.apache.catalina.tribes.test.channel.TestMulticastPackages;
import org.apache.catalina.tribes.test.channel.TestRemoteProcessException;
import org.apache.catalina.tribes.test.channel.TestUdpPackages;
-import org.apache.catalina.tribes.test.interceptors.TestNonBlockingCoordinator;
-import org.apache.catalina.tribes.test.interceptors.TestOrderInterceptor;
-import org.apache.catalina.tribes.test.io.TestSenderConnections;
-import org.apache.catalina.tribes.test.io.TestSerialization;
-import org.apache.catalina.tribes.test.membership.TestDomainFilter;
-import org.apache.catalina.tribes.test.membership.TestMemberArrival;
-import org.apache.catalina.tribes.test.membership.TestMemberSerialization;
-import org.apache.catalina.tribes.test.membership.TestTcpFailureDetector;
public class TribesTestSuite
extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite();
// o.a.catalina.tribes.test.channel
- suite.addTestSuite(TestChannelStartStop.class);
- suite.addTestSuite(TestChannelOptionFlag.class);
+ suite.addTestSuite(TestGroupChannelStartStop.class);
+ suite.addTestSuite(TestGroupChannelOptionFlag.class);
suite.addTestSuite(TestDataIntegrity.class);
suite.addTestSuite(TestMulticastPackages.class);
suite.addTestSuite(TestRemoteProcessException.class);
suite.addTestSuite(TestNonBlockingCoordinator.class);
suite.addTestSuite(TestOrderInterceptor.class);
// o.a.catalina.tribes.test.io
- suite.addTestSuite(TestSenderConnections.class);
- suite.addTestSuite(TestSerialization.class);
+ suite.addTestSuite(TestGroupChannelSenderConnections.class);
+ suite.addTestSuite(TestXByteBuffer.class);
// o.a.catalina.tribes.test.membership
- suite.addTestSuite(TestMemberSerialization.class);
- suite.addTestSuite(TestDomainFilter.class);
- suite.addTestSuite(TestMemberArrival.class);
+ suite.addTestSuite(TestMemberImplSerialization.class);
+ suite.addTestSuite(TestDomainFilterInterceptor.class);
+ suite.addTestSuite(TestGroupChannelMemberArrival.class);
suite.addTestSuite(TestTcpFailureDetector.class);
return suite;
}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.channel;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.GroupChannel;
-
-/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Company: </p>
- *
- * @author not attributable
- * @version 1.0
- */
-public class TestChannelOptionFlag extends TestCase {
- GroupChannel channel = null;
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- channel = new GroupChannel();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if ( channel != null ) try {channel.stop(Channel.DEFAULT);}catch ( Exception ignore) { /* Ignore */ }
- channel = null;
- }
-
-
- public void testOptionConflict() throws Exception {
- boolean error = false;
- channel.setOptionCheck(true);
- ChannelInterceptor i = new TestInterceptor();
- i.setOptionFlag(128);
- channel.addInterceptor(i);
- i = new TestInterceptor();
- i.setOptionFlag(128);
- channel.addInterceptor(i);
- try {
- channel.start(Channel.DEFAULT);
- }catch ( ChannelException x ) {
- if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
- }
- assertEquals(true,error);
- }
-
- public void testOptionNoConflict() throws Exception {
- boolean error = false;
- channel.setOptionCheck(true);
- ChannelInterceptor i = new TestInterceptor();
- i.setOptionFlag(128);
- channel.addInterceptor(i);
- i = new TestInterceptor();
- i.setOptionFlag(64);
- channel.addInterceptor(i);
- i = new TestInterceptor();
- i.setOptionFlag(256);
- channel.addInterceptor(i);
- try {
- channel.start(Channel.DEFAULT);
- }catch ( ChannelException x ) {
- if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
- }
- assertEquals(false,error);
- }
-
- public static class TestInterceptor extends ChannelInterceptorBase {
- // Just use base class
- }
-
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.channel;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-
-/**
- * @author Filip Hanik
- * @version 1.0
- */
-public class TestChannelStartStop extends TestCase {
- GroupChannel channel = null;
- int udpPort = 45543;
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- channel = new GroupChannel();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- try {channel.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
- }
-
- public void testDoubleFullStart() throws Exception {
- int count = 0;
- try {
- channel.start(Channel.DEFAULT);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(Channel.DEFAULT);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- assertEquals(count,2);
- channel.stop(Channel.DEFAULT);
- }
-
- public void testScrap() throws Exception {
- System.out.println(channel.getChannelReceiver().getClass());
- ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
- }
-
-
- public void testDoublePartialStart() throws Exception {
- //try to double start the RX
- int count = 0;
- try {
- channel.start(Channel.SND_RX_SEQ);
- channel.start(Channel.MBR_RX_SEQ);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(Channel.MBR_RX_SEQ);
- count++;
- } catch ( Exception x){/*expected*/}
- assertEquals(count,1);
- channel.stop(Channel.DEFAULT);
- //double the membership sender
- count = 0;
- try {
- channel.start(Channel.SND_RX_SEQ);
- channel.start(Channel.MBR_TX_SEQ);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(Channel.MBR_TX_SEQ);
- count++;
- } catch ( Exception x){/*expected*/}
- assertEquals(count,1);
- channel.stop(Channel.DEFAULT);
-
- count = 0;
- try {
- channel.start(Channel.SND_RX_SEQ);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(Channel.SND_RX_SEQ);
- count++;
- } catch ( Exception x){/*expected*/}
- assertEquals(count,1);
- channel.stop(Channel.DEFAULT);
-
- count = 0;
- try {
- channel.start(Channel.SND_TX_SEQ);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(Channel.SND_TX_SEQ);
- count++;
- } catch ( Exception x){/*expected*/}
- assertEquals(count,1);
- channel.stop(Channel.DEFAULT);
- }
-
- public void testFalseOption() throws Exception {
- int flag = 0xFFF0;//should get ignored by the underlying components
- int count = 0;
- try {
- channel.start(flag);
- count++;
- } catch ( Exception x){x.printStackTrace();}
- try {
- channel.start(flag);
- count++;
- } catch ( Exception x){/*expected*/}
- assertEquals(count,2);
- channel.stop(Channel.DEFAULT);
- }
-
- public void testUdpReceiverStart() throws Exception {
- ReceiverBase rb = (ReceiverBase)channel.getChannelReceiver();
- rb.setUdpPort(udpPort);
- channel.start(Channel.DEFAULT);
- Thread.sleep(1000);
- channel.stop(Channel.DEFAULT);
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.interceptors;
-
-import junit.framework.TestCase;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-
-public class TestNonBlockingCoordinator extends TestCase {
-
- GroupChannel[] channels = null;
- NonBlockingCoordinator[] coordinators = null;
- int channelCount = 10;
- Thread[] threads = null;
- @Override
- protected void setUp() throws Exception {
- System.out.println("Setup");
- super.setUp();
- channels = new GroupChannel[channelCount];
- coordinators = new NonBlockingCoordinator[channelCount];
- threads = new Thread[channelCount];
- for ( int i=0; i<channelCount; i++ ) {
- channels[i] = new GroupChannel();
- coordinators[i] = new NonBlockingCoordinator();
- channels[i].addInterceptor(coordinators[i]);
- channels[i].addInterceptor(new TcpFailureDetector());
- final int j = i;
- threads[i] = new Thread() {
- @Override
- public void run() {
- try {
- channels[j].start(Channel.DEFAULT);
- Thread.sleep(50);
- } catch (Exception x) {
- x.printStackTrace();
- }
- }
- };
- }
- for ( int i=0; i<channelCount; i++ ) threads[i].start();
- for ( int i=0; i<channelCount; i++ ) threads[i].join();
- Thread.sleep(1000);
- }
-
- public void testCoord1() throws Exception {
- for (int i=1; i<channelCount; i++ )
- assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length);
- Member member = coordinators[0].getCoordinator();
- int cnt = 0;
- while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){ /* Ignore */ }
- for (int i=0; i<channelCount; i++ ) assertEquals(member,coordinators[i].getCoordinator());
- System.out.println("Coordinator[1] is:"+member);
-
- }
-
- public void testCoord2() throws Exception {
- Member member = coordinators[1].getCoordinator();
- System.out.println("Coordinator[2a] is:" + member);
- int index = -1;
- for ( int i=0; i<channelCount; i++ ) {
- if ( channels[i].getLocalMember(false).equals(member) ) {
- System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
- channels[i].stop(Channel.DEFAULT);
- index = i;
- }
- }
- int dead = index;
- Thread.sleep(1000);
- if ( index == 0 ) index = 1; else index = 0;
- System.out.println("Member count:"+channels[index].getMembers().length);
- member = coordinators[index].getCoordinator();
- for (int i = 1; i < channelCount; i++) if ( i != dead ) assertEquals(member, coordinators[i].getCoordinator());
- System.out.println("Coordinator[2b] is:" + member);
- }
-
- @Override
- protected void tearDown() throws Exception {
- System.out.println("tearDown");
- super.tearDown();
- for ( int i=0; i<channelCount; i++ ) {
- channels[i].stop(Channel.DEFAULT);
- }
- }
-
- public static void main(String[] args) throws Exception {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(TestNonBlockingCoordinator.class);
- suite.run(new TestResult());
- }
-
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.interceptors;
-
-import java.io.Serializable;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.TestCase;
-import junit.framework.TestResult;
-import junit.framework.TestSuite;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.InterceptorPayload;
-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
-
-public class TestOrderInterceptor extends TestCase {
-
- GroupChannel[] channels = null;
- OrderInterceptor[] orderitcs = null;
- MangleOrderInterceptor[] mangleitcs = null;
- TestListener[] test = null;
- int channelCount = 2;
- Thread[] threads = null;
- @Override
- protected void setUp() throws Exception {
- System.out.println("Setup");
- super.setUp();
- channels = new GroupChannel[channelCount];
- orderitcs = new OrderInterceptor[channelCount];
- mangleitcs = new MangleOrderInterceptor[channelCount];
- test = new TestListener[channelCount];
- threads = new Thread[channelCount];
- for ( int i=0; i<channelCount; i++ ) {
- channels[i] = new GroupChannel();
-
- orderitcs[i] = new OrderInterceptor();
- mangleitcs[i] = new MangleOrderInterceptor();
- orderitcs[i].setExpire(Long.MAX_VALUE);
- channels[i].addInterceptor(orderitcs[i]);
- channels[i].addInterceptor(mangleitcs[i]);
- test[i] = new TestListener(i);
- channels[i].addChannelListener(test[i]);
- final int j = i;
- threads[i] = new Thread() {
- @Override
- public void run() {
- try {
- channels[j].start(Channel.DEFAULT);
- Thread.sleep(50);
- } catch (Exception x) {
- x.printStackTrace();
- }
- }
- };
- }
- for ( int i=0; i<channelCount; i++ ) threads[i].start();
- for ( int i=0; i<channelCount; i++ ) threads[i].join();
- Thread.sleep(1000);
- }
-
- public void testOrder1() throws Exception {
- Member[] dest = channels[0].getMembers();
- final AtomicInteger value = new AtomicInteger(0);
- for ( int i=0; i<100; i++ ) {
- channels[0].send(dest,Integer.valueOf(value.getAndAdd(1)),0);
- }
- Thread.sleep(5000);
- for ( int i=0; i<test.length; i++ ) {
- assertEquals(false,test[i].fail);
- }
- }
-
- public void testOrder2() throws Exception {
- final Member[] dest = channels[0].getMembers();
- final AtomicInteger value = new AtomicInteger(0);
- final Queue<Exception> exceptionQueue = new ConcurrentLinkedQueue<Exception>();
- Runnable run = new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 100; i++) {
- try {
- synchronized (channels[0]) {
- channels[0].send(dest, Integer.valueOf(value.getAndAdd(1)), 0);
- }
- }catch ( Exception x ) {
- exceptionQueue.add(x);
- }
- }
- }
- };
- Thread[] threads = new Thread[5];
- for (int i=0;i<threads.length;i++) {
- threads[i] = new Thread(run);
- }
- for (int i=0;i<threads.length;i++) {
- threads[i].start();
- }
- for (int i=0;i<threads.length;i++) {
- threads[i].join();
- }
- if (!exceptionQueue.isEmpty()) {
- fail("Exception while sending in threads: "
- + exceptionQueue.remove().toString());
- }
- Thread.sleep(5000);
- for ( int i=0; i<test.length; i++ ) {
- assertEquals(false,test[i].fail);
- }
- }
-
-
- @Override
- protected void tearDown() throws Exception {
- System.out.println("tearDown");
- super.tearDown();
- for ( int i=0; i<channelCount; i++ ) {
- channels[i].stop(Channel.DEFAULT);
- }
- }
-
- public static void main(String[] args) throws Exception {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(TestOrderInterceptor.class);
- suite.run(new TestResult());
- }
-
- public static class TestListener implements ChannelListener {
- int id = -1;
- public TestListener(int id) {
- this.id = id;
- }
- int cnt = 0;
- int total = 0;
- volatile boolean fail = false;
- @Override
- public synchronized void messageReceived(Serializable msg, Member sender) {
- total++;
- Integer i = (Integer)msg;
- if ( i.intValue() != cnt ) fail = true;
- else cnt++;
- System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);
-
- }
-
- @Override
- public boolean accept(Serializable msg, Member sender) {
- return (msg instanceof Integer);
- }
- }
-
- public static class MangleOrderInterceptor extends ChannelInterceptorBase {
- ChannelMessage hold = null;
- Member[] dest = null;
- @Override
- public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
- if ( hold == null ) {
- //System.out.println("Skipping message:"+msg);
- hold = (ChannelMessage)msg.deepclone();
- dest = new Member[destination.length];
- System.arraycopy(destination,0,dest,0,dest.length);
- } else {
- //System.out.println("Sending message:"+msg);
- super.sendMessage(destination,msg,payload);
- //System.out.println("Sending message:"+hold);
- super.sendMessage(dest,hold,null);
- hold = null;
- dest = null;
- }
- }
- }
-
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.io;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.transport.ReplicationTransmitter;
-
-public class TestSenderConnections extends TestCase {
- private static int count = 2;
- private ManagedChannel[] channels = new ManagedChannel[count];
- private TestMsgListener[] listeners = new TestMsgListener[count];
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- for (int i = 0; i < channels.length; i++) {
- channels[i] = new GroupChannel();
- channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
- listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
- channels[i].addChannelListener(listeners[i]);
- channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
-
- }
- }
-
- public void clear() {
- // NOOP
- }
-
- public void sendMessages(long delay, long sleep) throws Exception {
- Member local = channels[0].getLocalMember(true);
- Member dest = channels[1].getLocalMember(true);
- int n = 3;
- System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]");
- for (int i = 0; i < n; i++) {
- channels[0].send(new Member[] {dest}, new TestMsg(), 0);
- if ( delay > 0 ) Thread.sleep(delay);
- }
- System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections");
- if ( sleep > 0 ) Thread.sleep(sleep);
-
- }
-
- public void testConnectionLinger() throws Exception {
- sendMessages(0,15000);
- }
-
- public void testKeepAliveCount() throws Exception {
- System.out.println("Setting keep alive count to 0");
- for (int i = 0; i < channels.length; i++) {
- ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
- t.getTransport().setKeepAliveCount(0);
- }
- sendMessages(1000,15000);
- }
-
- public void testKeepAliveTime() throws Exception {
- System.out.println("Setting keep alive count to 1 second");
- for (int i = 0; i < channels.length; i++) {
- ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
- t.getTransport().setKeepAliveTime(1000);
- }
- sendMessages(2000,15000);
- }
-
- @Override
- protected void tearDown() throws Exception {
- for (int i = 0; i < channels.length; i++) {
- channels[i].stop(Channel.DEFAULT);
- }
-
- }
-
- public static class TestMsg implements Serializable {
- private static final long serialVersionUID = 1L;
- static Random r = new Random();
- HashMap<Integer, ArrayList<Object>> map =
- new HashMap<Integer, ArrayList<Object>>();
- public TestMsg() {
- int size = Math.abs(r.nextInt() % 200);
- for (int i=0; i<size; i++ ) {
- int length = Math.abs(r.nextInt() %65000);
- ArrayList<Object> list = new ArrayList<Object>(length);
- map.put(Integer.valueOf(i),list);
- }
- }
- }
-
- public static class TestMsgListener implements ChannelListener {
- public String name = null;
- public TestMsgListener(String name) {
- this.name = name;
- }
-
- @Override
- public void messageReceived(Serializable msg, Member sender) {
- System.out.println("["+name+"] Received message:"+msg+" from " + sender.getName());
- }
-
-
- @Override
- public boolean accept(Serializable msg, Member sender) {
- return true;
- }
-
-
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.io;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.io.XByteBuffer;
-
-public class TestSerialization extends TestCase {
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- }
-
- public void testEmptyArray() throws Exception {
- // TODO
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- public static void main(String[] args) throws Exception {
- //XByteBuffer.deserialize(new byte[0]);
- XByteBuffer.deserialize(new byte[] {-84, -19, 0, 5, 115, 114, 0, 17, 106});
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.membership;
-
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-
-public class TestDomainFilter
- extends TestCase {
- private static int count = 10;
- private ManagedChannel[] channels = new ManagedChannel[count];
- private TestMbrListener[] listeners = new TestMbrListener[count];
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- for (int i = 0; i < channels.length; i++) {
- channels[i] = new GroupChannel();
- channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
- listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
- channels[i].addMembershipListener(listeners[i]);
- DomainFilterInterceptor filter = new DomainFilterInterceptor();
- filter.setDomain(UUIDGenerator.randomUUID(false));
- channels[i].addInterceptor(filter);
- }
- }
-
- public void clear() {
- for (int i = 0; i < channels.length; i++) {
- listeners[i].members.clear();
- }
- }
-
- public void testMemberArrival() throws Exception {
- //purpose of this test is to make sure that we have received all the members
- //that we can expect before the start method returns
- Thread[] threads = new Thread[channels.length];
- for (int i=0; i<channels.length; i++ ) {
- final Channel channel = channels[i];
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- channel.start(Channel.DEFAULT);
- }catch ( Exception x ) {
- throw new RuntimeException(x);
- }
- }
- };
- threads[i] = t;
- }
- for (int i=0; i<threads.length; i++ ) threads[i].start();
- for (int i=0; i<threads.length; i++ ) threads[i].join();
- System.out.println("All channels started.");
- for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size());
- }
-
- @Override
- protected void tearDown() throws Exception {
-
- for (int i = 0; i < channels.length; i++) {
- try {
- channels[i].stop(Channel.DEFAULT);
- } catch (Exception ignore) {
- // Ignore
- }
- }
- super.tearDown();
- }
-
- public static class TestMbrListener
- implements MembershipListener {
- public String name = null;
- public TestMbrListener(String name) {
- this.name = name;
- }
-
- public ArrayList<Member> members = new ArrayList<Member>();
- @Override
- public void memberAdded(Member member) {
- if (!members.contains(member)) {
- members.add(member);
- try {
- System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
- } catch (Exception x) {
- System.out.println(name + ":member added[unknown]");
- }
- }
- }
-
- @Override
- public void memberDisappeared(Member member) {
- if (members.contains(member)) {
- members.remove(member);
- try {
- System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
- } catch (Exception x) {
- System.out.println(name + ":member disappeared[unknown]");
- }
- }
- }
-
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.membership;
-
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.GroupChannel;
-
-public class TestMemberArrival
- extends TestCase {
- private static int count = 10;
- private ManagedChannel[] channels = new ManagedChannel[count];
- private TestMbrListener[] listeners = new TestMbrListener[count];
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- for (int i = 0; i < channels.length; i++) {
- channels[i] = new GroupChannel();
- channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
- listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
- channels[i].addMembershipListener(listeners[i]);
-
- }
- }
-
- public void clear() {
- for (int i = 0; i < channels.length; i++) {
- listeners[i].members.clear();
- }
- }
-
- public void testMemberArrival() throws Exception {
- //purpose of this test is to make sure that we have received all the members
- //that we can expect before the start method returns
- Thread[] threads = new Thread[channels.length];
- for (int i=0; i<channels.length; i++ ) {
- final Channel channel = channels[i];
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- channel.start(Channel.DEFAULT);
- }catch ( Exception x ) {
- throw new RuntimeException(x);
- }
- }
- };
- threads[i] = t;
- }
- for (int i=0; i<threads.length; i++ ) threads[i].start();
- for (int i=0; i<threads.length; i++ ) threads[i].join();
- Thread.sleep(2000);
- System.out.println("All channels started.");
- for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",channels.length-1,listeners[i].members.size());
- }
-
- @Override
- protected void tearDown() throws Exception {
-
- for (int i = 0; i < channels.length; i++) {
- try {
- channels[i].stop(Channel.DEFAULT);
- } catch (Exception ignore) {
- // Ignore
- }
- }
- super.tearDown();
- }
-
- public static class TestMbrListener
- implements MembershipListener {
- public String name = null;
- public TestMbrListener(String name) {
- this.name = name;
- }
-
- public ArrayList<Member> members = new ArrayList<Member>();
- @Override
- public void memberAdded(Member member) {
- if (!members.contains(member)) {
- members.add(member);
- try {
- System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
- } catch (Exception x) {
- System.out.println(name + ":member added[unknown]");
- }
- }
- }
-
- @Override
- public void memberDisappeared(Member member) {
- if (members.contains(member)) {
- members.remove(member);
- try {
- System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]");
- } catch (Exception x) {
- System.out.println(name + ":member disappeared[unknown]");
- }
- }
- }
-
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.membership;
-
-import java.util.Arrays;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.membership.MemberImpl;
-
-/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Company: </p>
- *
- * @author not attributable
- * @version 1.0
- */
-public class TestMemberSerialization extends TestCase {
- MemberImpl m1, m2, p1,p2;
- byte[] payload = null;
- int udpPort = 3445;
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- payload = new byte[333];
- Arrays.fill(payload,(byte)1);
- m1 = new MemberImpl("localhost",3333,1,payload);
- m2 = new MemberImpl("localhost",3333,1);
- payload = new byte[333];
- Arrays.fill(payload,(byte)2);
- p1 = new MemberImpl("127.0.0.1",3333,1,payload);
- p2 = new MemberImpl("localhost",3331,1,payload);
- m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
- m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
- m1.setCommand(new byte[] {1,2,4,5,6,7,8,9});
- m2.setCommand(new byte[] {1,2,4,5,6,7,8,9});
- m1.setUdpPort(udpPort);
- m2.setUdpPort(m1.getUdpPort());
- }
-
- public void testCompare() throws Exception {
- assertTrue(m1.equals(m2));
- assertTrue(m2.equals(m1));
- assertTrue(p1.equals(m2));
- assertFalse(m1.equals(p2));
- assertFalse(m1.equals(p2));
- assertFalse(m2.equals(p2));
- assertFalse(p1.equals(p2));
- }
-
- public void testUdpPort() throws Exception {
- byte[] md1 = m1.getData();
- byte[] md2 = m2.getData();
-
- MemberImpl a1 = MemberImpl.getMember(md1);
- MemberImpl a2 = MemberImpl.getMember(md2);
-
- assertEquals(true, a1.getUdpPort()==a2.getUdpPort());
- assertEquals(true,a1.getUdpPort()==udpPort);
- }
-
- public void testSerializationOne() throws Exception {
- MemberImpl m = m1;
- byte[] md1 = m.getData(false,true);
- byte[] mda1 = m.getData(false,false);
- assertTrue(Arrays.equals(md1,mda1));
- assertTrue(md1==mda1);
- mda1 = m.getData(true,true);
- MemberImpl ma1 = MemberImpl.getMember(mda1);
- assertTrue(compareMembers(m,ma1));
- mda1 = p1.getData(false);
- assertFalse(Arrays.equals(md1,mda1));
- ma1 = MemberImpl.getMember(mda1);
- assertTrue(compareMembers(p1,ma1));
-
- md1 = m.getData(true,true);
- Thread.sleep(50);
- mda1 = m.getData(true,true);
- MemberImpl a1 = MemberImpl.getMember(md1);
- MemberImpl a2 = MemberImpl.getMember(mda1);
- assertTrue(a1.equals(a2));
- assertFalse(Arrays.equals(md1,mda1));
-
-
- }
-
- public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
- boolean result = true;
- result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
- result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload());
- result = result && Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId());
- result = result && impl1.getPort() == impl2.getPort();
- return result;
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
-}
+++ /dev/null
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.catalina.tribes.test.membership;
-
-import java.util.ArrayList;
-
-import junit.framework.TestCase;
-
-import org.apache.catalina.tribes.ByteMessage;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-
-/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Company: </p>
- *
- * @author not attributable
- * @version 1.0
- */
-public class TestTcpFailureDetector extends TestCase {
- private TcpFailureDetector tcpFailureDetector1 = null;
- private TcpFailureDetector tcpFailureDetector2 = null;
- private ManagedChannel channel1 = null;
- private ManagedChannel channel2 = null;
- private TestMbrListener mbrlist1 = null;
- private TestMbrListener mbrlist2 = null;
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- channel1 = new GroupChannel();
- channel2 = new GroupChannel();
- channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
- channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
- mbrlist1 = new TestMbrListener("Channel-1");
- mbrlist2 = new TestMbrListener("Channel-2");
- tcpFailureDetector1 = new TcpFailureDetector();
- tcpFailureDetector2 = new TcpFailureDetector();
- channel1.addInterceptor(tcpFailureDetector1);
- channel2.addInterceptor(tcpFailureDetector2);
- channel1.addMembershipListener(mbrlist1);
- channel2.addMembershipListener(mbrlist2);
- }
-
- public void clear() {
- mbrlist1.members.clear();
- mbrlist2.members.clear();
- }
-
- public void testTcpSendFailureMemberDrop() throws Exception {
- System.out.println("testTcpSendFailureMemberDrop()");
- clear();
- channel1.start(Channel.DEFAULT);
- channel2.start(Channel.DEFAULT);
- //Thread.sleep(1000);
- assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
- channel2.stop(Channel.SND_RX_SEQ);
- ByteMessage msg = new ByteMessage(new byte[1024]);
- try {
- channel1.send(channel1.getMembers(), msg, 0);
- assertEquals("Message send should have failed.",true,false);
- } catch ( ChannelException x ) {
- // Ignore
- }
- assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
- channel1.stop(Channel.DEFAULT);
- channel2.stop(Channel.DEFAULT);
- }
-
- public void testTcpFailureMemberAdd() throws Exception {
- System.out.println("testTcpFailureMemberAdd()");
- clear();
- channel1.start(Channel.DEFAULT);
- channel2.start(Channel.SND_RX_SEQ);
- channel2.start(Channel.SND_TX_SEQ);
- channel2.start(Channel.MBR_RX_SEQ);
- channel2.stop(Channel.SND_RX_SEQ);
- channel2.start(Channel.MBR_TX_SEQ);
- //Thread.sleep(1000);
- assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size());
- channel1.stop(Channel.DEFAULT);
- channel2.stop(Channel.DEFAULT);
- }
-
- public void testTcpMcastFail() throws Exception {
- System.out.println("testTcpMcastFail()");
- clear();
- channel1.start(Channel.DEFAULT);
- channel2.start(Channel.DEFAULT);
- //Thread.sleep(1000);
- assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
- channel2.stop(Channel.MBR_TX_SEQ);
- ByteMessage msg = new ByteMessage(new byte[1024]);
- try {
- Thread.sleep(5000);
- assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size());
- channel1.send(channel1.getMembers(), msg, 0);
- } catch ( ChannelException x ) {
- assertEquals("Message send should have succeeded.",true,false);
- }
- channel1.stop(Channel.DEFAULT);
- channel2.stop(Channel.DEFAULT);
- }
-
-
- @Override
- protected void tearDown() throws Exception {
- tcpFailureDetector1 = null;
- tcpFailureDetector2 = null;
- try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
- channel1 = null;
- try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){ /* Ignore */ }
- channel2 = null;
- super.tearDown();
- }
-
- public static class TestMbrListener implements MembershipListener {
- public String name = null;
- public TestMbrListener(String name) {
- this.name = name;
- }
- public ArrayList<Member> members = new ArrayList<Member>();
- @Override
- public void memberAdded(Member member) {
- if ( !members.contains(member) ) {
- members.add(member);
- try{
- System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]");
- }catch ( Exception x ) {
- System.out.println(name + ":member added[unknown]");
- }
- }
- }
-
- @Override
- public void memberDisappeared(Member member) {
- if ( members.contains(member) ) {
- members.remove(member);
- try{
- System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]");
- }catch ( Exception x ) {
- System.out.println(name + ":member disappeared[unknown]");
- }
- }
- }
-
- }
-
-}