From: fhanik Date: Thu, 17 May 2007 13:19:59 +0000 (+0000) Subject: Added in a unit test for ordering messages X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=eecb49f6d54bfa59b6d55e4970d451e5273ac04a;p=tomcat7.0 Added in a unit test for ordering messages git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@538908 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java b/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java index e651b96cf..34d88523d 100644 --- a/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java +++ b/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java @@ -24,6 +24,7 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.ChannelInterceptorBase; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.XByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; @@ -59,7 +60,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { private boolean forwardExpired = true; private int maxQueue = Integer.MAX_VALUE; - public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( !okToProcess(msg.getOptions()) ) { super.sendMessage(destination, msg, payload); return; @@ -76,7 +77,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { } } - public void messageReceived(ChannelMessage msg) { + public synchronized void messageReceived(ChannelMessage msg) { if ( !okToProcess(msg.getOptions()) ) { super.messageReceived(msg); return; @@ -87,7 +88,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); } - public synchronized void processLeftOvers(Member member, boolean force) { + public void processLeftOvers(Member member, boolean force) { MessageOrder tmp = (MessageOrder)incoming.get(member); if ( force ) { Counter cnt = getInCounter(member); @@ -100,7 +101,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { * @param order MessageOrder * @return boolean - true if a message expired and was processed */ - public synchronized boolean processIncoming(MessageOrder order) { + public boolean processIncoming(MessageOrder order) { boolean result = false; Member member = order.getMessage().getAddress(); Counter cnt = getInCounter(member); @@ -130,7 +131,8 @@ public class OrderInterceptor extends ChannelInterceptorBase { //reset the head if ( tmp == head ) head = tmp.next; cnt.setCounter(tmp.getMsgNr()+1); - if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); + if ( getForwardExpired() ) + super.messageReceived(tmp.getMessage()); tmp.setMessage(null); tmp = tmp.next; if ( prev != null ) prev.next = tmp; @@ -145,14 +147,14 @@ public class OrderInterceptor extends ChannelInterceptorBase { return result; } - public void memberAdded(Member member) { + public synchronized void memberAdded(Member member) { //notify upwards getInCounter(member); getOutCounter(member); super.memberAdded(member); } - public void memberDisappeared(Member member) { + public synchronized void memberDisappeared(Member member) { //notify upwards outcounter.remove(member); incounter.remove(member); @@ -166,7 +168,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { return cnt.inc(); } - public synchronized Counter getInCounter(Member mbr) { + public Counter getInCounter(Member mbr) { Counter cnt = (Counter)incounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -176,7 +178,7 @@ public class OrderInterceptor extends ChannelInterceptorBase { return cnt; } - public synchronized Counter getOutCounter(Member mbr) { + public Counter getOutCounter(Member mbr) { Counter cnt = (Counter)outcounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -186,18 +188,18 @@ public class OrderInterceptor extends ChannelInterceptorBase { } public static class Counter { - private int value = 0; + private AtomicInteger value = new AtomicInteger(0); public int getCounter() { - return value; + return value.get(); } - public synchronized void setCounter(int counter) { - this.value = counter; + public void setCounter(int counter) { + this.value.set(counter); } - public synchronized int inc() { - return ++value; + public int inc() { + return value.addAndGet(1); } } diff --git a/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java b/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java new file mode 100644 index 000000000..cd4f99dab --- /dev/null +++ b/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.test.interceptors; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import junit.framework.TestCase; +import junit.framework.TestResult; +import junit.framework.TestSuite; +import org.apache.catalina.tribes.ChannelListener; +import java.io.Serializable; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.ChannelException; + +public class TestOrderInterceptor extends TestCase { + + GroupChannel[] channels = null; + OrderInterceptor[] orderitcs = null; + MangleOrderInterceptor[] mangleitcs = null; + TestListener[] test = null; + int channelCount = 2; + Thread[] threads = null; + protected void setUp() throws Exception { + System.out.println("Setup"); + super.setUp(); + channels = new GroupChannel[channelCount]; + orderitcs = new OrderInterceptor[channelCount]; + mangleitcs = new MangleOrderInterceptor[channelCount]; + test = new TestListener[channelCount]; + threads = new Thread[channelCount]; + for ( int i=0; i