</source>
</p>
</subsection>
+ <subsection name="Comet Operations">
+ <p>
+ The previous section touched a little bit on the comet connection lifecycle.
+ It is important to remember that comet events are based around IO on an actual socket.<br/>
+ To clarify the Comet API, it has been created to resemble the java.nio channel/selector APIs.
+ In the case of Comet, Tomcat is the selector and using the CometEvent object, you can
+ register and unregister your Comet event for different event type notifications.
+ We call the parameter to the <code>CometEvent.register/unregister</code> method a comet operation.
+ This is similar to the interestOps method of a <code>SelectionKey</code> in the java.nio implementation.
+ <br/>The Comet implementation of register and unregister has been greatly simplified to not force the
+ comet developer to implement complex synchronizations around the register and unregister code.
+ </p>
+ <p>
+ It is important to realize, just like the java.nio API, that once an operation has been registered, it will
+ remain registered until it is unregistered. If you have registered OP_READ, then the comet connection will
+ fire READ events, every time data arrives until your unregister the OP_READ operation.<br/>
+ OP_CALLBACK/OP_WRITE work in the same way, essentially, register(OP_CALLBACK|OP_WRITE) will keep spawning
+ CALLBACK/WRITE events until you unregister the operation(s).
+ </p>
+ </subsection>
<subsection name="CometEvent">
</subsection>
- <subsection name="Example code">
+ <subsection name="Example code snippets">
+ <p>
+ Imagine you are writing a servlet that is updating a set of
+ stock tickers. You have a back ground thread that is receiving
+ updates for tickers as they happen, and you wish to push out these
+ to all the tickers that have the stocks in their list.<br/>
+ In the following example, you can accomplish maximum through put by
+ taking advantage of Tomcat's non blocking Comet features.<br/>
+ When the StockUpdater thread is running, it receives a set of stock updates.
+ It gets all the clients that are registered for the stocks that have changed.<br/>
+ For each client, there is an associated CometEvent object, the StockUpdater
+ checks if it can write without blocking, if so it writes directly, otherwise
+ it registers the client for a WRITE event, that will tell the
+ system that we can write the data now.
+ This is a perfect example of how we can take advantage of the combination of the write event
+ and the isWriteable method to determine, when we can write the data.<br/>
+ As with any kind of non blocking IO, you will need to synchronize your code,
+ this has not been done in the example below since the focus is on the
+ data delivery, not synchronization.
+ </p>
+ <source>
+public class ExampleCometStockStreamer implements CometProcessor {
+ ...
+ public class StockUpdater extends Thread {
+ public void run() {
+ ...
+ StockUpdates[] updates = fetchUpdates();
+ Client[] clients = getClients(updates);
+ for (int i=0; i<clients.length; i++ ) {
+ CometEvent event = client.getEvent();
+ StockUpdates[] clientList = getClientUpdates(client,updates);
+ client.setAndMergeNextUpdates(clientList);
+ if (event.isWriteable()) {
+ byte[] data = getUpdateChunk(client.getNextUpdates());
+ event.getHttpServletResponse().getOutputStream().write(data);
+ } else {
+ event.register(OP_WRITE);
+ }
+ }
+ ...
+ }
+ }
+ ...
+ public void event(CometEvent event) throws IOException, ServletException {
+ ...
+ if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+ //configure non blocking
+ event.configureBlocking(false);
+ } if ( event.getEventType() == CometEvent.EventType.READ ) {
+ //read client Id and stock list from client
+ //and add the event to our list
+ String clientId = readClientInfo(event,stocks);
+ clients.add(clientId, event, stocks);
+ } if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+ //unregister from the write event
+ event.unregister(OP_WRITE);
+ //we can now write
+ byte[] data = getUpdateChunk(client.getNextUpdates());
+ event.getHttpServletResponse().getOutputStream().write(data);
+ } else if (...) {
+ ...
+ }
+ ...
+ }
+
+}
+ </source>
+
+ <p>
+ The above stock ticker example is extremely powerful,
+ but also creates a great deal of complexity trying to
+ synchronize between the registration of interested events
+ between the threads. <br/>
+ Another option is to simplify this by using blocking IO.
+ That implementation would look like this.<br/>
+ Notice that writing data to the client is only done
+ upon an event and never asynchronously. Assuming that the data written is smaller
+ than the socket network buffer, this write is almost always guaranteed
+ to be written without a delay. Should the write be blocked,
+ the system is still concurrent, as the writing happens on a thread
+ from Tomcat's thread pool. <br/>
+ In this case, the only synchronization that needs to be done is in between
+ <code>client.getNextUpdates()</code> and <code>client.setAndMergeNextUpdates(clientList)</code>.
+ </p>
+ <source>
+public class ExampleCometStockStreamer implements CometProcessor {
+ ...
+ public class StockUpdater extends Thread {
+ public void run() {
+ ...
+ StockUpdates[] updates = fetchUpdates();
+ Client[] clients = getClients(updates);
+ for (int i=0; i<clients.length; i++ ) {
+ StockUpdates[] clientList = getClientUpdates(client,updates);
+ client.setAndMergeNextUpdates(clientList);
+ client.getEvent().register(OP_WRITE);
+ }
+ ...
+ }
+ }
+ ...
+ public void event(CometEvent event) throws IOException, ServletException {
+ ...
+ if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+ //configure blocking
+ event.configureBlocking(true);
+ } if ( event.getEventType() == CometEvent.EventType.READ ) {
+ //read client Id and stock list from client
+ //and add the event to our list
+ String clientId = readClientInfo(event,stocks);
+ clients.add(clientId, event, stocks);
+ } if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+ Client client = clients.get(event);
+ //unregister from the write event
+ event.unregister(OP_WRITE);
+ //we can now write
+ byte[] data = getUpdateChunk(client.getNextUpdates());
+ event.getHttpServletResponse().getOutputStream().write(data);
+ } else if (...) {
+ ...
+ }
+ ...
+ }
+
+}
+ </source>
+
+ <p>
+ Imagine that you wish to write a pseudo transactional system,
+ (please take the word transaction with a grain of salt),
+ and be able to do your own write scheduling.<br/>
+ In the next example we are going to demonstrate the ability to
+ use the isReadable() and isWriteable() methods in a poller sense,
+ and do all writes and reads asynchronously on a single thread.<br/>
+ Our goal here is to implement a comet servlet that reads a client request,
+ then writes a chunk of data when the request has been received.
+ The servlet will not write the next chunk until the first request has been read
+ and first chunk has been written to all clients.
+ The code below is far from optimal, but it demonstrates the ability
+ to not rely on any IO events, and schedule yourself when you wish to read
+ or write data. All operations are non blocking, so the AllWriterThread
+ will never block on any operation. <br/>In the example below,
+ we just do a busy spin cycle.
+ other
+ </p>
+ <source>
+public class ExampleAllReadThenWriteComet implements CometProcessor {
+ ...
+ public class AllWriterThread extends Thread {
+ byte[] dataChunks = ...;
+ public void run() {
+ ...
+ for (int i=0; i<dataChunks.length; i++ ) {
+ for (int j=0; j<clients.size(); j++) {
+ boolean done = false;
+ while (!done) {
+ //first read the first request
+ //but only if our previous write was completed
+ if ( clients[j].getEvent().isWriteable() && clients[j].getEvent().isReadable() ) {
+ done = readClientData(clients[j]); //returns true if all data has been received for a request
+ }
+ }
+ done = false;
+ while (!done) {
+ //write the response
+ if ( clients[j].getEvent().isWriteable() {
+ clients[j].getEvent().getHttpServletResponse().write(dataChunks[i]);
+ done = true;
+ }
+ }
+ }
+ }
+ ...
+ }
+ }
+ ...
+ public void event(CometEvent event) throws IOException, ServletException {
+ ...
+ if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+ //configure non blocking
+ event.configureBlocking(false);
+ //disable all events
+ event.unregister(event.getRegisteredOps());
+ //add the event to our client list
+ clients.add(event);
+ //start our writer if all clients have arrived
+ if (clients.size()==5) {
+ AllWriterThread thread = new AllWriterThread();
+ thread.start();
+ }
+ } if ( event.getEventType() == CometEvent.EventType.READ ) {
+ } if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+ } else if (...) {
+ ...
+ }
+ ...
+ }
+}
+ </source>
+
+ <p>
+ Ok, so the previous example was kind of silly, but we demonstrated that
+ you are able to read/write on a single thread, in a non blocking fashion.
+ Now we are going to achieve the exact same functionality, but not using
+ any asynchronous data, instead we are going to use
+ blocking IO and tomcat's worker threads
+ </p>
+ <source>
+public class ExampleAllReadThenWriteComet implements CometProcessor {
+ ...
+ byte[] dataChunks = ...;
+ ...
+ public void event(CometEvent event) throws IOException, ServletException {
+ ...
+ if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+ //configure blocking
+ event.configureBlocking(true);
+ //disable all events
+ event.unregister(event.getRegisteredOps());
+ //add the event to our client list
+ clients.add(event);
+ //if all our clients have arrived, register them for read.
+ if (atomicClientCounter.addAndGet(1)==5) {
+ atomicClientReadCounter.set(5);
+ for (Client c : clients) {
+ c.getEvent().register(OP_READ);
+ }
+ }
+ } if ( event.getEventType() == CometEvent.EventType.READ ) {
+ event.unregister(OP_READ);
+ Client client = clients.get(event);
+ readClientData(client);
+ if (atomicClientReadCounter.addAndGet(-1) == 0 ) {
+ //all clients have read
+ atomicClientWriteCounter.set(5);
+ for (Client c : clients) {
+ c.getEvent().register(OP_WRITE);
+ }
+ }
+ } if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+ event.unregister(OP_WRITE);
+ Client client = clients.get(event);
+ writeNextChunk(client);
+ if (atomicClientWriteCounter.addAndGet(-1) == 0 ) {
+ //all clients have been written, start reading the next request
+ atomicClientReadCounter.set(5);
+ for (Client c : clients) {
+ c.getEvent().register(OP_READ);
+ }
+ }
+ } else if (...) {
+ ...
+ }
+ ...
+ }
+}
+ </source>
+
+
+ </subsection>
+
+ <subsection name="Example code">
<p>
The following pseudo code servlet implments asynchronous chat functionality using the API
described above: