From cce4b714e17cda689f09dc06f43d8f4faa6e607b Mon Sep 17 00:00:00 2001 From: fhanik Date: Mon, 7 Jan 2008 21:11:26 +0000 Subject: [PATCH] minor fixes git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@609778 13f79535-47bb-0310-9956-ffa450edef68 --- .../group/interceptors/ThroughputInterceptor.java | 50 +++++++++++++++++++--- .../catalina/tribes/transport/nio/NioReceiver.java | 27 ++++++++++-- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/java/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java b/java/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java index e9636a91b..2d4935fdd 100644 --- a/java/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java +++ b/java/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java @@ -16,6 +16,10 @@ package org.apache.catalina.tribes.group.interceptors; +import java.text.DecimalFormat; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; @@ -23,10 +27,6 @@ import org.apache.catalina.tribes.group.ChannelInterceptorBase; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.XByteBuffer; -import java.text.DecimalFormat; -import org.apache.catalina.tribes.membership.MemberImpl; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +61,7 @@ public class ThroughputInterceptor extends ChannelInterceptorBase { super.sendMessage(destination, msg, payload); }catch ( ChannelException x ) { msgTxErr.addAndGet(1); - access.addAndGet(-1); + if ( access.get() == 1 ) access.addAndGet(-1); throw x; } mbTx += ((double)(bytes*destination.length))/(1024d*1024d); @@ -117,4 +117,44 @@ public class ThroughputInterceptor extends ChannelInterceptorBase { return interval; } + public double getLastCnt() { + return lastCnt; + } + + public double getMbAppTx() { + return mbAppTx; + } + + public double getMbRx() { + return mbRx; + } + + public double getMbTx() { + return mbTx; + } + + public AtomicLong getMsgRxCnt() { + return msgRxCnt; + } + + public AtomicLong getMsgTxCnt() { + return msgTxCnt; + } + + public AtomicLong getMsgTxErr() { + return msgTxErr; + } + + public long getRxStart() { + return rxStart; + } + + public double getTimeTx() { + return timeTx; + } + + public long getTxStart() { + return txStart; + } + } diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index a7ba368f9..86707b3bd 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -37,6 +37,7 @@ import org.apache.catalina.tribes.util.StringManager; import java.util.LinkedList; import java.util.Set; import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedSelectorException; /** * @author Filip Hanik @@ -303,8 +304,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } serverChannel.close(); - if (selector != null) - selector.close(); + closeSelector(); } @@ -319,7 +319,7 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv if (selector != null) { try { selector.wakeup(); - selector.close(); + closeSelector(); } catch (Exception x) { log.error("Unable to close cluster receiver selector.", x); } finally { @@ -328,6 +328,27 @@ public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiv } } + private void closeSelector() throws IOException { + Selector selector = this.selector; + this.selector = null; + if (selector==null) return; + try { + Iterator it = selector.keys().iterator(); + // look at each key in the selected set + while (it.hasNext()) { + SelectionKey key = (SelectionKey)it.next(); + key.channel().close(); + key.attach(null); + key.cancel(); + } + }catch ( IOException ignore ){ + if (log.isWarnEnabled()) { + log.warn("Unable to cleanup on selector close.",ignore); + } + }catch ( ClosedSelectorException ignore){} + selector.close(); + } + // ---------------------------------------------------------- /** -- 2.11.0