public interface AsyncListener extends EventListener {
void onComplete(AsyncEvent event) throws IOException;
void onTimeout(AsyncEvent event) throws IOException;
+ void onError(AsyncEvent event) throws IOException;
}
//configure settings for timed out
asyncConImpl.setTimeoutState();
}
+ if (status==SocketStatus.ERROR) {
+ AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
+ //TODO SERVLET3 - async
+ //configure settings for timed out
+ asyncConImpl.setErrorState();
+ }
connector.getContainer().getPipeline().getFirst().invoke(request, response);
}catch (RuntimeException x) {
success = false;
public boolean isAsyncSupported() {
- return request.isAsyncStarted();
+ return request.isAsyncSupported();
}
public class AsyncContextImpl implements AsyncContext {
public static enum AsyncState {
- NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT
+ NOT_STARTED, STARTED, DISPATCHING, DISPATCHED, COMPLETING, TIMING_OUT, ERROR_DISPATCHING
}
protected static Log log = LogFactory.getLog(AsyncContextImpl.class);
((HttpServletResponse)servletResponse).setStatus(500);
}
doInternalComplete(true);
+ } else if (this.state.compareAndSet(AsyncState.ERROR_DISPATCHING, AsyncState.DISPATCHED)) {
+ log.debug("ON ERROR!");
+ boolean listenerInvoked = false;
+ for (AsyncListenerWrapper listener : listeners) {
+ listener.fireOnError(event);
+ listenerInvoked = true;
+ }
+ if (!listenerInvoked) {
+ ((HttpServletResponse)servletResponse).setStatus(500);
+ }
+ doInternalComplete(true);
+
} else if (this.state.compareAndSet(AsyncState.DISPATCHING, AsyncState.DISPATCHED)) {
if (this.dispatch!=null) {
try {
}
try {
if (!error) getResponse().flushBuffer();
-
}catch (Exception x) {
log.error("",x);
}
state.set(AsyncState.TIMING_OUT);
}
+ public void setErrorState() {
+ state.set(AsyncState.ERROR_DISPATCHING);
+ }
+
public void init(ServletRequest request, ServletResponse response) {
this.servletRequest = request;
this.servletResponse = response;
// TODO SERVLET 3 - async
listener.onTimeout(event);
}
+
+ public void fireOnError(AsyncEvent event) throws IOException {
+ // TODO SERVLET 3 - async
+ listener.onError(event);
+ }
+
public AsyncListener getListener() {
return listener;
//------------------------------------------------------ Constructor
public ErrorReportValve() {
- super(false);
+ super(true);
}
// ----------------------------------------------------- Instance Variables
// Perform the request
getNext().invoke(request, response);
-
- Throwable throwable =
- (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR);
-
+
if (response.isCommitted()) {
return;
}
+ if (request.isAsyncStarted()) {
+ return;
+ }
+
+ Throwable throwable =
+ (Throwable) request.getAttribute(Globals.EXCEPTION_ATTR);
+
if (throwable != null) {
// The response is an error
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketStatus;
-import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus;
-import org.apache.tomcat.util.net.NioEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
* This one is a Tomcat extension to the Servlet spec.
*/
public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr";
-
+
+ /**
+ * Different types of socket states to react upon
+ */
+ public static interface Handler {
+ public enum SocketState {
+ OPEN, CLOSED, LONG
+ }
+ }
// ----------------------------------------------------------------- Fields
* stored in the ThreadWithAttributes extra folders, or alternately in
* thread local fields.
*/
- public interface Handler {
- public enum SocketState {
- OPEN, CLOSED, LONG
- }
+ public interface Handler extends AbstractEndpoint.Handler {
public SocketState process(long socket);
public SocketState event(long socket, SocketStatus status);
}
* stored in the ThreadWithAttributes extra folders, or alternately in
* thread local fields.
*/
- public interface Handler {
- public enum SocketState {
- OPEN, CLOSED, LONG
- }
+ public interface Handler extends AbstractEndpoint.Handler {
public SocketState process(NioChannel socket);
public SocketState event(NioChannel socket, SocketStatus status);
public void releaseCaches();
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package async;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import async.Stockticker.Stock;
+import async.Stockticker.TickListener;
+
+public class AsyncStockServlet extends HttpServlet implements TickListener, AsyncListener{
+
+ public static final String POLL = "POLL";
+ public static final String LONG_POLL = "LONG-POLL";
+ public static final String STREAM = "STREAM";
+
+ static ArrayList<Stock> ticks = new ArrayList<Stock>();
+ static ConcurrentLinkedQueue<AsyncContext> clients = new ConcurrentLinkedQueue<AsyncContext>();
+ static AtomicInteger clientcount = new AtomicInteger(0);
+ static Stockticker ticker = new Stockticker();
+
+ public AsyncStockServlet() {
+ System.out.println("AsyncStockServlet created");
+ }
+
+
+
+ @Override
+ protected void service(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ //get the client Id, it should be in the URL
+ String clientId = req.getParameter("clientId");
+ //get the method this client prefers
+ String method = req.getParameter("method");
+
+ //if the client doesn't have a method defined, then we will assume it is POLLING
+ if (method==null) method = POLL;
+ //if the client hasn't specified its own clientId, abort
+// if (clientId==null) {
+// resp.sendError(404,"Client not found.");
+// return;
+// }
+
+ if (req.isAsyncStarted()) {
+ req.getAsyncContext().complete();
+ } else if (req.isAsyncSupported()) {
+ AsyncContext actx = req.startAsync();
+ req.addAsyncListener(this);
+ resp.setContentType("text/plain");
+ clients.add(actx);
+ if (this.clientcount.incrementAndGet()==1) {
+ ticker.addTickListener(this);
+ }
+ } else {
+ new Exception("Async Not Supported").printStackTrace();
+ resp.sendError(400,"Async is not supported.");
+ }
+ }
+
+
+
+ @Override
+ public void tick(Stock stock) {
+ ticks.add((Stock)stock.clone());
+ Iterator<AsyncContext> it = clients.iterator();
+ while (it.hasNext()) {
+ AsyncContext actx = it.next();
+ writeStock(actx, stock);
+ }
+ }
+
+ public void writeStock(AsyncContext actx, Stock stock) {
+ HttpServletResponse response = (HttpServletResponse)actx.getResponse();
+ try {
+ PrintWriter writer = response.getWriter();
+ writer.write("STOCK#");//make client parsing easier
+ writer.write(stock.getSymbol());
+ writer.write("#");
+ writer.write(stock.getValueAsString());
+ writer.write("#");
+ writer.write(stock.getLastChangeAsString());
+ writer.write("#");
+ writer.write(String.valueOf(stock.getCnt()));
+ writer.write("\n");
+ writer.flush();
+ response.flushBuffer();
+ }catch (IOException x) {
+ try {actx.complete();}catch (Exception ignore){}
+ }
+ }
+
+ @Override
+ public void onComplete(AsyncEvent event) throws IOException {
+ clients.remove(event.getRequest().getAsyncContext());
+ if (clientcount.decrementAndGet()==0) {
+ ticker.removeTickListener(this);
+ }
+ }
+
+ @Override
+ public void onError(AsyncEvent event) throws IOException {
+ event.getRequest().getAsyncContext().complete();
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent event) throws IOException {
+ event.getRequest().getAsyncContext().complete();
+ }
+
+
+
+
+
+
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package async;
+
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Stockticker implements Runnable {
+ public volatile boolean run = true;
+ protected AtomicInteger counter = new AtomicInteger(0);
+ ArrayList<TickListener> listeners = new ArrayList<TickListener>();
+ protected volatile Thread ticker = null;
+ protected volatile int ticknr = 0;
+
+ public synchronized void start() {
+ run = true;
+ ticker = new Thread(this);
+ ticker.setName("Ticker Thread");
+ ticker.start();
+ }
+
+ public synchronized void stop() {
+ run = false;
+ try {
+ ticker.join();
+ }catch (InterruptedException x) {
+ ticker.interrupted();
+ }
+
+ ticker = null;
+ }
+
+ public void addTickListener(TickListener listener) {
+ if (listeners.add(listener)) {
+ if (counter.incrementAndGet()==1) start();
+ }
+
+ }
+
+ public void removeTickListener(TickListener listener) {
+ if (listeners.remove(listener)) {
+ if (counter.decrementAndGet()==0) stop();
+ }
+ }
+
+ public void run() {
+ try {
+
+ Stock[] stocks = new Stock[] { new Stock("GOOG", 435.43),
+ new Stock("YHOO", 27.88), new Stock("ASF", 1015.55), };
+ Random r = new Random(System.currentTimeMillis());
+ while (run) {
+ for (int j = 0; j < 1; j++) {
+ int i = r.nextInt() % 3;
+ if (i < 0)
+ i = i * (-1);
+ Stock stock = stocks[i];
+ double change = r.nextDouble();
+ boolean plus = r.nextBoolean();
+ if (plus) {
+ stock.setValue(stock.getValue() + change);
+ } else {
+ stock.setValue(stock.getValue() - change);
+ }
+ stock.setCnt(++ticknr);
+ for (TickListener l : listeners) {
+ l.tick(stock);
+ }
+
+// System.out.println("Stock: " + stock.getSymbol()
+// + " Price: " + stock.getValueAsString()
+// + " Change: " + stock.getLastChangeAsString());
+ }
+ Thread.sleep(850);
+ }
+ } catch (InterruptedException ix) {
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+
+
+ public static interface TickListener {
+ public void tick(Stock stock);
+ }
+
+ public static class Stock {
+ protected static DecimalFormat df = new DecimalFormat("0.00");
+ protected String symbol = "";
+ protected double value = 0.0d;
+ protected double lastchange = 0.0d;
+ protected int cnt = 0;
+
+ public Stock(String symbol, double initvalue) {
+ this.symbol = symbol;
+ this.value = initvalue;
+ }
+
+ public void setCnt(int c) {
+ this.cnt = c;
+ }
+
+ public int getCnt() {
+ return cnt;
+ }
+
+ public String getSymbol() {
+ return symbol;
+ }
+
+ public double getValue() {
+ return value;
+ }
+
+ public void setValue(double value) {
+ double old = this.value;
+ this.value = value;
+ this.lastchange = value - old;
+ }
+
+ public String getValueAsString() {
+ return df.format(value);
+ }
+
+ public double getLastChange() {
+ return this.lastchange;
+ }
+
+ public void setLastChange(double lastchange) {
+ this.lastchange = lastchange;
+ }
+
+ public String getLastChangeAsString() {
+ return df.format(lastchange);
+ }
+
+ public int hashCode() {
+ return symbol.hashCode();
+ }
+
+ public boolean equals(Object other) {
+ if (other instanceof Stock) {
+ return this.symbol.equals(((Stock) other).symbol);
+ } else {
+ return false;
+ }
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("STOCK#");
+ buf.append(getSymbol());
+ buf.append("#");
+ buf.append(getValueAsString());
+ buf.append("#");
+ buf.append(getLastChangeAsString());
+ buf.append("#");
+ buf.append(String.valueOf(getCnt()));
+ return buf.toString();
+
+ }
+
+ public Object clone() {
+ Stock s = new Stock(this.getSymbol(), this.getValue());
+ s.setLastChange(this.getLastChange());
+ s.setCnt(this.cnt);
+ return s;
+ }
+ }
+}
<servlet-name>async3</servlet-name>
<url-pattern>/async/async3</url-pattern>
</servlet-mapping>
+ <servlet>
+ <servlet-name>stock</servlet-name>
+ <servlet-class>async.AsyncStockServlet</servlet-class>
+ </servlet>
+ <servlet-mapping>
+ <servlet-name>stock</servlet-name>
+ <url-pattern>/async/stockticker</url-pattern>
+ </servlet-mapping>
+
</web-app>
- servlet1 does a dispatch to servlet2 (asyncsupported=true)
- servlet2 does a dispatch to servlet3 (asyncsupported=true)
- servlet3 does a dispatch to servlet4 (asyncsupported=false)
+
+
+7. Stock ticker
+ <a href="<%=response.encodeURL("/examples/async/stock")%>"> StockTicker </a>
</pre>
\ No newline at end of file