Force closure of connections upon a server shutdown
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 2 Jul 2007 18:46:20 +0000 (18:46 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Mon, 2 Jul 2007 18:46:20 +0000 (18:46 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@552560 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
test/org/apache/catalina/tribes/test/io/TestSenderConnections.java

index a7ba368..86707b3 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.catalina.tribes.util.StringManager;
 import java.util.LinkedList;
 import java.util.Set;
 import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedSelectorException;
 
 /**
  * @author Filip Hanik
@@ -303,8 +304,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
 
         }
         serverChannel.close();
-        if (selector != null)
-            selector.close();
+        closeSelector();
     }
 
     
@@ -319,7 +319,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
         if (selector != null) {
             try {
                 selector.wakeup();
-                selector.close();
+                closeSelector();
             } catch (Exception x) {
                 log.error("Unable to close cluster receiver selector.", x);
             } finally {
@@ -328,6 +328,27 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv
         }
     }
 
+    private void closeSelector() throws IOException {
+        Selector selector = this.selector;
+        this.selector = null;
+        if (selector==null) return;
+        try {
+            Iterator it = selector.keys().iterator();
+            // look at each key in the selected set
+            while (it.hasNext()) {
+                SelectionKey key = (SelectionKey)it.next();
+                key.channel().close();
+                key.attach(null);
+                key.cancel();
+            }
+        }catch ( IOException ignore ){
+            if (log.isWarnEnabled()) {
+                log.warn("Unable to cleanup on selector close.",ignore);
+            }
+        }catch ( ClosedSelectorException ignore){}
+        selector.close();
+    }
+
     // ----------------------------------------------------------
 
     /**
index 6b52f9b..6503244 100644 (file)
@@ -80,6 +80,45 @@ public class TestSenderConnections extends TestCase {
         channels[0].send(new Member[]{impl},new TestMsg(),0);
     }
     
+
+    public void testSendToRemote() throws Exception {
+        ReplicationTransmitter transmitter = (ReplicationTransmitter) channels[0].getChannelSender();
+        AbstractSender sender = (AbstractSender)transmitter.getTransport();
+        sender.setMaxRetryAttempts(0);
+        sender.setTimeout(60000);
+        MemberImpl impl = new MemberImpl("127.0.0.1",9999,1000,new byte[]{1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8});
+        for (int i=0; i<1000; i++) {
+            if (i%100==0) System.out.println("Sending message:"+(i+1));
+            channels[0].send(new Member[] {impl}, new TestMsg(), 0);
+        }
+    }
+
+
+    public void testSendToFailing() throws Exception {
+        ReplicationTransmitter transmitter = (ReplicationTransmitter) channels[0].getChannelSender();
+        AbstractSender sender = (AbstractSender)transmitter.getTransport();
+        sender.setMaxRetryAttempts(0);
+        sender.setTimeout(60000);
+        Member[] ma = channels[0].getMembers();
+        final Member m = channels[1].getLocalMember(true);
+        Thread st = new Thread() {
+            public void run() {
+                try {
+                    for (int i=0; i<10000; i++ ) { 
+                        channels[0].send(new Member[] {m}, new TestMsg(), 0);
+                    }
+                } catch (Exception x) {
+                    x.printStackTrace();
+                }
+            }
+        };
+        st.start();
+        Thread.sleep(250);
+        channels[1].stop(Channel.DEFAULT);
+        st.join();
+    }
+
+
     public void testKeepAliveCount() throws Exception {
         System.out.println("Setting keep alive count to 0");
         for (int i = 0; i < channels.length; i++) {