import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.XByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
private long expire = 3000;
private boolean forwardExpired = true;
private int maxQueue = Integer.MAX_VALUE;
+
+ ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true);
+ ReentrantReadWriteLock outLock= new ReentrantReadWriteLock(true);
- public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( !okToProcess(msg.getOptions()) ) {
super.sendMessage(destination, msg, payload);
return;
}
- for ( int i=0; i<destination.length; i++ ) {
- int nr = incCounter(destination[i]);
- //reduce byte copy
- msg.getMessage().append(nr);
- try {
- getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
- }finally {
- msg.getMessage().trim(4);
+ try {
+ outLock.writeLock().lock();
+ for ( int i=0; i<destination.length; i++ ) {
+ int nr = incCounter(destination[i]);
+ //reduce byte copy
+ msg.getMessage().append(nr);
+ try {
+ getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
+ }finally {
+ msg.getMessage().trim(4);
+ }
}
+ }finally {
+ outLock.writeLock().unlock();
}
}
- public synchronized void messageReceived(ChannelMessage msg) {
+ public void messageReceived(ChannelMessage msg) {
if ( !okToProcess(msg.getOptions()) ) {
super.messageReceived(msg);
return;
}
- int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
- msg.getMessage().trim(4);
- MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
- if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
- }
+ try {
+ inLock.writeLock().lock();
+ int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+ msg.getMessage().trim(4);
+ MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
+ if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
- public void processLeftOvers(Member member, boolean force) {
+ }finally {
+ inLock.writeLock().unlock();
+ }
+ }
+ protected void processLeftOvers(Member member, boolean force) {
MessageOrder tmp = (MessageOrder)incoming.get(member);
if ( force ) {
Counter cnt = getInCounter(member);
* @param order MessageOrder
* @return boolean - true if a message expired and was processed
*/
- public boolean processIncoming(MessageOrder order) {
+ protected boolean processIncoming(MessageOrder order) {
boolean result = false;
Member member = order.getMessage().getAddress();
Counter cnt = getInCounter(member);
return result;
}
- public synchronized void memberAdded(Member member) {
+ public void memberAdded(Member member) {
+ //reset counters
+ try {
+ inLock.writeLock().lock();
+ getInCounter(member);
+ }finally {
+ inLock.writeLock().unlock();
+ }
+ try {
+ outLock.writeLock().lock();
+ getOutCounter(member);
+ }finally {
+ outLock.writeLock().unlock();
+ }
//notify upwards
- getInCounter(member);
- getOutCounter(member);
super.memberAdded(member);
}
- public synchronized void memberDisappeared(Member member) {
- //notify upwards
- outcounter.remove(member);
- incounter.remove(member);
+ public void memberDisappeared(Member member) {
+ //reset counters
+ try {
+ inLock.writeLock().lock();
+ incounter.remove(member);
+ }finally {
+ inLock.writeLock().unlock();
+ }
+ try {
+ outLock.writeLock().lock();
+ outcounter.remove(member);
+ }finally {
+ outLock.writeLock().unlock();
+ }
//clear the remaining queue
processLeftOvers(member,true);
+ //notify upwards
super.memberDisappeared(member);
}
- public int incCounter(Member mbr) {
+ protected int incCounter(Member mbr) {
Counter cnt = getOutCounter(mbr);
return cnt.inc();
}
- public Counter getInCounter(Member mbr) {
+ protected Counter getInCounter(Member mbr) {
Counter cnt = (Counter)incounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
return cnt;
}
- public Counter getOutCounter(Member mbr) {
+ protected Counter getOutCounter(Member mbr) {
Counter cnt = (Counter)outcounter.get(mbr);
if ( cnt == null ) {
cnt = new Counter();
return cnt;
}
- public static class Counter {
+ protected static class Counter {
private AtomicInteger value = new AtomicInteger(0);
public int getCounter() {
}
}
- public static class MessageOrder {
+ protected static class MessageOrder {
private long received = System.currentTimeMillis();
private MessageOrder next;
private int msgNr;
import org.apache.catalina.tribes.ChannelMessage;\r
import org.apache.catalina.tribes.group.InterceptorPayload;\r
import org.apache.catalina.tribes.ChannelException;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
\r
public class TestOrderInterceptor extends TestCase {\r
\r
\r
public void testOrder1() throws Exception {\r
Member[] dest = channels[0].getMembers();\r
+ final AtomicInteger value = new AtomicInteger(0);\r
for ( int i=0; i<100; i++ ) {\r
- channels[0].send(dest,new Integer(i),0);\r
+ channels[0].send(dest,new Integer(value.getAndAdd(1)),0);\r
}\r
Thread.sleep(5000);\r
for ( int i=0; i<test.length; i++ ) {\r
}\r
}\r
\r
+ public void testOrder2() throws Exception {\r
+ final Member[] dest = channels[0].getMembers();\r
+ final AtomicInteger value = new AtomicInteger(0);\r
+ Runnable run = new Runnable() {\r
+ public void run() {\r
+ for (int i = 0; i < 100; i++) {\r
+ try {\r
+ synchronized (channels[0]) {\r
+ channels[0].send(dest, new Integer(value.getAndAdd(1)), 0);\r
+ }\r
+ }catch ( Exception x ) {\r
+ x.printStackTrace();\r
+ assertEquals(true,false);\r
+ }\r
+ }\r
+ }\r
+ };\r
+ Thread[] threads = new Thread[5];\r
+ for (int i=0;i<threads.length;i++) {\r
+ threads[i] = new Thread(run);\r
+ }\r
+ for (int i=0;i<threads.length;i++) {\r
+ threads[i].start();\r
+ }\r
+ for (int i=0;i<threads.length;i++) {\r
+ threads[i].join();\r
+ }\r
+ Thread.sleep(5000);\r
+ for ( int i=0; i<test.length; i++ ) {\r
+ super.assertEquals(false,test[i].fail);\r
+ }\r
+ }\r
+\r
+\r
protected void tearDown() throws Exception {\r
System.out.println("tearDown");\r
super.tearDown();\r
Integer i = (Integer)msg;\r
if ( i.intValue() != cnt ) fail = true;\r
else cnt++;\r
- System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total);\r
+ System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);\r
\r
}\r
\r