*/
public class ChannelException extends Exception {
+ /**
+ * Empty list to avoid reinstatiating lists
+ */
+ protected static final FaultyMember[] EMPTY_LIST = new FaultyMember[0];
/*
* Holds a list of faulty members
*/
* @param mbr Member
* @param x Exception
*/
- public void addFaultyMember(Member mbr, Exception x ) {
- addFaultyMember(new FaultyMember(mbr,x));
+ public boolean addFaultyMember(Member mbr, Exception x ) {
+ return addFaultyMember(new FaultyMember(mbr,x));
}
/**
* Adds a list of faulty members
* @param mbrs FaultyMember[]
*/
- public void addFaultyMember(FaultyMember[] mbrs) {
+ public int addFaultyMember(FaultyMember[] mbrs) {
+ int result = 0;
for (int i=0; mbrs!=null && i<mbrs.length; i++ ) {
- addFaultyMember(mbrs[i]);
+ if ( addFaultyMember(mbrs[i]) ) result++;
}
+ return result;
}
/**
* Adds a faulty member
* @param mbr FaultyMember
*/
- public void addFaultyMember(FaultyMember mbr) {
+ public boolean addFaultyMember(FaultyMember mbr) {
if ( this.faultyMembers==null ) this.faultyMembers = new ArrayList();
- faultyMembers.add(mbr);
+ if ( !faultyMembers.contains(mbr) ) return faultyMembers.add(mbr);
+ else return false;
}
/**
* @return FaultyMember[]
*/
public FaultyMember[] getFaultyMembers() {
- if ( this.faultyMembers==null ) return new FaultyMember[0];
+ if ( this.faultyMembers==null ) return EMPTY_LIST;
return (FaultyMember[])faultyMembers.toArray(new FaultyMember[faultyMembers.size()]);
}
public String toString() {
return "FaultyMember:"+member.toString();
}
+
+ public int hashCode() {
+ return (member!=null)?member.hashCode():0;
+ }
+
+ public boolean equals(Object o) {
+ if (member==null || (!(o instanceof FaultyMember)) || (((FaultyMember)o).member==null)) return false;
+ return member.equals(((FaultyMember)o).member);
+ }
}
}
try {
remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
} catch (Exception x ) {
+ int faulty = (cx == null)?0:cx.getFaultyMembers().length;
if ( cx == null ) {
if ( x instanceof ChannelException ) cx = (ChannelException)x;
else cx = new ChannelException("Parallel NIO send failed.", x);
} else {
if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());
}
+ //count down the remaining on an error
+ if (faulty<cx.getFaultyMembers().length) remaining -= (cx.getFaultyMembers().length-faulty);
}
//bail out if all remaining senders are failing
if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;
}
if ( remaining > 0 ) {
//timeout has occured
- cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
+ ChannelException cxtimeout = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
+ if ( cx==null ) cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");
for (int i=0; i<senders.length; i++ ) {
- if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination(),null);
+ if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination(),cxtimeout);
}
throw cx;
+ } else if ( cx != null ) {
+ //there was an error
+ throw cx;
}
} catch (Exception x ) {
try { this.disconnect(); } catch (Exception ignore) {}