import org.apache.catalina.util.StringManager;
import org.apache.coyote.ActionCode;
import org.apache.tomcat.util.net.PollerInterest;
+import java.util.Arrays;
public class CometEventImpl implements CometEvent {
public void register(CometEvent.CometOperation... operations)
throws IOException, IllegalStateException {
//add it to the registered set
- for (CometEvent.CometOperation co : operations) {
- if (!cometOperations.contains(co)) {
- cometOperations.add(co);
- request.action(ActionCode.ACTION_COMET_REGISTER, translate(co));
- }
- }
+ cometOperations.addAll(Arrays.asList(operations));
+ request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0])));
}
public void unregister(CometOperation... operations)
throws IOException, IllegalStateException {
//remove from the registered set
- for (CometEvent.CometOperation co : operations) {
- if (cometOperations.contains(co)) {
- cometOperations.remove(co);
- request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(co));
- }
- }
+ cometOperations.removeAll(Arrays.asList(operations));
+ request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0])));
}
public CometConfiguration[] getConfiguration() {
throw new IllegalStateException("The operation can only be performed when invoked by a Tomcat worker thread.");
}
- protected PollerInterest translate(CometOperation op) {
- if ( op == CometEvent.CometOperation.OP_READ )
- return PollerInterest.READ;
- else if ( op == CometEvent.CometOperation.OP_WRITE )
- return PollerInterest.WRITE;
- else if ( op == CometEvent.CometOperation.OP_CALLBACK )
- return PollerInterest.CALLBACK;
- else
- throw new IllegalArgumentException(op!=null?op.toString():"null");
+ protected PollerInterest[] translate(CometOperation... op) {
+ PollerInterest[] result = new PollerInterest[op.length];
+ for (int i=0; i<result.length; i++) {
+ if (op[i] == CometEvent.CometOperation.OP_READ)
+ result[i] = PollerInterest.READ;
+ else if (op[i] == CometEvent.CometOperation.OP_WRITE)
+ result[i] = PollerInterest.WRITE;
+ else if (op[i] == CometEvent.CometOperation.OP_CALLBACK)
+ result[i] = PollerInterest.CALLBACK;
+ else
+ throw new IllegalArgumentException(op != null ? op.toString() : "null");
+ }
+ return result;
}
//inner class used to keep track if the current thread is a worker thread.
}
private int getPollerInterest(Object param) throws IllegalArgumentException {
- if ( param == null || (!(param instanceof PollerInterest)) )
- throw new IllegalArgumentException("Action parameter must be a PollerInterest object.");
+ if ( param == null || (!(param instanceof PollerInterest[])) )
+ throw new IllegalArgumentException("Action parameter must be a PollerInterest[] object.");
int interest = 0;
- PollerInterest pi = (PollerInterest)param;
- if ( pi == PollerInterest.CALLBACK )
- interest = NioEndpoint.OP_CALLBACK;
- else if ( pi == PollerInterest.READ )
- interest = SelectionKey.OP_READ;
- else if ( pi == PollerInterest.WRITE )
- interest = SelectionKey.OP_WRITE;
- else
- throw new IllegalArgumentException(pi!=null?pi.toString():"null");
+ PollerInterest[] piarr = (PollerInterest[])param;
+ for ( PollerInterest pi : piarr ) {
+ if (pi == PollerInterest.CALLBACK)
+ interest = interest | NioEndpoint.OP_CALLBACK;
+ else if (pi == PollerInterest.READ)
+ interest = interest | SelectionKey.OP_READ;
+ else if (pi == PollerInterest.WRITE)
+ interest = interest | SelectionKey.OP_WRITE;
+ else
+ throw new IllegalArgumentException(pi != null ? pi.toString() : "null");
+ }
return interest;
}
public void cometInterest(NioChannel socket) {
KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
add(socket,att.getCometOps());
+ if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
+ nextExpiration = 0; //force the check for faster callback
+ selector.wakeup();
+ }
}
public void wakeup() {
} else if ( attachment.getComet() ) {
//check if thread is available
if ( isWorkerAvailable() ) {
- unreg(sk, attachment, sk.readyOps());
- if (!processSocket(channel, SocketStatus.OPEN_READ))
- processSocket(channel, SocketStatus.DISCONNECT);
+ //set interest ops to 0 so we don't get multiple
+ //invokations
+ reg(sk, attachment, 0);
+ //read goes before write
+ if (sk.isReadable())
+ if (!processSocket(channel, SocketStatus.OPEN_READ)) processSocket(channel, SocketStatus.DISCONNECT);
+ else
+ if (!processSocket(channel, SocketStatus.OPEN_WRITE)) processSocket(channel, SocketStatus.DISCONNECT);
} else {
result = false;
}