From: fhanik Date: Mon, 2 Jul 2007 18:46:20 +0000 (+0000) Subject: Force closure of connections upon a server shutdown X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=d8d48fe00cad9730e4619e6c31b754049d4b5fb1;p=tomcat7.0 Force closure of connections upon a server shutdown git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@552560 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index a7ba368f9..86707b3bd 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -37,6 +37,7 @@ import org.apache.catalina.tribes.util.StringManager; import java.util.LinkedList; import java.util.Set; import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedSelectorException; /** * @author Filip Hanik @@ -303,8 +304,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } serverChannel.close(); - if (selector != null) - selector.close(); + closeSelector(); } @@ -319,7 +319,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv if (selector != null) { try { selector.wakeup(); - selector.close(); + closeSelector(); } catch (Exception x) { log.error("Unable to close cluster receiver selector.", x); } finally { @@ -328,6 +328,27 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } } + private void closeSelector() throws IOException { + Selector selector = this.selector; + this.selector = null; + if (selector==null) return; + try { + Iterator it = selector.keys().iterator(); + // look at each key in the selected set + while (it.hasNext()) { + SelectionKey key = (SelectionKey)it.next(); + key.channel().close(); + key.attach(null); + key.cancel(); + } + }catch ( IOException ignore ){ + if (log.isWarnEnabled()) { + log.warn("Unable to cleanup on selector close.",ignore); + } + }catch ( ClosedSelectorException ignore){} + selector.close(); + } + // ---------------------------------------------------------- /** diff --git a/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java index 6b52f9b90..65032446f 100644 --- a/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java +++ b/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java @@ -80,6 +80,45 @@ public class TestSenderConnections extends TestCase { channels[0].send(new Member[]{impl},new TestMsg(),0); } + + public void testSendToRemote() throws Exception { + ReplicationTransmitter transmitter = (ReplicationTransmitter) channels[0].getChannelSender(); + AbstractSender sender = (AbstractSender)transmitter.getTransport(); + sender.setMaxRetryAttempts(0); + sender.setTimeout(60000); + MemberImpl impl = new MemberImpl("127.0.0.1",9999,1000,new byte[]{1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8}); + for (int i=0; i<1000; i++) { + if (i%100==0) System.out.println("Sending message:"+(i+1)); + channels[0].send(new Member[] {impl}, new TestMsg(), 0); + } + } + + + public void testSendToFailing() throws Exception { + ReplicationTransmitter transmitter = (ReplicationTransmitter) channels[0].getChannelSender(); + AbstractSender sender = (AbstractSender)transmitter.getTransport(); + sender.setMaxRetryAttempts(0); + sender.setTimeout(60000); + Member[] ma = channels[0].getMembers(); + final Member m = channels[1].getLocalMember(true); + Thread st = new Thread() { + public void run() { + try { + for (int i=0; i<10000; i++ ) { + channels[0].send(new Member[] {m}, new TestMsg(), 0); + } + } catch (Exception x) { + x.printStackTrace(); + } + } + }; + st.start(); + Thread.sleep(250); + channels[1].stop(Channel.DEFAULT); + st.join(); + } + + public void testKeepAliveCount() throws Exception { System.out.println("Setting keep alive count to 0"); for (int i = 0; i < channels.length; i++) {