From: fhanik Date: Wed, 13 Jun 2007 22:51:56 +0000 (+0000) Subject: added simple example code snippets to comet usage X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=7b278fba0dcd786ed5323a5afd09f5bd4c761a97;p=tomcat7.0 added simple example code snippets to comet usage git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@547055 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/webapps/docs/aio.xml b/webapps/docs/aio.xml index b6143d590..c802a4381 100644 --- a/webapps/docs/aio.xml +++ b/webapps/docs/aio.xml @@ -151,6 +151,26 @@

+ +

+ The previous section touched a little bit on the comet connection lifecycle. + It is important to remember that comet events are based around IO on an actual socket.
+ To clarify the Comet API, it has been created to resemble the java.nio channel/selector APIs. + In the case of Comet, Tomcat is the selector and using the CometEvent object, you can + register and unregister your Comet event for different event type notifications. + We call the parameter to the CometEvent.register/unregister method a comet operation. + This is similar to the interestOps method of a SelectionKey in the java.nio implementation. +
The Comet implementation of register and unregister has been greatly simplified to not force the + comet developer to implement complex synchronizations around the register and unregister code. +

+

+ It is important to realize, just like the java.nio API, that once an operation has been registered, it will + remain registered until it is unregistered. If you have registered OP_READ, then the comet connection will + fire READ events, every time data arrives until your unregister the OP_READ operation.
+ OP_CALLBACK/OP_WRITE work in the same way, essentially, register(OP_CALLBACK|OP_WRITE) will keep spawning + CALLBACK/WRITE events until you unregister the operation(s). +

+
@@ -261,8 +281,269 @@ - + +

+ Imagine you are writing a servlet that is updating a set of + stock tickers. You have a back ground thread that is receiving + updates for tickers as they happen, and you wish to push out these + to all the tickers that have the stocks in their list.
+ In the following example, you can accomplish maximum through put by + taking advantage of Tomcat's non blocking Comet features.
+ When the StockUpdater thread is running, it receives a set of stock updates. + It gets all the clients that are registered for the stocks that have changed.
+ For each client, there is an associated CometEvent object, the StockUpdater + checks if it can write without blocking, if so it writes directly, otherwise + it registers the client for a WRITE event, that will tell the + system that we can write the data now. + This is a perfect example of how we can take advantage of the combination of the write event + and the isWriteable method to determine, when we can write the data.
+ As with any kind of non blocking IO, you will need to synchronize your code, + this has not been done in the example below since the focus is on the + data delivery, not synchronization. +

+ +public class ExampleCometStockStreamer implements CometProcessor { + ... + public class StockUpdater extends Thread { + public void run() { + ... + StockUpdates[] updates = fetchUpdates(); + Client[] clients = getClients(updates); + for (int i=0; i<clients.length; i++ ) { + CometEvent event = client.getEvent(); + StockUpdates[] clientList = getClientUpdates(client,updates); + client.setAndMergeNextUpdates(clientList); + if (event.isWriteable()) { + byte[] data = getUpdateChunk(client.getNextUpdates()); + event.getHttpServletResponse().getOutputStream().write(data); + } else { + event.register(OP_WRITE); + } + } + ... + } + } + ... + public void event(CometEvent event) throws IOException, ServletException { + ... + if ( event.getEventType() == CometEvent.EventType.BEGIN ) { + //configure non blocking + event.configureBlocking(false); + } if ( event.getEventType() == CometEvent.EventType.READ ) { + //read client Id and stock list from client + //and add the event to our list + String clientId = readClientInfo(event,stocks); + clients.add(clientId, event, stocks); + } if ( event.getEventType() == CometEvent.EventType.WRITE ) { + //unregister from the write event + event.unregister(OP_WRITE); + //we can now write + byte[] data = getUpdateChunk(client.getNextUpdates()); + event.getHttpServletResponse().getOutputStream().write(data); + } else if (...) { + ... + } + ... + } + +} + + +

+ The above stock ticker example is extremely powerful, + but also creates a great deal of complexity trying to + synchronize between the registration of interested events + between the threads.
+ Another option is to simplify this by using blocking IO. + That implementation would look like this.
+ Notice that writing data to the client is only done + upon an event and never asynchronously. Assuming that the data written is smaller + than the socket network buffer, this write is almost always guaranteed + to be written without a delay. Should the write be blocked, + the system is still concurrent, as the writing happens on a thread + from Tomcat's thread pool.
+ In this case, the only synchronization that needs to be done is in between + client.getNextUpdates() and client.setAndMergeNextUpdates(clientList). +

+ +public class ExampleCometStockStreamer implements CometProcessor { + ... + public class StockUpdater extends Thread { + public void run() { + ... + StockUpdates[] updates = fetchUpdates(); + Client[] clients = getClients(updates); + for (int i=0; i<clients.length; i++ ) { + StockUpdates[] clientList = getClientUpdates(client,updates); + client.setAndMergeNextUpdates(clientList); + client.getEvent().register(OP_WRITE); + } + ... + } + } + ... + public void event(CometEvent event) throws IOException, ServletException { + ... + if ( event.getEventType() == CometEvent.EventType.BEGIN ) { + //configure blocking + event.configureBlocking(true); + } if ( event.getEventType() == CometEvent.EventType.READ ) { + //read client Id and stock list from client + //and add the event to our list + String clientId = readClientInfo(event,stocks); + clients.add(clientId, event, stocks); + } if ( event.getEventType() == CometEvent.EventType.WRITE ) { + Client client = clients.get(event); + //unregister from the write event + event.unregister(OP_WRITE); + //we can now write + byte[] data = getUpdateChunk(client.getNextUpdates()); + event.getHttpServletResponse().getOutputStream().write(data); + } else if (...) { + ... + } + ... + } + +} + + +

+ Imagine that you wish to write a pseudo transactional system, + (please take the word transaction with a grain of salt), + and be able to do your own write scheduling.
+ In the next example we are going to demonstrate the ability to + use the isReadable() and isWriteable() methods in a poller sense, + and do all writes and reads asynchronously on a single thread.
+ Our goal here is to implement a comet servlet that reads a client request, + then writes a chunk of data when the request has been received. + The servlet will not write the next chunk until the first request has been read + and first chunk has been written to all clients. + The code below is far from optimal, but it demonstrates the ability + to not rely on any IO events, and schedule yourself when you wish to read + or write data. All operations are non blocking, so the AllWriterThread + will never block on any operation.
In the example below, + we just do a busy spin cycle. + other +

+ +public class ExampleAllReadThenWriteComet implements CometProcessor { + ... + public class AllWriterThread extends Thread { + byte[] dataChunks = ...; + public void run() { + ... + for (int i=0; i<dataChunks.length; i++ ) { + for (int j=0; j<clients.size(); j++) { + boolean done = false; + while (!done) { + //first read the first request + //but only if our previous write was completed + if ( clients[j].getEvent().isWriteable() && clients[j].getEvent().isReadable() ) { + done = readClientData(clients[j]); //returns true if all data has been received for a request + } + } + done = false; + while (!done) { + //write the response + if ( clients[j].getEvent().isWriteable() { + clients[j].getEvent().getHttpServletResponse().write(dataChunks[i]); + done = true; + } + } + } + } + ... + } + } + ... + public void event(CometEvent event) throws IOException, ServletException { + ... + if ( event.getEventType() == CometEvent.EventType.BEGIN ) { + //configure non blocking + event.configureBlocking(false); + //disable all events + event.unregister(event.getRegisteredOps()); + //add the event to our client list + clients.add(event); + //start our writer if all clients have arrived + if (clients.size()==5) { + AllWriterThread thread = new AllWriterThread(); + thread.start(); + } + } if ( event.getEventType() == CometEvent.EventType.READ ) { + } if ( event.getEventType() == CometEvent.EventType.WRITE ) { + } else if (...) { + ... + } + ... + } +} + + +

+ Ok, so the previous example was kind of silly, but we demonstrated that + you are able to read/write on a single thread, in a non blocking fashion. + Now we are going to achieve the exact same functionality, but not using + any asynchronous data, instead we are going to use + blocking IO and tomcat's worker threads +

+ +public class ExampleAllReadThenWriteComet implements CometProcessor { + ... + byte[] dataChunks = ...; + ... + public void event(CometEvent event) throws IOException, ServletException { + ... + if ( event.getEventType() == CometEvent.EventType.BEGIN ) { + //configure blocking + event.configureBlocking(true); + //disable all events + event.unregister(event.getRegisteredOps()); + //add the event to our client list + clients.add(event); + //if all our clients have arrived, register them for read. + if (atomicClientCounter.addAndGet(1)==5) { + atomicClientReadCounter.set(5); + for (Client c : clients) { + c.getEvent().register(OP_READ); + } + } + } if ( event.getEventType() == CometEvent.EventType.READ ) { + event.unregister(OP_READ); + Client client = clients.get(event); + readClientData(client); + if (atomicClientReadCounter.addAndGet(-1) == 0 ) { + //all clients have read + atomicClientWriteCounter.set(5); + for (Client c : clients) { + c.getEvent().register(OP_WRITE); + } + } + } if ( event.getEventType() == CometEvent.EventType.WRITE ) { + event.unregister(OP_WRITE); + Client client = clients.get(event); + writeNextChunk(client); + if (atomicClientWriteCounter.addAndGet(-1) == 0 ) { + //all clients have been written, start reading the next request + atomicClientReadCounter.set(5); + for (Client c : clients) { + c.getEvent().register(OP_READ); + } + } + } else if (...) { + ... + } + ... + } +} + + + +
+ +

The following pseudo code servlet implments asynchronous chat functionality using the API described above: