import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
private boolean forwardExpired = true;
private int maxQueue = Integer.MAX_VALUE;
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( !okToProcess(msg.getOptions()) ) {
super.sendMessage(destination, msg, payload);
return;
}
}
- public void messageReceived(ChannelMessage msg) {
+ public synchronized void messageReceived(ChannelMessage msg) {
if ( !okToProcess(msg.getOptions()) ) {
super.messageReceived(msg);
return;
if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
}
- public synchronized void processLeftOvers(Member member, boolean force) {
+ public void processLeftOvers(Member member, boolean force) {
MessageOrder tmp = (MessageOrder)incoming.get(member);
if ( force ) {
Counter cnt = getInCounter(member);
* @param order MessageOrder
* @return boolean - true if a message expired and was processed
*/
- public synchronized boolean processIncoming(MessageOrder order) {
+ public boolean processIncoming(MessageOrder order) {
boolean result = false;
Member member = order.getMessage().getAddress();
Counter cnt = getInCounter(member);
//reset the head
if ( tmp == head ) head = tmp.next;
cnt.setCounter(tmp.getMsgNr()+1);
- if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
+ if ( getForwardExpired() )
+ super.messageReceived(tmp.getMessage());
tmp.setMessage(null);
tmp = tmp.next;
if ( prev != null ) prev.next = tmp;
return result;
}
- public void memberAdded(Member member) {
+ public synchronized void memberAdded(Member member) {
//notify upwards
getInCounter(member);
getOutCounter(member);
super.memberAdded(member);
}
- public void memberDisappeared(Member member) {
+ public synchronized void memberDisappeared(Member member) {
//notify upwards
outcounter.remove(member);
incounter.remove(member);
return cnt.inc();
}
- public synchronized Counter getInCounter(Member mbr) {
+ public Counter getInCounter(Member mbr) {
Counter cnt = (Counter)incounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
return cnt;
}
- public synchronized Counter getOutCounter(Member mbr) {
+ public Counter getOutCounter(Member mbr) {
Counter cnt = (Counter)outcounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
}
public static class Counter {
- private int value = 0;
+ private AtomicInteger value = new AtomicInteger(0);
public int getCounter() {
- return value;
+ return value.get();
}
- public synchronized void setCounter(int counter) {
- this.value = counter;
+ public void setCounter(int counter) {
+ this.value.set(counter);
}
- public synchronized int inc() {
- return ++value;
+ public int inc() {
+ return value.addAndGet(1);
}
}
--- /dev/null
+/*\r
+ * Licensed to the Apache Software Foundation (ASF) under one or more\r
+ * contributor license agreements. See the NOTICE file distributed with\r
+ * this work for additional information regarding copyright ownership.\r
+ * The ASF licenses this file to You under the Apache License, Version 2.0\r
+ * (the "License"); you may not use this file except in compliance with\r
+ * the License. You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ */\r
+package org.apache.catalina.tribes.test.interceptors;\r
+\r
+import org.apache.catalina.tribes.Channel;\r
+import org.apache.catalina.tribes.Member;\r
+import org.apache.catalina.tribes.group.GroupChannel;\r
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;\r
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;\r
+import junit.framework.TestCase;\r
+import junit.framework.TestResult;\r
+import junit.framework.TestSuite;\r
+import org.apache.catalina.tribes.ChannelListener;\r
+import java.io.Serializable;\r
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;\r
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;\r
+import org.apache.catalina.tribes.ChannelMessage;\r
+import org.apache.catalina.tribes.group.InterceptorPayload;\r
+import org.apache.catalina.tribes.ChannelException;\r
+\r
+public class TestOrderInterceptor extends TestCase {\r
+\r
+ GroupChannel[] channels = null;\r
+ OrderInterceptor[] orderitcs = null;\r
+ MangleOrderInterceptor[] mangleitcs = null;\r
+ TestListener[] test = null;\r
+ int channelCount = 2;\r
+ Thread[] threads = null;\r
+ protected void setUp() throws Exception {\r
+ System.out.println("Setup");\r
+ super.setUp();\r
+ channels = new GroupChannel[channelCount];\r
+ orderitcs = new OrderInterceptor[channelCount];\r
+ mangleitcs = new MangleOrderInterceptor[channelCount];\r
+ test = new TestListener[channelCount];\r
+ threads = new Thread[channelCount];\r
+ for ( int i=0; i<channelCount; i++ ) {\r
+ channels[i] = new GroupChannel();\r
+ orderitcs[i] = new OrderInterceptor();\r
+ mangleitcs[i] = new MangleOrderInterceptor();\r
+ orderitcs[i].setExpire(Long.MAX_VALUE);\r
+ channels[i].addInterceptor(orderitcs[i]);\r
+ channels[i].addInterceptor(mangleitcs[i]);\r
+ test[i] = new TestListener(i);\r
+ channels[i].addChannelListener(test[i]);\r
+ final int j = i;\r
+ threads[i] = new Thread() {\r
+ public void run() {\r
+ try {\r
+ channels[j].start(Channel.DEFAULT);\r
+ Thread.sleep(50);\r
+ } catch (Exception x) {\r
+ x.printStackTrace();\r
+ }\r
+ }\r
+ };\r
+ }\r
+ for ( int i=0; i<channelCount; i++ ) threads[i].start();\r
+ for ( int i=0; i<channelCount; i++ ) threads[i].join();\r
+ Thread.sleep(1000);\r
+ }\r
+ \r
+ public void testOrder1() throws Exception {\r
+ Member[] dest = channels[0].getMembers();\r
+ for ( int i=0; i<100; i++ ) {\r
+ channels[0].send(dest,new Integer(i),0);\r
+ }\r
+ Thread.sleep(5000);\r
+ for ( int i=0; i<test.length; i++ ) {\r
+ super.assertEquals(false,test[i].fail);\r
+ }\r
+ }\r
+ \r
+ protected void tearDown() throws Exception {\r
+ System.out.println("tearDown");\r
+ super.tearDown();\r
+ for ( int i=0; i<channelCount; i++ ) {\r
+ channels[i].stop(Channel.DEFAULT);\r
+ }\r
+ }\r
+ \r
+ public static void main(String[] args) throws Exception {\r
+ TestSuite suite = new TestSuite();\r
+ suite.addTestSuite(TestOrderInterceptor.class);\r
+ suite.run(new TestResult());\r
+ }\r
+ \r
+ public static class TestListener implements ChannelListener {\r
+ int id = -1;\r
+ public TestListener(int id) {\r
+ this.id = id;\r
+ }\r
+ int cnt = 0;\r
+ int total = 0;\r
+ boolean fail = false;\r
+ public synchronized void messageReceived(Serializable msg, Member sender) {\r
+ total++;\r
+ Integer i = (Integer)msg;\r
+ if ( i.intValue() != cnt ) fail = true;\r
+ else cnt++;\r
+ System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total);\r
+\r
+ }\r
+\r
+ public boolean accept(Serializable msg, Member sender) {\r
+ return (msg instanceof Integer);\r
+ }\r
+ }\r
+ \r
+ public static class MangleOrderInterceptor extends ChannelInterceptorBase {\r
+ int cnt = 1;\r
+ ChannelMessage hold = null;\r
+ Member[] dest = null;\r
+ public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {\r
+ if ( hold == null ) {\r
+ //System.out.println("Skipping message:"+msg);\r
+ hold = (ChannelMessage)msg.deepclone();\r
+ dest = new Member[destination.length];\r
+ System.arraycopy(destination,0,dest,0,dest.length);\r
+ } else {\r
+ //System.out.println("Sending message:"+msg);\r
+ super.sendMessage(destination,msg,payload);\r
+ //System.out.println("Sending message:"+hold);\r
+ super.sendMessage(dest,hold,null);\r
+ hold = null;\r
+ dest = null;\r
+ }\r
+ }\r
+ }\r
+ \r
+ \r
+ \r
+ \r
+\r
+}\r