import java.io.IOException;
import java.util.List;
-
import org.apache.catalina.tribes.Member;
/**
}
public void add(Member member) {
- // we don't need it senders are pooled...
+ // no op, senders created upon demans
}
-
- public void remove(Member member) {
- queue.remove(member) ;
- }
+ public void remove(Member member) {
+ //no op for now, should not cancel out any keys
+ //can create serious sync issues
+ //all TCP connections are cleared out through keepalive
+ //and if remote node disappears
+ }
// ----------------------------------------------------- Inner Class
private class SenderQueue {
PooledSender parent = null;
- private List<DataSender> notinuse = null;
+ private List notinuse = null;
- private List<DataSender> inuse = null;
+ private List inuse = null;
private boolean isOpen = true;
public SenderQueue(PooledSender parent, int limit) {
this.limit = limit;
this.parent = parent;
- notinuse = new java.util.LinkedList<DataSender>();
- inuse = new java.util.LinkedList<DataSender>();
+ notinuse = new java.util.LinkedList();
+ inuse = new java.util.LinkedList();
}
/**
return result;
}
- // FIXME: remove also inuse senders. but then we must synch with sendMessage!
- public synchronized void remove(Member member) {
- if (isOpen) {
- DataSender[] list = new DataSender[notinuse.size()];
- notinuse.toArray(list);
- for (int i=0; i<list.length; i++) {
- if(list[i] instanceof MultiPointSender)
- ((MultiPointSender)list[i]).remove(member);
- }
- }
- }
-
public synchronized DataSender getSender(long timeout) {
long start = System.currentTimeMillis();
while ( true ) {
notinuse.clear();
inuse.clear();
notify();
+
+
+
}
public synchronized void open() {
}
public boolean keepalive() {
- //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
boolean result = false;
for ( Iterator i = nioSenders.entrySet().iterator(); i.hasNext(); ) {
Map.Entry entry = (Map.Entry)i.next();
if ( sender.keepalive() ) {
nioSenders.remove(entry.getKey());
result = true;
+ } else {
+ try {
+ sender.read(null);
+ }catch ( IOException x ) {
+ sender.disconnect();
+ sender.reset();
+ nioSenders.remove(entry.getKey());
+ result = true;
+ }catch ( Exception x ) {
+ log.warn("Error during keepalive test for sender:"+sender,x);
+ }
}
}
+ //clean up any cancelled keys
+ if ( result ) try { selector.selectNow(); }catch (Exception ignore){}
return result;
}