From 71d67acf528379e8429319bd4788dc13f4a5b75b Mon Sep 17 00:00:00 2001 From: fhanik Date: Thu, 15 Oct 2009 01:10:34 +0000 Subject: [PATCH] Add a simple asynchronous stock ticker. Some more refactoring around common code git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@825366 13f79535-47bb-0310-9956-ffa450edef68 --- java/javax/servlet/AsyncListener.java | 1 + .../apache/catalina/connector/CoyoteAdapter.java | 6 + .../apache/catalina/connector/RequestFacade.java | 2 +- .../org/apache/catalina/core/AsyncContextImpl.java | 19 ++- .../apache/catalina/core/AsyncListenerWrapper.java | 6 + .../apache/catalina/valves/ErrorReportValve.java | 14 +- .../apache/coyote/http11/Http11AprProcessor.java | 2 +- .../apache/coyote/http11/Http11NioProcessor.java | 2 +- .../apache/tomcat/util/net/AbstractEndpoint.java | 10 +- java/org/apache/tomcat/util/net/AprEndpoint.java | 5 +- java/org/apache/tomcat/util/net/NioEndpoint.java | 5 +- .../WEB-INF/classes/async/AsyncStockServlet.java | 141 ++++++++++++++++ .../WEB-INF/classes/async/Stockticker.java | 186 +++++++++++++++++++++ webapps/examples/WEB-INF/web.xml | 9 + webapps/examples/jsp/async/index.jsp | 4 + 15 files changed, 393 insertions(+), 19 deletions(-) create mode 100644 webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java create mode 100644 webapps/examples/WEB-INF/classes/async/Stockticker.java diff --git a/java/javax/servlet/AsyncListener.java b/java/javax/servlet/AsyncListener.java index c83b225a3..01dd54b8a 100644 --- a/java/javax/servlet/AsyncListener.java +++ b/java/javax/servlet/AsyncListener.java @@ -27,4 +27,5 @@ import java.util.EventListener; public interface AsyncListener extends EventListener { void onComplete(AsyncEvent event) throws IOException; void onTimeout(AsyncEvent event) throws IOException; + void onError(AsyncEvent event) throws IOException; } diff --git a/java/org/apache/catalina/connector/CoyoteAdapter.java b/java/org/apache/catalina/connector/CoyoteAdapter.java index 07139f9df..7ac3b21d3 100644 --- a/java/org/apache/catalina/connector/CoyoteAdapter.java +++ b/java/org/apache/catalina/connector/CoyoteAdapter.java @@ -272,6 +272,12 @@ public class CoyoteAdapter //configure settings for timed out asyncConImpl.setTimeoutState(); } + if (status==SocketStatus.ERROR) { + AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext(); + //TODO SERVLET3 - async + //configure settings for timed out + asyncConImpl.setErrorState(); + } connector.getContainer().getPipeline().getFirst().invoke(request, response); }catch (RuntimeException x) { success = false; diff --git a/java/org/apache/catalina/connector/RequestFacade.java b/java/org/apache/catalina/connector/RequestFacade.java index 56053f49d..c82f37127 100644 --- a/java/org/apache/catalina/connector/RequestFacade.java +++ b/java/org/apache/catalina/connector/RequestFacade.java @@ -969,7 +969,7 @@ public class RequestFacade implements HttpServletRequest { public boolean isAsyncSupported() { - return request.isAsyncStarted(); + return request.isAsyncSupported(); } diff --git a/java/org/apache/catalina/core/AsyncContextImpl.java b/java/org/apache/catalina/core/AsyncContextImpl.java index 29b754abd..fa985ac8b 100644 --- a/java/org/apache/catalina/core/AsyncContextImpl.java +++ b/java/org/apache/catalina/core/AsyncContextImpl.java @@ -48,7 +48,7 @@ import org.apache.juli.logging.LogFactory; public class AsyncContextImpl implements AsyncContext { public static enum AsyncState { - NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT + NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT, ERROR_DISPATCHING } protected static Log log = LogFactory.getLog(AsyncContextImpl.class); @@ -265,6 +265,18 @@ public class AsyncContextImpl implements AsyncContext { ((HttpServletResponse)servletResponse).setStatus(500); } doInternalComplete(true); + } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.DISPATCHED)) { + log.debug("ON ERROR!"); + boolean listenerInvoked = false; + for (AsyncListenerWrapper listener : listeners) { + listener.fireOnError(event); + listenerInvoked = true; + } + if (!listenerInvoked) { + ((HttpServletResponse)servletResponse).setStatus(500); + } + doInternalComplete(true); + } else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) { if (this.dispatch!=null) { try { @@ -303,7 +315,6 @@ public class AsyncContextImpl implements AsyncContext { } try { if (!error) getResponse().flushBuffer(); - }catch (Exception x) { log.error("",x); } @@ -335,6 +346,10 @@ public class AsyncContextImpl implements AsyncContext { state.set(AsyncState.TIMING_OUT); } + public void setErrorState() { + state.set(AsyncState.ERROR_DISPATCHING); + } + public void init(ServletRequest request, ServletResponse response) { this.servletRequest = request; this.servletResponse = response; diff --git a/java/org/apache/catalina/core/AsyncListenerWrapper.java b/java/org/apache/catalina/core/AsyncListenerWrapper.java index 0ac12cdba..ae8626df8 100644 --- a/java/org/apache/catalina/core/AsyncListenerWrapper.java +++ b/java/org/apache/catalina/core/AsyncListenerWrapper.java @@ -40,6 +40,12 @@ public class AsyncListenerWrapper { // TODO SERVLET 3 - async listener.onTimeout(event); } + + public void fireOnError(AsyncEvent event) throws IOException { + // TODO SERVLET 3 - async + listener.onError(event); + } + public AsyncListener getListener() { return listener; diff --git a/java/org/apache/catalina/valves/ErrorReportValve.java b/java/org/apache/catalina/valves/ErrorReportValve.java index c80d3fa97..2f1d39fb2 100644 --- a/java/org/apache/catalina/valves/ErrorReportValve.java +++ b/java/org/apache/catalina/valves/ErrorReportValve.java @@ -53,7 +53,7 @@ public class ErrorReportValve //------------------------------------------------------ Constructor public ErrorReportValve() { - super(false); + super(true); } // ----------------------------------------------------- Instance Variables @@ -104,14 +104,18 @@ public class ErrorReportValve // Perform the request getNext().invoke(request, response); - - Throwable throwable = - (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR); - + if (response.isCommitted()) { return; } + if (request.isAsyncStarted()) { + return; + } + + Throwable throwable = + (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR); + if (throwable != null) { // The response is an error diff --git a/java/org/apache/coyote/http11/Http11AprProcessor.java b/java/org/apache/coyote/http11/Http11AprProcessor.java index 8a1ac9380..30a24be1b 100644 --- a/java/org/apache/coyote/http11/Http11AprProcessor.java +++ b/java/org/apache/coyote/http11/Http11AprProcessor.java @@ -54,7 +54,7 @@ import org.apache.tomcat.util.http.FastHttpDateFormat; import org.apache.tomcat.util.http.MimeHeaders; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketStatus; -import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; diff --git a/java/org/apache/coyote/http11/Http11NioProcessor.java b/java/org/apache/coyote/http11/Http11NioProcessor.java index a480edfe5..8ee167d38 100644 --- a/java/org/apache/coyote/http11/Http11NioProcessor.java +++ b/java/org/apache/coyote/http11/Http11NioProcessor.java @@ -51,7 +51,7 @@ import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.SSLSupport; import org.apache.tomcat.util.net.SocketStatus; -import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index 13f4f81d7..fb79504f7 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -67,7 +67,15 @@ public abstract class AbstractEndpoint { * This one is a Tomcat extension to the Servlet spec. */ public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr"; - + + /** + * Different types of socket states to react upon + */ + public static interface Handler { + public enum SocketState { + OPEN, CLOSED, LONG + } + } // ----------------------------------------------------------------- Fields diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index 8a4ef2a62..065a4c29e 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -1344,10 +1344,7 @@ public class AprEndpoint extends AbstractEndpoint { * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ - public interface Handler { - public enum SocketState { - OPEN, CLOSED, LONG - } + public interface Handler extends AbstractEndpoint.Handler { public SocketState process(long socket); public SocketState event(long socket, SocketStatus status); } diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 09f9dbda6..356100f12 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -1568,10 +1568,7 @@ public class NioEndpoint extends AbstractEndpoint { * stored in the ThreadWithAttributes extra folders, or alternately in * thread local fields. */ - public interface Handler { - public enum SocketState { - OPEN, CLOSED, LONG - } + public interface Handler extends AbstractEndpoint.Handler { public SocketState process(NioChannel socket); public SocketState event(NioChannel socket, SocketStatus status); public void releaseCaches(); diff --git a/webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java b/webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java new file mode 100644 index 000000000..535d606ed --- /dev/null +++ b/webapps/examples/WEB-INF/classes/async/AsyncStockServlet.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package async; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import async.Stockticker.Stock; +import async.Stockticker.TickListener; + +public class AsyncStockServlet extends HttpServlet implements TickListener, AsyncListener{ + + public static final String POLL = "POLL"; + public static final String LONG_POLL = "LONG-POLL"; + public static final String STREAM = "STREAM"; + + static ArrayList ticks = new ArrayList(); + static ConcurrentLinkedQueue clients = new ConcurrentLinkedQueue(); + static AtomicInteger clientcount = new AtomicInteger(0); + static Stockticker ticker = new Stockticker(); + + public AsyncStockServlet() { + System.out.println("AsyncStockServlet created"); + } + + + + @Override + protected void service(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + //get the client Id, it should be in the URL + String clientId = req.getParameter("clientId"); + //get the method this client prefers + String method = req.getParameter("method"); + + //if the client doesn't have a method defined, then we will assume it is POLLING + if (method==null) method = POLL; + //if the client hasn't specified its own clientId, abort +// if (clientId==null) { +// resp.sendError(404,"Client not found."); +// return; +// } + + if (req.isAsyncStarted()) { + req.getAsyncContext().complete(); + } else if (req.isAsyncSupported()) { + AsyncContext actx = req.startAsync(); + req.addAsyncListener(this); + resp.setContentType("text/plain"); + clients.add(actx); + if (this.clientcount.incrementAndGet()==1) { + ticker.addTickListener(this); + } + } else { + new Exception("Async Not Supported").printStackTrace(); + resp.sendError(400,"Async is not supported."); + } + } + + + + @Override + public void tick(Stock stock) { + ticks.add((Stock)stock.clone()); + Iterator it = clients.iterator(); + while (it.hasNext()) { + AsyncContext actx = it.next(); + writeStock(actx, stock); + } + } + + public void writeStock(AsyncContext actx, Stock stock) { + HttpServletResponse response = (HttpServletResponse)actx.getResponse(); + try { + PrintWriter writer = response.getWriter(); + writer.write("STOCK#");//make client parsing easier + writer.write(stock.getSymbol()); + writer.write("#"); + writer.write(stock.getValueAsString()); + writer.write("#"); + writer.write(stock.getLastChangeAsString()); + writer.write("#"); + writer.write(String.valueOf(stock.getCnt())); + writer.write("\n"); + writer.flush(); + response.flushBuffer(); + }catch (IOException x) { + try {actx.complete();}catch (Exception ignore){} + } + } + + @Override + public void onComplete(AsyncEvent event) throws IOException { + clients.remove(event.getRequest().getAsyncContext()); + if (clientcount.decrementAndGet()==0) { + ticker.removeTickListener(this); + } + } + + @Override + public void onError(AsyncEvent event) throws IOException { + event.getRequest().getAsyncContext().complete(); + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + event.getRequest().getAsyncContext().complete(); + } + + + + + + +} diff --git a/webapps/examples/WEB-INF/classes/async/Stockticker.java b/webapps/examples/WEB-INF/classes/async/Stockticker.java new file mode 100644 index 000000000..a0e5563b5 --- /dev/null +++ b/webapps/examples/WEB-INF/classes/async/Stockticker.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package async; + +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +public class Stockticker implements Runnable { + public volatile boolean run = true; + protected AtomicInteger counter = new AtomicInteger(0); + ArrayList listeners = new ArrayList(); + protected volatile Thread ticker = null; + protected volatile int ticknr = 0; + + public synchronized void start() { + run = true; + ticker = new Thread(this); + ticker.setName("Ticker Thread"); + ticker.start(); + } + + public synchronized void stop() { + run = false; + try { + ticker.join(); + }catch (InterruptedException x) { + ticker.interrupted(); + } + + ticker = null; + } + + public void addTickListener(TickListener listener) { + if (listeners.add(listener)) { + if (counter.incrementAndGet()==1) start(); + } + + } + + public void removeTickListener(TickListener listener) { + if (listeners.remove(listener)) { + if (counter.decrementAndGet()==0) stop(); + } + } + + public void run() { + try { + + Stock[] stocks = new Stock[] { new Stock("GOOG", 435.43), + new Stock("YHOO", 27.88), new Stock("ASF", 1015.55), }; + Random r = new Random(System.currentTimeMillis()); + while (run) { + for (int j = 0; j < 1; j++) { + int i = r.nextInt() % 3; + if (i < 0) + i = i * (-1); + Stock stock = stocks[i]; + double change = r.nextDouble(); + boolean plus = r.nextBoolean(); + if (plus) { + stock.setValue(stock.getValue() + change); + } else { + stock.setValue(stock.getValue() - change); + } + stock.setCnt(++ticknr); + for (TickListener l : listeners) { + l.tick(stock); + } + +// System.out.println("Stock: " + stock.getSymbol() +// + " Price: " + stock.getValueAsString() +// + " Change: " + stock.getLastChangeAsString()); + } + Thread.sleep(850); + } + } catch (InterruptedException ix) { + + } catch (Exception x) { + x.printStackTrace(); + } + } + + + public static interface TickListener { + public void tick(Stock stock); + } + + public static class Stock { + protected static DecimalFormat df = new DecimalFormat("0.00"); + protected String symbol = ""; + protected double value = 0.0d; + protected double lastchange = 0.0d; + protected int cnt = 0; + + public Stock(String symbol, double initvalue) { + this.symbol = symbol; + this.value = initvalue; + } + + public void setCnt(int c) { + this.cnt = c; + } + + public int getCnt() { + return cnt; + } + + public String getSymbol() { + return symbol; + } + + public double getValue() { + return value; + } + + public void setValue(double value) { + double old = this.value; + this.value = value; + this.lastchange = value - old; + } + + public String getValueAsString() { + return df.format(value); + } + + public double getLastChange() { + return this.lastchange; + } + + public void setLastChange(double lastchange) { + this.lastchange = lastchange; + } + + public String getLastChangeAsString() { + return df.format(lastchange); + } + + public int hashCode() { + return symbol.hashCode(); + } + + public boolean equals(Object other) { + if (other instanceof Stock) { + return this.symbol.equals(((Stock) other).symbol); + } else { + return false; + } + } + + public String toString() { + StringBuffer buf = new StringBuffer("STOCK#"); + buf.append(getSymbol()); + buf.append("#"); + buf.append(getValueAsString()); + buf.append("#"); + buf.append(getLastChangeAsString()); + buf.append("#"); + buf.append(String.valueOf(getCnt())); + return buf.toString(); + + } + + public Object clone() { + Stock s = new Stock(this.getSymbol(), this.getValue()); + s.setLastChange(this.getLastChange()); + s.setCnt(this.cnt); + return s; + } + } +} diff --git a/webapps/examples/WEB-INF/web.xml b/webapps/examples/WEB-INF/web.xml index d3a684922..7bc9a5e3b 100644 --- a/webapps/examples/WEB-INF/web.xml +++ b/webapps/examples/WEB-INF/web.xml @@ -328,4 +328,13 @@ async3 /async/async3 + + stock + async.AsyncStockServlet + + + stock + /async/stockticker + + diff --git a/webapps/examples/jsp/async/index.jsp b/webapps/examples/jsp/async/index.jsp index bd3efaeb5..31174c679 100644 --- a/webapps/examples/jsp/async/index.jsp +++ b/webapps/examples/jsp/async/index.jsp @@ -62,4 +62,8 @@ Use cases: - servlet1 does a dispatch to servlet2 (asyncsupported=true) - servlet2 does a dispatch to servlet3 (asyncsupported=true) - servlet3 does a dispatch to servlet4 (asyncsupported=false) + + +7. Stock ticker + "> StockTicker \ No newline at end of file -- 2.11.0