Add a simple asynchronous stock ticker.
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 15 Oct 2009 01:10:34 +0000 (01:10 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Thu, 15 Oct 2009 01:10:34 +0000 (01:10 +0000)
Some more refactoring around common code

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@825366 13f79535-47bb-0310-9956-ffa450edef68

15 files changed:
java/javax/servlet/AsyncListener.java
java/org/apache/catalina/connector/CoyoteAdapter.java
java/org/apache/catalina/connector/RequestFacade.java
java/org/apache/catalina/core/AsyncContextImpl.java
java/org/apache/catalina/core/AsyncListenerWrapper.java
java/org/apache/catalina/valves/ErrorReportValve.java
java/org/apache/coyote/http11/Http11AprProcessor.java
java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/tomcat/util/net/AbstractEndpoint.java
java/org/apache/tomcat/util/net/AprEndpoint.java
java/org/apache/tomcat/util/net/NioEndpoint.java
webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java [new file with mode: 0644]
webapps/examples/WEB-INF/classes/async/Stockticker.java [new file with mode: 0644]
webapps/examples/WEB-INF/web.xml
webapps/examples/jsp/async/index.jsp

index c83b225..01dd54b 100644 (file)
@@ -27,4 +27,5 @@ import java.util.EventListener;
 public interface AsyncListener extends EventListener {
     void onComplete(AsyncEvent event) throws IOException;
     void onTimeout(AsyncEvent event) throws IOException;
+    void onError(AsyncEvent event) throws IOException;
 }
index 07139f9..7ac3b21 100644 (file)
@@ -272,6 +272,12 @@ public class CoyoteAdapter
                    //configure settings for timed out
                    asyncConImpl.setTimeoutState();
                 }
+                if (status==SocketStatus.ERROR) {
+                    AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
+                    //TODO SERVLET3 - async
+                    //configure settings for timed out
+                    asyncConImpl.setErrorState();
+                }
                 connector.getContainer().getPipeline().getFirst().invoke(request, response);
             }catch (RuntimeException x) {
                 success = false;
index 56053f4..c82f371 100644 (file)
@@ -969,7 +969,7 @@ public class RequestFacade implements HttpServletRequest {
 
 
     public boolean isAsyncSupported() {
-        return request.isAsyncStarted();
+        return request.isAsyncSupported();
     }
 
     
index 29b754a..fa985ac 100644 (file)
@@ -48,7 +48,7 @@ import org.apache.juli.logging.LogFactory;
 public class AsyncContextImpl implements AsyncContext {
     
     public static enum AsyncState {
-        NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT
+        NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT, ERROR_DISPATCHING
     }
     
     protected static Log log = LogFactory.getLog(AsyncContextImpl.class);
@@ -265,6 +265,18 @@ public class AsyncContextImpl implements AsyncContext {
                 ((HttpServletResponse)servletResponse).setStatus(500);
             }
             doInternalComplete(true);
+        } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.DISPATCHED)) {
+            log.debug("ON ERROR!");
+            boolean listenerInvoked = false;
+            for (AsyncListenerWrapper listener : listeners) {
+                listener.fireOnError(event);
+                listenerInvoked = true;
+            }
+            if (!listenerInvoked) {
+                ((HttpServletResponse)servletResponse).setStatus(500);
+            }
+            doInternalComplete(true);
+        
         } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) {
             if (this.dispatch!=null) {
                 try {
@@ -303,7 +315,6 @@ public class AsyncContextImpl implements AsyncContext {
             }
             try {
                 if (!error) getResponse().flushBuffer();
-
             }catch (Exception x) {
                 log.error("",x);
             }
@@ -335,6 +346,10 @@ public class AsyncContextImpl implements AsyncContext {
         state.set(AsyncState.TIMING_OUT);
     }
     
+    public void setErrorState() {
+        state.set(AsyncState.ERROR_DISPATCHING);
+    }
+    
     public void init(ServletRequest request, ServletResponse response) {
         this.servletRequest = request;
         this.servletResponse = response;
index 0ac12cd..ae8626d 100644 (file)
@@ -40,6 +40,12 @@ public class AsyncListenerWrapper {
         // TODO SERVLET 3 - async 
         listener.onTimeout(event);
     }
+    
+    public void fireOnError(AsyncEvent event) throws IOException {
+        // TODO SERVLET 3 - async 
+        listener.onError(event);
+    }
+
 
     public AsyncListener getListener() {
         return listener;
index c80d3fa..2f1d39f 100644 (file)
@@ -53,7 +53,7 @@ public class ErrorReportValve
 
     //------------------------------------------------------ Constructor
     public ErrorReportValve() {
-        super(false);
+        super(true);
     }
 
     // ----------------------------------------------------- Instance Variables
@@ -104,14 +104,18 @@ public class ErrorReportValve
 
         // Perform the request
         getNext().invoke(request, response);
-
-        Throwable throwable =
-            (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR);
-
+        
         if (response.isCommitted()) {
             return;
         }
 
+        if (request.isAsyncStarted()) {
+            return;
+        }
+        
+        Throwable throwable =
+            (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR);
+
         if (throwable != null) {
 
             // The response is an error
index 8a1ac93..30a24be 100644 (file)
@@ -54,7 +54,7 @@ import org.apache.tomcat.util.http.FastHttpDateFormat;
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
-import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 
 
index a480edf..8ee167d 100644 (file)
@@ -51,7 +51,7 @@ import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.SSLSupport;
 import org.apache.tomcat.util.net.SocketStatus;
-import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
 
index 13f4f81..fb79504 100644 (file)
@@ -67,7 +67,15 @@ public abstract class AbstractEndpoint {
      * This one is a Tomcat extension to the Servlet spec.
      */
     public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr";
-    
+   
+    /**
+     * Different types of socket states to react upon
+     */
+    public static interface Handler {
+        public enum SocketState {
+            OPEN, CLOSED, LONG
+        }
+    }    
     // ----------------------------------------------------------------- Fields
 
 
index 8a4ef2a..065a4c2 100644 (file)
@@ -1344,10 +1344,7 @@ public class AprEndpoint extends AbstractEndpoint {
      * stored in the ThreadWithAttributes extra folders, or alternately in
      * thread local fields.
      */
-    public interface Handler {
-        public enum SocketState {
-            OPEN, CLOSED, LONG
-        }
+    public interface Handler extends AbstractEndpoint.Handler {
         public SocketState process(long socket);
         public SocketState event(long socket, SocketStatus status);
     }
index 09f9dbd..356100f 100644 (file)
@@ -1568,10 +1568,7 @@ public class NioEndpoint extends AbstractEndpoint {
      * stored in the ThreadWithAttributes extra folders, or alternately in
      * thread local fields.
      */
-    public interface Handler {
-        public enum SocketState {
-            OPEN, CLOSED, LONG
-        }
+    public interface Handler extends AbstractEndpoint.Handler {
         public SocketState process(NioChannel socket);
         public SocketState event(NioChannel socket, SocketStatus status);
         public void releaseCaches();
diff --git a/webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java b/webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java
new file mode 100644 (file)
index 0000000..535d606
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package async;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import async.Stockticker.Stock;
+import async.Stockticker.TickListener;
+
+public class AsyncStockServlet extends HttpServlet implements TickListener, AsyncListener{
+
+    public static final String POLL = "POLL";
+    public static final String LONG_POLL = "LONG-POLL";
+    public static final String STREAM = "STREAM";
+    
+    static ArrayList<Stock> ticks = new ArrayList<Stock>();
+    static ConcurrentLinkedQueue<AsyncContext> clients = new ConcurrentLinkedQueue<AsyncContext>();
+    static AtomicInteger clientcount = new AtomicInteger(0);
+    static Stockticker ticker = new Stockticker();
+    
+    public AsyncStockServlet() {
+        System.out.println("AsyncStockServlet created");
+    }
+    
+    
+
+    @Override
+    protected void service(HttpServletRequest req, HttpServletResponse resp)
+            throws ServletException, IOException {
+        //get the client Id, it should be in the URL
+        String clientId = req.getParameter("clientId");
+        //get the method this client prefers
+        String method = req.getParameter("method");
+
+        //if the client doesn't have a method defined, then we will assume it is POLLING
+        if (method==null) method = POLL;
+        //if the client hasn't specified its own clientId, abort
+//        if (clientId==null) {
+//            resp.sendError(404,"Client not found.");
+//            return;
+//        }
+        
+        if (req.isAsyncStarted()) {
+            req.getAsyncContext().complete();
+        } else if (req.isAsyncSupported()) {
+            AsyncContext actx = req.startAsync();
+            req.addAsyncListener(this);
+            resp.setContentType("text/plain");
+            clients.add(actx);
+            if (this.clientcount.incrementAndGet()==1) {
+                ticker.addTickListener(this);
+            }
+        } else {
+            new Exception("Async Not Supported").printStackTrace();
+            resp.sendError(400,"Async is not supported.");
+        }
+    }
+
+
+
+    @Override
+    public void tick(Stock stock) {
+        ticks.add((Stock)stock.clone());
+        Iterator<AsyncContext> it = clients.iterator();
+        while (it.hasNext()) {
+            AsyncContext actx = it.next();
+            writeStock(actx, stock);
+        }
+    }
+    
+    public void writeStock(AsyncContext actx, Stock stock) {
+        HttpServletResponse response = (HttpServletResponse)actx.getResponse();
+        try {
+            PrintWriter writer = response.getWriter();
+            writer.write("STOCK#");//make client parsing easier
+            writer.write(stock.getSymbol());
+            writer.write("#");
+            writer.write(stock.getValueAsString());
+            writer.write("#");
+            writer.write(stock.getLastChangeAsString());
+            writer.write("#");
+            writer.write(String.valueOf(stock.getCnt()));
+            writer.write("\n");
+            writer.flush();
+            response.flushBuffer();
+        }catch (IOException x) {
+            try {actx.complete();}catch (Exception ignore){}
+        }
+    }
+
+    @Override
+    public void onComplete(AsyncEvent event) throws IOException {
+        clients.remove(event.getRequest().getAsyncContext());
+        if (clientcount.decrementAndGet()==0) {
+            ticker.removeTickListener(this);
+        }
+    }
+
+    @Override
+    public void onError(AsyncEvent event) throws IOException {
+        event.getRequest().getAsyncContext().complete();
+    }
+
+    @Override
+    public void onTimeout(AsyncEvent event) throws IOException {
+        event.getRequest().getAsyncContext().complete();
+    }
+    
+    
+    
+    
+    
+
+}
diff --git a/webapps/examples/WEB-INF/classes/async/Stockticker.java b/webapps/examples/WEB-INF/classes/async/Stockticker.java
new file mode 100644 (file)
index 0000000..a0e5563
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package async;
+
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Stockticker implements Runnable {
+        public volatile boolean run = true;
+        protected AtomicInteger counter = new AtomicInteger(0);
+        ArrayList<TickListener> listeners = new ArrayList<TickListener>();
+        protected volatile Thread ticker = null;
+        protected volatile int ticknr = 0;
+        
+        public synchronized void start() {
+            run = true;
+            ticker = new Thread(this);
+            ticker.setName("Ticker Thread");
+            ticker.start();
+        }
+        
+        public synchronized void stop() {
+            run = false;
+            try {
+                ticker.join();
+            }catch (InterruptedException x) {
+                ticker.interrupted();
+            }
+            
+            ticker = null;
+        }
+        
+        public void addTickListener(TickListener listener) {
+            if (listeners.add(listener)) {
+                if (counter.incrementAndGet()==1) start();
+            }
+                
+        }
+
+        public void removeTickListener(TickListener listener) {
+            if (listeners.remove(listener)) {
+                if (counter.decrementAndGet()==0) stop();
+            }
+        }
+
+        public void run() {
+            try {
+
+                Stock[] stocks = new Stock[] { new Stock("GOOG", 435.43),
+                        new Stock("YHOO", 27.88), new Stock("ASF", 1015.55), };
+                Random r = new Random(System.currentTimeMillis());
+                while (run) {
+                    for (int j = 0; j < 1; j++) {
+                        int i = r.nextInt() % 3;
+                        if (i < 0)
+                            i = i * (-1);
+                        Stock stock = stocks[i];
+                        double change = r.nextDouble();
+                        boolean plus = r.nextBoolean();
+                        if (plus) {
+                            stock.setValue(stock.getValue() + change);
+                        } else {
+                            stock.setValue(stock.getValue() - change);
+                        }
+                        stock.setCnt(++ticknr);
+                        for (TickListener l : listeners) {
+                            l.tick(stock);
+                        }
+                        
+//                        System.out.println("Stock: " + stock.getSymbol()
+//                                + " Price: " + stock.getValueAsString()
+//                                + " Change: " + stock.getLastChangeAsString());
+                    }
+                    Thread.sleep(850);
+                }
+            } catch (InterruptedException ix) {
+
+            } catch (Exception x) {
+                x.printStackTrace();
+            }
+        }
+    
+    
+    public static interface TickListener {
+        public void tick(Stock stock);
+    }
+    
+    public static class Stock {
+        protected static DecimalFormat df = new DecimalFormat("0.00");
+        protected String symbol = "";
+        protected double value = 0.0d;
+        protected double lastchange = 0.0d;
+        protected int cnt = 0;
+
+        public Stock(String symbol, double initvalue) {
+            this.symbol = symbol;
+            this.value = initvalue;
+        }
+
+        public void setCnt(int c) {
+            this.cnt = c;
+        }
+
+        public int getCnt() {
+            return cnt;
+        }
+
+        public String getSymbol() {
+            return symbol;
+        }
+
+        public double getValue() {
+            return value;
+        }
+
+        public void setValue(double value) {
+            double old = this.value;
+            this.value = value;
+            this.lastchange = value - old;
+        }
+
+        public String getValueAsString() {
+            return df.format(value);
+        }
+
+        public double getLastChange() {
+            return this.lastchange;
+        }
+
+        public void setLastChange(double lastchange) {
+            this.lastchange = lastchange;
+        }
+
+        public String getLastChangeAsString() {
+            return df.format(lastchange);
+        }
+
+        public int hashCode() {
+            return symbol.hashCode();
+        }
+
+        public boolean equals(Object other) {
+            if (other instanceof Stock) {
+                return this.symbol.equals(((Stock) other).symbol);
+            } else {
+                return false;
+            }
+        }
+
+        public String toString() {
+            StringBuffer buf = new StringBuffer("STOCK#");
+            buf.append(getSymbol());
+            buf.append("#");
+            buf.append(getValueAsString());
+            buf.append("#");
+            buf.append(getLastChangeAsString());
+            buf.append("#");
+            buf.append(String.valueOf(getCnt()));
+            return buf.toString();
+
+        }
+
+        public Object clone() {
+            Stock s = new Stock(this.getSymbol(), this.getValue());
+            s.setLastChange(this.getLastChange());
+            s.setCnt(this.cnt);
+            return s;
+        }
+    }
+}
index d3a6849..7bc9a5e 100644 (file)
       <servlet-name>async3</servlet-name>
       <url-pattern>/async/async3</url-pattern>
     </servlet-mapping>
+    <servlet>
+      <servlet-name>stock</servlet-name>
+      <servlet-class>async.AsyncStockServlet</servlet-class>
+    </servlet>
+    <servlet-mapping>
+      <servlet-name>stock</servlet-name>
+      <url-pattern>/async/stockticker</url-pattern>
+    </servlet-mapping>
+
 </web-app>
index bd3efae..31174c6 100644 (file)
@@ -62,4 +62,8 @@ Use cases:
  - servlet1 does a dispatch to servlet2 (asyncsupported=true)
  - servlet2 does a dispatch to servlet3 (asyncsupported=true)
  - servlet3 does a dispatch to servlet4 (asyncsupported=false) 
+7. Stock ticker
+   <a href="<%=response.encodeURL("/examples/async/stock")%>"> StockTicker </a>
 </pre>
\ No newline at end of file