simplify register and poller interest for comet, all can be done in one call
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 31 May 2007 08:48:01 +0000 (08:48 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 31 May 2007 08:48:01 +0000 (08:48 +0000)
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@543086 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/catalina/connector/CometEventImpl.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/tomcat/util/net/NioEndpoint.java

index b0b68df..728f054 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.catalina.CometEvent;
 import org.apache.catalina.util.StringManager;
 import org.apache.coyote.ActionCode;
 import org.apache.tomcat.util.net.PollerInterest;
+import java.util.Arrays;
 
 public class CometEventImpl implements CometEvent {
 
@@ -160,23 +161,15 @@ public class CometEventImpl implements CometEvent {
     public void register(CometEvent.CometOperation... operations)
         throws IOException, IllegalStateException {
         //add it to the registered set
-        for (CometEvent.CometOperation co : operations) {
-            if (!cometOperations.contains(co)) {
-                cometOperations.add(co);
-                request.action(ActionCode.ACTION_COMET_REGISTER, translate(co));
-            }
-        }
+        cometOperations.addAll(Arrays.asList(operations));
+        request.action(ActionCode.ACTION_COMET_REGISTER, translate(cometOperations.toArray(new CometOperation[0])));
     }
 
     public void unregister(CometOperation... operations)
         throws IOException, IllegalStateException {
         //remove from the registered set
-        for (CometEvent.CometOperation co : operations) {
-            if (cometOperations.contains(co)) {
-                cometOperations.remove(co);
-                request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(co));
-            }
-        }
+        cometOperations.removeAll(Arrays.asList(operations));
+        request.action(ActionCode.ACTION_COMET_UNREGISTER, translate(cometOperations.toArray(new CometOperation[0])));
     }
     
     public CometConfiguration[] getConfiguration() {
@@ -211,15 +204,19 @@ public class CometEventImpl implements CometEvent {
             throw new IllegalStateException("The operation can only be performed when invoked by a Tomcat worker thread.");
     }
     
-    protected PollerInterest translate(CometOperation op) {
-        if ( op == CometEvent.CometOperation.OP_READ )
-            return PollerInterest.READ;
-        else if ( op == CometEvent.CometOperation.OP_WRITE )
-            return PollerInterest.WRITE;
-        else if ( op == CometEvent.CometOperation.OP_CALLBACK )
-            return PollerInterest.CALLBACK;
-        else 
-            throw new IllegalArgumentException(op!=null?op.toString():"null");
+    protected PollerInterest[] translate(CometOperation... op) {
+        PollerInterest[] result = new PollerInterest[op.length];
+        for (int i=0; i<result.length; i++) {
+            if (op[i] == CometEvent.CometOperation.OP_READ)
+                result[i] = PollerInterest.READ;
+            else if (op[i] == CometEvent.CometOperation.OP_WRITE)
+                result[i] = PollerInterest.WRITE;
+            else if (op[i] == CometEvent.CometOperation.OP_CALLBACK)
+                result[i] = PollerInterest.CALLBACK;
+            else
+                throw new IllegalArgumentException(op != null ? op.toString() : "null");
+        }
+        return result;
     }
     
     //inner class used to keep track if the current thread is a worker thread.
index 9c416c6..a068dd4 100644 (file)
@@ -1242,18 +1242,20 @@ public class Http11NioProcessor implements ActionHook {
     }
 
     private int getPollerInterest(Object param) throws IllegalArgumentException {
-        if ( param == null || (!(param instanceof PollerInterest)) )
-            throw new IllegalArgumentException("Action parameter must be a PollerInterest object.");
+        if ( param == null || (!(param instanceof PollerInterest[])) )
+            throw new IllegalArgumentException("Action parameter must be a PollerInterest[] object.");
         int interest = 0;
-        PollerInterest pi = (PollerInterest)param;
-        if ( pi == PollerInterest.CALLBACK )
-            interest = NioEndpoint.OP_CALLBACK;
-        else if ( pi == PollerInterest.READ ) 
-            interest  = SelectionKey.OP_READ;
-        else if ( pi == PollerInterest.WRITE ) 
-            interest = SelectionKey.OP_WRITE;
-        else
-            throw new IllegalArgumentException(pi!=null?pi.toString():"null");
+        PollerInterest[] piarr = (PollerInterest[])param;
+        for ( PollerInterest pi : piarr ) {
+            if (pi == PollerInterest.CALLBACK)
+                interest = interest | NioEndpoint.OP_CALLBACK;
+            else if (pi == PollerInterest.READ)
+                interest = interest | SelectionKey.OP_READ;
+            else if (pi == PollerInterest.WRITE)
+                interest = interest | SelectionKey.OP_WRITE;
+            else
+                throw new IllegalArgumentException(pi != null ? pi.toString() : "null");
+        }
         return interest;
     }
 
index 268c957..3f37b4a 100644 (file)
@@ -1324,6 +1324,10 @@ public class NioEndpoint {
         public void cometInterest(NioChannel socket) {
             KeyAttachment att = (KeyAttachment)socket.getAttachment(false);
             add(socket,att.getCometOps());
+            if ( (att.getCometOps()&OP_CALLBACK) == OP_CALLBACK ) {
+                nextExpiration = 0; //force the check for faster callback
+                selector.wakeup();
+            }
         }
         
         public void wakeup() {
@@ -1510,9 +1514,14 @@ public class NioEndpoint {
                         } else if ( attachment.getComet() ) {
                             //check if thread is available
                             if ( isWorkerAvailable() ) {
-                                unreg(sk, attachment, sk.readyOps());
-                                if (!processSocket(channel, SocketStatus.OPEN_READ))
-                                    processSocket(channel, SocketStatus.DISCONNECT);
+                                //set interest ops to 0 so we don't get multiple
+                                //invokations
+                                reg(sk, attachment, 0);
+                                //read goes before write
+                                if (sk.isReadable())
+                                    if (!processSocket(channel, SocketStatus.OPEN_READ)) processSocket(channel, SocketStatus.DISCONNECT);
+                                else
+                                    if (!processSocket(channel, SocketStatus.OPEN_WRITE)) processSocket(channel, SocketStatus.DISCONNECT);
                             } else {
                                 result = false;
                             }