From: costin Date: Wed, 13 Jan 2010 02:07:25 +0000 (+0000) Subject: Moved the connection pool to a top level class and started to add more code. Still... X-Git-Url: https://git.internetallee.de/?a=commitdiff_plain;h=a13d203fde202238a63f93d775d83c0349dfbdaf;p=tomcat7.0 Moved the connection pool to a top level class and started to add more code. Still missing is evicting kept-alive connections and queueing to limit the number of active requests per host ( and probably more ). Started to make spdy more like a part of a http request - i.e. upgrade if supported by both ends, etc. Now load tests seem to work - no more OOM. Due to compression spdy it's using more memory per connection, current tests don't enable compression ( it's accepted for incoming connections ). git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@898619 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java index 612cd18cf..ffeb036fa 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/CompressFilter.java @@ -38,22 +38,34 @@ public class CompressFilter { long dictId; public CompressFilter() { - cStream = new ZStream(); - cStream.deflateInit(JZlib.Z_BEST_COMPRESSION); - - dStream = new ZStream(); - dStream.inflateInit(); } public void recycle() { + if (cStream == null) { + return; + } + cStream.free(); + cStream = null; + dStream.free(); + dStream = null; + } + + public void init() { + if (cStream != null) { + return; + } // can't call: cStream.free(); - will kill the adler, NPE - - cStream.deflateInit(JZlib.Z_BEST_COMPRESSION); + cStream = new ZStream(); + // BEST_COMRESSION results in 256Kb per Deflate + cStream.deflateInit(JZlib.Z_BEST_SPEED); + + dStream = new ZStream(); dStream.inflateInit(); } CompressFilter setDictionary(byte[] dict, long id) { + init(); this.dict = dict; this.dictId = id; cStream.deflateSetDictionary(dict, dict.length); @@ -61,6 +73,7 @@ public class CompressFilter { } void compress(IOBuffer in, IOBuffer out) throws IOException { + init(); BBucket bb = in.popFirst(); while (bb != null) { @@ -80,6 +93,7 @@ public class CompressFilter { // TODO: only the last one needs flush // TODO: size missmatches ? + init(); int flush = JZlib.Z_PARTIAL_FLUSH; cStream.next_in = bb.array(); @@ -129,6 +143,7 @@ public class CompressFilter { } void decompress(IOBuffer in, IOBuffer out, int len) throws IOException { + init(); BBucket bb = in.peekFirst(); while (bb != null && len > 0) { diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java index 2e704a836..7794fadfa 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java @@ -12,12 +12,16 @@ import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes; import org.apache.tomcat.lite.io.BBucket; import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.CBuffer; +import org.apache.tomcat.lite.io.IOConnector; +import org.apache.tomcat.lite.io.DumpChannel; import org.apache.tomcat.lite.io.FastHttpDateFormat; import org.apache.tomcat.lite.io.Hex; import org.apache.tomcat.lite.io.IOBuffer; import org.apache.tomcat.lite.io.IOChannel; +import org.apache.tomcat.lite.io.SslChannel; -public class Http11Connection extends HttpConnection { +public class Http11Connection extends HttpConnection + implements IOConnector.ConnectedCallback { public static final String CHUNKED = "chunked"; public static final String CLOSE = "close"; @@ -33,8 +37,7 @@ public class Http11Connection extends HttpConnection { static final byte COLON = (byte) ':'; // super.net is the socket - - HttpChannel activeHttp; + boolean debug; BBuffer line = BBuffer.wrapper(); boolean endSent = false; @@ -74,7 +77,11 @@ public class Http11Connection extends HttpConnection { } public void beforeRequest() { - activeHttp = null; + nextRequest(); + headRecvBuf.recycle(); + } + + public void nextRequest() { endSent = false; keepAlive = true; receiveBodyState.recycle(); @@ -84,7 +91,6 @@ public class Http11Connection extends HttpConnection { http10 = false; headersReceived = false; bodyReceived = false; - headRecvBuf.recycle(); } public Http11Connection serverMode() { @@ -108,7 +114,8 @@ public class Http11Connection extends HttpConnection { if (headRecvBuf.get(0) == 0x80 && headRecvBuf.get(1) == 0x01) { // SPDY signature ( experimental ) - switchedProtocol = new SpdyConnection(httpConnector); + switchedProtocol = new SpdyConnection(httpConnector, + remoteHost); if (serverMode) { switchedProtocol.serverMode = true; } @@ -167,9 +174,9 @@ public class Http11Connection extends HttpConnection { } @Override - public void handleReceived(IOChannel netx) throws IOException { + public void dataReceived(IOBuffer netx) throws IOException { if (switchedProtocol != null) { - switchedProtocol.handleReceived(netx); + switchedProtocol.dataReceived(netx); return; } //trace("handleReceived " + headersReceived); @@ -299,7 +306,7 @@ public class Http11Connection extends HttpConnection { if (http09) { keepAlive = false; } - if (!net.isOpen()) { + if (net != null && !net.isOpen()) { keepAlive = false; } return keepAlive; @@ -311,8 +318,8 @@ public class Http11Connection extends HttpConnection { switchedProtocol.endSendReceive(http); return; } - activeHttp = null; - if (!keepAlive()) { + boolean keepAlive = keepAlive(); + if (!keepAlive) { if (debug) { log.info("--- Close socket, no keepalive " + net); } @@ -321,27 +328,16 @@ public class Http11Connection extends HttpConnection { net.startSending(); } - beforeRequest(); - return; } synchronized (readLock) { - beforeRequest(); // will clear head buffer requestCount++; - if (serverMode) { - if (debug) { - log.info(">>> server socket KEEP_ALIVE " + net.getTarget() + - " " + net + " " + net.getIn().available()); - } + beforeRequest(); + httpConnector.cpool.afterRequest(http, this, true); + + if (serverMode && keepAlive) { handleReceived(net); // will attempt to read next req - } else { - if (debug) { - log.info(">>> client socket KEEP_ALIVE " + net.getTarget() + - " " + net); - } - httpConnector.cpool.returnChannel(this); } } - } private void trace(String s) { @@ -741,8 +737,6 @@ public class Http11Connection extends HttpConnection { return; } - this.activeHttp = http; - // Update transfer fields based on headers. processProtocol(http.getRequest().protocol()); updateKeepAlive(http.getRequest().getMimeHeaders(), true); @@ -1402,5 +1396,33 @@ public class Http11Connection extends HttpConnection { (bodyReceived ? " BODY " : "") ; } - + + @Override + public void handleConnected(IOChannel net) throws IOException { + + HttpChannel httpCh = activeHttp; + boolean ssl = httpCh.getRequest().isSecure(); + if (ssl) { + SslChannel ch1 = new SslChannel(); + ch1.setSslContext(httpConnector.sslConnector.getSSLContext()); + ch1.setSink(net); + net.addFilterAfter(ch1); + net = ch1; + } + if (httpConnector.debugHttp) { + IOChannel ch1 = new DumpChannel(""); + net.addFilterAfter(ch1); + net = ch1; + } + + if (!net.isOpen()) { + httpCh.abort("Can't connect"); + return; + } + + setSink(net); + + sendRequest(httpCh); + } + } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java index f293b08d2..11b2f8642 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java @@ -316,7 +316,7 @@ public class HttpChannel extends IOChannel { if (target == null) { return ":0"; // server mode ? } - return target; + return target.toString(); } @@ -797,11 +797,25 @@ public class HttpChannel extends IOChannel { } - + /** + * This method will be called when the http headers have been received - + * the body may or may not be available. + * + * In server mode this is equivalent with a servlet request. + * This is also called for http client, when the response headers + * are received. + * + * TODO: rename it to HttMessageReceived or something similar. + */ public static interface HttpService { void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException; } + /** + * Called when both request and response bodies have been sent/ + * received. After this call the HttpChannel will be disconnected + * from the http connection, which can be used for other requests. + */ public static interface RequestCompleted { void handle(HttpChannel data, Object extraData) throws IOException; } @@ -814,11 +828,4 @@ public class HttpChannel extends IOChannel { }; - IOConnector.ConnectedCallback connectedCallback = new IOConnector.ConnectedCallback() { - @Override - public void handleConnected(IOChannel ch) throws IOException { - httpConnector.handleConnected(ch, HttpChannel.this); - } - }; - } \ No newline at end of file diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java index 11e06e06e..42313ab0e 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java @@ -44,8 +44,6 @@ public class HttpConnector { public void onDestroy(HttpChannel ch, HttpConnector con) throws IOException; } - HttpConnectionManager conManager = new HttpConnectionManager(); - private static Logger log = Logger.getLogger("HttpConnector"); /** @@ -84,7 +82,7 @@ public class HttpConnector { public AtomicInteger recycledChannels = new AtomicInteger(); public AtomicInteger reusedChannels = new AtomicInteger(); - public ConnectionPool cpool = new ConnectionPool(); + public HttpConnectionPool cpool = new HttpConnectionPool(this); // Host + context mapper. Dispatcher dispatcher; @@ -332,11 +330,6 @@ public class HttpConnector { } - HttpConnection newConnection() { - return conManager.newConnection(this); - } - - private class AcceptorCallback implements IOConnector.ConnectedCallback { @Override public void handleConnected(IOChannel accepted) throws IOException { @@ -346,7 +339,7 @@ public class HttpConnector { public HttpConnection handleAccepted(IOChannel accepted) throws IOException { // TODO: reuse - HttpConnection shttp = newConnection(); + HttpConnection shttp = cpool.accepted(accepted); shttp.serverMode = true; if (debugHttp) { @@ -371,50 +364,11 @@ public class HttpConnector { return this; } - public void handleConnected(IOChannel net, HttpChannel httpCh) - throws IOException { - boolean ssl = httpCh.getRequest().isSecure(); - if (ssl) { - SslChannel ch1 = new SslChannel(); - ch1.setSslContext(sslConnector.getSSLContext()); - ch1.setSink(net); - net.addFilterAfter(ch1); - net = ch1; - } - if (debugHttp) { - IOChannel ch1 = new DumpChannel(""); - net.addFilterAfter(ch1); - net = ch1; - } - - if (!net.isOpen()) { - httpCh.abort("Can't connect"); - return; - } - HttpConnection httpStream = newConnection(); - httpStream.setSink(net); - - // TODO: add it to the cpool - httpCh.setConnection(httpStream); - - httpStream.sendRequest(httpCh); - } - - public static class HttpConnectionManager { - public HttpConnection newConnection(HttpConnector con) { - return new Http11Connection(con); - } - - public HttpConnection getFromPool(RemoteServer t) { - return t.connections.remove(t.connections.size() - 1); - } - } - /** * Actual HTTP/1.1 wire protocol. * */ - public static class HttpConnection extends IOChannel + public static abstract class HttpConnection extends IOChannel implements DataReceivedCallback { protected HttpConnector httpConnector; @@ -422,9 +376,18 @@ public class HttpConnector { protected BBuffer headRecvBuf = BBuffer.allocate(8192); protected CompressFilter compress = new CompressFilter(); + + protected boolean secure = false; + + protected HttpConnectionPool.RemoteServer remoteHost; + // If set, the connection is in use ( active ) + // null == keep alive. Changes synchronized on remoteHost + // before/after request + protected HttpChannel activeHttp; @Override - public void handleReceived(IOChannel ch) throws IOException { + public final void handleReceived(IOChannel ch) throws IOException { + int before = ch.getIn().available(); dataReceived(ch.getIn()); } @@ -451,9 +414,7 @@ public class HttpConnector { /** * Incoming data. */ - public void dataReceived(IOBuffer iob) throws IOException { - - } + public abstract void dataReceived(IOBuffer iob) throws IOException; /** * Framing error, client interrupt, etc. @@ -474,12 +435,12 @@ public class HttpConnector { @Override public IOBuffer getIn() { - return net.getIn(); + return net == null ? null : net.getIn(); } @Override public IOBuffer getOut() { - return net.getOut(); + return net == null ? null : net.getOut(); } @Override @@ -493,188 +454,22 @@ public class HttpConnector { protected void outClosed(HttpChannel http) throws IOException { } - protected void endSendReceive(HttpChannel httpChannel) throws IOException { - return; - } - - public void withExtraBuffer(BBuffer received) { - return; - } - - } - - /** - * Connections for one remote host. - * This should't be restricted by IP:port or even hostname, - * for example if a server has multiple IPs or LB replicas - any would work. - */ - public static class RemoteServer { - public ArrayList connections = new ArrayList(); - } - - // TODO: add timeouts, limits per host/total, expire old entries - // TODO: discover apr and use it - - public class ConnectionPool { - // visible for debugging - will be made private, with accessor /** - * Map from client names to socket pools. + * Called by HttpChannel when both input and output are fully + * sent/received. When this happens the request is no longer associated + * with the Connection, and the connection can be re-used. + * + * The channel can still be used to access the retrieved data that may + * still be buffered until HttpChannel.release() is called. + * + * This method will be called only once, for both succesful and aborted + * requests. */ - public Map hosts = new HashMap(); - - // Statistics - public AtomicInteger waitingSockets = new AtomicInteger(); - public AtomicInteger closedSockets = new AtomicInteger(); - - public AtomicInteger hits = new AtomicInteger(); - public AtomicInteger misses = new AtomicInteger(); - - public int getTargetCount() { - return hosts.size(); - } - - public int getSocketCount() { - return waitingSockets.get(); - } - - public int getClosedSockets() { - return closedSockets.get(); - } - - public Set getKeepAliveTargets() { - return hosts.keySet(); - } - - /** - * @param key host:port, or some other key if multiple hosts:ips - * are connected to equivalent servers ( LB ) - * @param httpCh - * @throws IOException - */ - public HttpConnection send(HttpChannel httpCh) - throws IOException { - String target = httpCh.getTarget(); - HttpConnection con = null; - // TODO: check ssl on connection - now if a second request - // is received on a ssl connection - we just send it - boolean ssl = httpCh.getRequest().isSecure(); - - RemoteServer t = null; - synchronized (hosts) { - t = hosts.get(target); - if (t == null) { - misses.incrementAndGet(); - } - } - if (t != null) { - synchronized (t) { - if (t.connections.size() == 0) { - misses.incrementAndGet(); - } else { - con = conManager.getFromPool(t); - - if (!con.isOpen()) { - con.setDataReceivedCallback(null); - con.close(); - log.fine("Already closed " + con); - con = null; - misses.incrementAndGet(); - } else { - hits.incrementAndGet(); - if (debug) { - httpCh.trace("HTTP_CONNECT: Reuse connection " + target + " " + this); - } - } - } - } - } - - if (con == null) { - if (debug) { - httpCh.trace("HTTP_CONNECT: New connection " + target); - } - String[] hostPort = target.split(":"); - - int targetPort = ssl ? 443 : 80; - if (hostPort.length > 1) { - targetPort = Integer.parseInt(hostPort[1]); - } - - getIOConnector().connect(hostPort[0], targetPort, - httpCh.connectedCallback); - } else { - con.beforeRequest(); - httpCh.setConnection(con); - con.sendRequest(httpCh); - } - - - return con; - } + protected abstract void endSendReceive(HttpChannel httpChannel) throws IOException; - /** - * Must be called in IOThread for the channel - */ - public void returnChannel(HttpConnection ch) - throws IOException { - CharSequence key = ch.getTarget(); - if (key == null) { - ch.close(); - if (debug) { - log.info("Return channel, no target ..." + key + " " + ch); - } - return; - } - - if (!ch.isOpen()) { - ch.close(); // make sure all closed - if (debug) { - log.info("Return closed channel ..." + key + " " + ch); - } - return; - } - - RemoteServer t = null; - synchronized (hosts) { - t = hosts.get(key); - if (t == null) { - t = new RemoteServer(); - hosts.put(key, t); - } - } - waitingSockets.incrementAndGet(); - - ch.ts = System.currentTimeMillis(); - synchronized (t) { - t.connections.add(ch); - } + public void withExtraBuffer(BBuffer received) { + return; } - // Called by handleClosed - void stopKeepAlive(IOChannel schannel) { - CharSequence target = schannel.getTarget(); - RemoteServer t = null; - synchronized (hosts) { - t = hosts.get(target); - if (t == null) { - return; - } - } - synchronized (t) { - if (t.connections.remove(schannel)) { - waitingSockets.decrementAndGet(); - if (t.connections.size() == 0) { - hosts.remove(target); - } - } - } - } - } - - public HttpConnector withConnectionManager( - HttpConnectionManager connectionManager) { - this.conManager = connectionManager; - return this; } } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java index 0d4d1f55a..9cdb95f27 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java @@ -3,18 +3,26 @@ package org.apache.tomcat.lite.http; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; -import org.apache.tomcat.lite.http.HttpConnector.RemoteServer; +import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer; import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes; import org.apache.tomcat.lite.io.BBucket; import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.CBuffer; +import org.apache.tomcat.lite.io.DumpChannel; import org.apache.tomcat.lite.io.IOBuffer; +import org.apache.tomcat.lite.io.IOChannel; +import org.apache.tomcat.lite.io.IOConnector; +import org.apache.tomcat.lite.io.SslChannel; /* * TODO: expectations ? @@ -27,23 +35,9 @@ import org.apache.tomcat.lite.io.IOBuffer; * http://localhost:8802/hello */ -public class SpdyConnection extends HttpConnector.HttpConnection { +public class SpdyConnection extends HttpConnector.HttpConnection + implements IOConnector.ConnectedCallback { - public static class SpdyConnectionManager - extends HttpConnector.HttpConnectionManager { - @Override - public HttpConnection newConnection(HttpConnector con) { - return new SpdyConnection(con); - } - - @Override - public HttpConnection getFromPool(RemoteServer t) { - // TODO: we may initiate multiple SPDY connections with each server - // Sending frames is synchronized, receiving is muxed - return t.connections.get(0); - } - - } /** Use compression for headers. Will magically turn to false * if the first request doesn't have x8xx ( i.e. compress header ) @@ -77,11 +71,16 @@ public class SpdyConnection extends HttpConnector.HttpConnection { /** * @param spdyConnector + * @param remoteServer */ - SpdyConnection(HttpConnector spdyConnector) { + SpdyConnection(HttpConnector spdyConnector, RemoteServer remoteServer) { this.httpConnector = spdyConnector; + this.remoteHost = remoteServer; + this.target = remoteServer.target; } + AtomicInteger streamErrors = new AtomicInteger(); + AtomicInteger lastInStream = new AtomicInteger(); AtomicInteger lastOutStream = new AtomicInteger(); @@ -111,9 +110,12 @@ public class SpdyConnection extends HttpConnector.HttpConnection { } @Override - public void dataReceived(IOBuffer iob) throws IOException { - int avail = iob.available(); - while (avail > 0) { + public synchronized void dataReceived(IOBuffer iob) throws IOException { + while (true) { + int avail = iob.available(); + if (avail == 0) { + return; + } if (currentInFrame == null) { if (inFrameBuffer.remaining() + avail < 8) { return; @@ -121,12 +123,11 @@ public class SpdyConnection extends HttpConnector.HttpConnection { if (inFrameBuffer.remaining() < 8) { int headRest = 8 - inFrameBuffer.remaining(); int rd = iob.read(inFrameBuffer, headRest); - avail -= rd; } currentInFrame = new SpdyConnection.Frame(); // TODO: reuse - currentInFrame.parse(inFrameBuffer); + currentInFrame.parse(this, inFrameBuffer); } - if (avail < currentInFrame.length) { + if (iob.available() < currentInFrame.length) { return; } // We have a full frame. Process it. @@ -134,11 +135,21 @@ public class SpdyConnection extends HttpConnector.HttpConnection { // TODO: extra checks, make sure the frame is correct and // it consumed all data. - avail -= currentInFrame.length; currentInFrame = null; } } + AtomicInteger inFrames = new AtomicInteger(); + AtomicInteger inDataFrames = new AtomicInteger(); + AtomicInteger inSyncStreamFrames = new AtomicInteger(); + AtomicInteger inBytes = new AtomicInteger(); + + AtomicInteger outFrames = new AtomicInteger(); + AtomicInteger outDataFrames = new AtomicInteger(); + AtomicInteger outBytes = new AtomicInteger(); + + + /** * Frame received. Must consume all data for the frame. * @@ -148,11 +159,14 @@ public class SpdyConnection extends HttpConnector.HttpConnection { protected void onFrame(IOBuffer iob) throws IOException { // TODO: make sure we have enough data. lastFrame = currentInFrame; + inFrames.incrementAndGet(); + inBytes.addAndGet(currentInFrame.length + 8); if (currentInFrame.c) { if (currentInFrame.type == SpdyConnection.Frame.TYPE_HELO) { // receivedHello = currentInFrame; } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_STREAM) { + inSyncStreamFrames.incrementAndGet(); HttpChannel ch = new HttpChannel(); // TODO: reuse ch.channelId = SpdyConnection.readInt(iob); ch.setConnection(this); @@ -164,15 +178,22 @@ public class SpdyConnection extends HttpConnector.HttpConnection { ch.setHttpService(this.httpConnector.defaultService); } - channels.put(ch.channelId, ch); + synchronized (this) { + channels.put(ch.channelId, ch); + } - // pri and unused - SpdyConnection.readShort(iob); + try { + // pri and unused + SpdyConnection.readShort(iob); - HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes(); - - BBuffer head = processHeaders(iob, ch, reqBytes); + HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes(); + processHeaders(iob, ch, reqBytes); + } catch (Throwable t) { + log.log(Level.SEVERE, "Error parsing head", t); + abort("Error reading headers " + t); + return; + } ch.getRequest().processReceivedHeaders(); ch.handleHeadersReceived(ch.getRequest()); @@ -183,14 +204,24 @@ public class SpdyConnection extends HttpConnector.HttpConnection { } } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_REPLY) { int chId = SpdyConnection.readInt(iob); - HttpChannel ch = channels.get(chId); - - SpdyConnection.readShort(iob); + HttpChannel ch; + synchronized (this) { + ch = channels.get(chId); + if (ch == null) { + abort("Channel not found"); + } + } + try { + SpdyConnection.readShort(iob); - HttpMessageBytes resBytes = ch.getResponse().getMsgBytes(); + HttpMessageBytes resBytes = ch.getResponse().getMsgBytes(); - BBuffer head = processHeaders(iob, ch, resBytes); - + BBuffer head = processHeaders(iob, ch, resBytes); + } catch (Throwable t) { + log.log(Level.SEVERE, "Error parsing head", t); + abort("Error reading headers " + t); + return; + } ch.getResponse().processReceivedHeaders(); ch.handleHeadersReceived(ch.getResponse()); @@ -204,8 +235,12 @@ public class SpdyConnection extends HttpConnector.HttpConnection { iob.advance(currentInFrame.length); } } else { + inDataFrames.incrementAndGet(); // data frame - part of an existing stream - HttpChannel ch = channels.get(currentInFrame.streamId); + HttpChannel ch; + synchronized (this) { + ch = channels.get(currentInFrame.streamId); + } if (ch == null) { log.warning("Unknown stream "); net.close(); @@ -215,9 +250,14 @@ public class SpdyConnection extends HttpConnector.HttpConnection { int len = currentInFrame.length; while (len > 0) { BBucket bb = iob.peekFirst(); + if (bb == null) { + // we should have all data + abort("Unexpected short read"); + return; + } if (len > bb.remaining()) { ch.getIn().append(bb); - len += bb.remaining(); + len -= bb.remaining(); bb.position(bb.limit()); } else { ch.getIn().append(bb, len); @@ -228,19 +268,31 @@ public class SpdyConnection extends HttpConnector.HttpConnection { ch.sendHandleReceivedCallback(); if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) { - ch.getIn().close(); ch.handleEndReceive(); } } firstFrame = false; } + + /** + * On frame error. + */ + private void abort(String msg) throws IOException { + streamErrors.incrementAndGet(); + for (HttpChannel ch : channels.values()) { + ch.abort(msg); + } + close(); + } private BBuffer processHeaders(IOBuffer iob, HttpChannel ch, HttpMessageBytes reqBytes) throws IOException { - int res = iob.peek() & 0xFF; int nvCount = 0; - if (firstFrame && (res & 0x0F) != 8) { - headerCompression = false; + if (firstFrame) { + int res = iob.peek() & 0xFF; + if ((res & 0x0F) != 8) { + headerCompression = false; + } } headRecvBuf.recycle(); if (headerCompression) { @@ -257,7 +309,11 @@ public class SpdyConnection extends HttpConnector.HttpConnection { } else { nvCount = readShort(iob); // 8 = stream Id (4) + pri/unused (2) + nvCount (2) - iob.read(headRecvBuf, currentInFrame.length - 8); + // we know we have enough data + int rd = iob.read(headRecvBuf, currentInFrame.length - 8); + if (rd != currentInFrame.length - 8) { + abort("Unexpected incomplete read"); + } } // Wrapper - so we don't change position in head headRecvBuf.wrapTo(headW); @@ -268,6 +324,9 @@ public class SpdyConnection extends HttpConnector.HttpConnection { for (int i = 0; i < nvCount; i++) { int nameLen = SpdyConnection.readShort(headW); + if (nameLen > headW.remaining()) { + abort("Name too long"); + } nameBuf.setBytes(headW.array(), headW.position(), nameLen); @@ -300,18 +359,23 @@ public class SpdyConnection extends HttpConnector.HttpConnection { } @Override - protected void sendRequest(HttpChannel http) throws IOException { + protected synchronized void sendRequest(HttpChannel http) throws IOException { if (serverMode) { throw new IOException("Only in client mode"); } - + if (!checkConnection(http)) { + return; + } MultiMap mimeHeaders = http.getRequest().getMimeHeaders(); BBuffer headBuf = BBuffer.allocate(); SpdyConnection.appendShort(headBuf, mimeHeaders.size() + 3); serializeMime(mimeHeaders, headBuf); - + + if (headerCompression) { + } + // TODO: url - with host prefix , method // optimize... SpdyConnection.appendAsciiHead(headBuf, "version"); @@ -332,6 +396,11 @@ public class SpdyConnection extends HttpConnector.HttpConnection { out.putByte(0x00); out.putByte(0x01); + CBuffer method = http.getRequest().method(); + if (method.equals("GET") || method.equals("HEAD")) { + http.getOut().close(); + } + if (http.getOut().isAppendClosed()) { out.putByte(0x01); // closed } else { @@ -348,17 +417,32 @@ public class SpdyConnection extends HttpConnector.HttpConnection { http.channelId = 2 * lastOutStream.incrementAndGet() + 1; } SpdyConnection.appendInt(out, http.channelId); - - channels.put(http.channelId, http); + http.setConnection(this); + + synchronized (this) { + channels.put(http.channelId, http); + } out.putByte(0x00); // no priority out.putByte(0x00); sendFrame(out, headBuf); + if (http.outMessage.state == HttpMessage.State.HEAD) { + http.outMessage.state = HttpMessage.State.BODY_DATA; + } + if (http.getOut().isAppendClosed()) { + http.handleEndSent(); + } + // Any existing data //sendData(http); } + + + public synchronized Collection getActives() { + return channels.values(); + } @Override protected synchronized void sendResponseHeaders(HttpChannel http) throws IOException { @@ -415,7 +499,8 @@ public class SpdyConnection extends HttpConnector.HttpConnection { // It seems piggibacking data is not allowed frameHead.putByte(0x00); - SpdyConnection.append24(frameHead, headBuf.remaining() + 6); + int len = headBuf.remaining() + 6; + SpdyConnection.append24(frameHead, len); // // Stream-Id, unused SpdyConnection.appendInt(frameHead, http.channelId); @@ -474,11 +559,14 @@ public class SpdyConnection extends HttpConnector.HttpConnection { if (net == null) { return; // unit test } + outBytes.addAndGet(out.remaining()); net.getOut().append(out); if (headBuf != null) { net.getOut().append(headBuf); + outBytes.addAndGet(headBuf.remaining()); } net.startSending(); + outFrames.incrementAndGet(); } public synchronized void sendDataFrame(IOBuffer out2, int avail, @@ -497,11 +585,14 @@ public class SpdyConnection extends HttpConnector.HttpConnection { // TODO: chunk if too much data ( at least at 24 bits) SpdyConnection.append24(outFrameBuffer, avail); + outBytes.addAndGet(outFrameBuffer.remaining() + avail); net.getOut().append(outFrameBuffer); + if (avail > 0) { net.getOut().append(out2, avail); } net.startSending(); + outDataFrames.incrementAndGet(); } static void appendInt(BBuffer headBuf, int length) throws IOException { @@ -579,9 +670,11 @@ public class SpdyConnection extends HttpConnector.HttpConnection { static int FLAG_HALF_CLOSE = 1; - public void parse(BBuffer iob) throws IOException { + public void parse(SpdyConnection spdyConnection, + BBuffer iob) throws IOException { int b0 = iob.read(); if (b0 < 128) { + // data frame c = false; streamId = b0; for (int i = 0; i < 3; i++) { @@ -592,6 +685,10 @@ public class SpdyConnection extends HttpConnector.HttpConnection { c = true; b0 -= 128; version = ((b0 << 8) | iob.read()); + if (version != 1) { + spdyConnection.abort("Wrong version"); + return; + } b0 = iob.read(); type = ((b0 << 8) | iob.read()); } @@ -601,12 +698,23 @@ public class SpdyConnection extends HttpConnector.HttpConnection { b0 = iob.read(); length = length << 8 | b0; } - + iob.recycle(); } } + @Override + protected void endSendReceive(HttpChannel http) throws IOException { + synchronized (this) { + HttpChannel doneHttp = channels.remove(http.channelId); + if (doneHttp != http) { + log.severe("Error removing " + doneHttp + " " + http); + } + } + httpConnector.cpool.afterRequest(http, this, true); + } + /** * Framing error, client interrupt, etc. */ @@ -615,5 +723,86 @@ public class SpdyConnection extends HttpConnector.HttpConnection { } + + volatile boolean connecting = false; + volatile boolean connected = false; + + + private boolean checkConnection(HttpChannel http) throws IOException { + synchronized(this) { + if (net == null || !isOpen()) { + connected = false; + } + + if (!connected) { + if (!connecting) { + // TODO: secure set at start ? + connecting = true; + httpConnector.cpool.httpConnect(http, + target.toString(), + http.getRequest().isSecure(), this); + } + + synchronized (remoteHost) { + remoteHost.pending.add(http); + httpConnector.cpool.queued.incrementAndGet(); + } + return false; + } + } + + return true; + } + + @Override + public void handleConnected(IOChannel net) throws IOException { + HttpChannel httpCh = null; + if (!net.isOpen()) { + while (true) { + synchronized (remoteHost) { + if (remoteHost.pending.size() == 0) { + return; + } + httpCh = remoteHost.pending.remove(); + } + httpCh.abort("Can't connect"); + } + } + + synchronized (remoteHost) { + httpCh = remoteHost.pending.peek(); + } + secure = httpCh.getRequest().isSecure(); + if (secure) { + SslChannel ch1 = new SslChannel(); + ch1.setSslContext(httpConnector.sslConnector.getSSLContext()); + ch1.setSink(net); + net.addFilterAfter(ch1); + net = ch1; + } + + if (httpConnector.debugHttp) { + IOChannel ch1 = new DumpChannel(""); + net.addFilterAfter(ch1); + net = ch1; + } + + setSink(net); + + synchronized(this) { + connecting = false; + connected = true; + } + + while (true) { + synchronized (remoteHost) { + if (remoteHost.pending.size() == 0) { + return; + } + httpCh = remoteHost.pending.remove(); + } + sendRequest(httpCh); + } + } } \ No newline at end of file diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java index a27381671..2553d62fa 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java @@ -348,18 +348,25 @@ public class IOBuffer { } public int read(byte[] buf, int off, int len) throws IOException { - BBucket bucket = peekFirst(); if (isClosedAndEmpty()) { return -1; } - if (bucket == null) { - return 0; + int rd = 0; + while (true) { + BBucket bucket = peekFirst(); + if (bucket == null) { + return rd; + } + int toCopy = Math.min(len, bucket.remaining()); + System.arraycopy(bucket.array(), bucket.position(), + buf, off + rd, toCopy); + bucket.position(bucket.position() + toCopy); + rd += toCopy; + len -= toCopy; + if (len == 0) { + return rd; + } } - int toCopy = Math.min(len, bucket.remaining()); - System.arraycopy(bucket.array(), bucket.position(), buf, - off, toCopy); - bucket.position(bucket.position() + toCopy); - return toCopy; } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java index 6b0fc635f..9893cf6cd 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java @@ -27,7 +27,7 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived protected IOChannel app; protected String id; - protected String target; + protected CharSequence target; protected IOConnector connector; @@ -136,7 +136,7 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived shutdownOutput(); // Should it read the buffers ? - if (getIn().isAppendClosed()) { + if (getIn() == null || getIn().isAppendClosed()) { return; } else { getIn().close(); @@ -146,11 +146,13 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived } public boolean isOpen() { - return !getIn().isAppendClosed() && !getOut().isAppendClosed(); + return getIn() != null && + getOut() != null && + !getIn().isAppendClosed() && !getOut().isAppendClosed(); } public void shutdownOutput() throws IOException { - if (getOut().isAppendClosed()) { + if (getOut() == null || getOut().isAppendClosed()) { return; } else { getOut().close(); @@ -290,5 +292,8 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived return target; } + public void setTarget(CharSequence target) { + this.target = target; + } } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java index 705df981c..fe5a65070 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/service/IOStatus.java @@ -3,14 +3,15 @@ package org.apache.tomcat.lite.service; import java.io.IOException; -import java.util.Map; +import java.util.List; +import org.apache.tomcat.lite.http.HttpConnectionPool; import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.HttpResponse; import org.apache.tomcat.lite.http.HttpWriter; import org.apache.tomcat.lite.http.HttpChannel.HttpService; -import org.apache.tomcat.lite.http.HttpConnector.ConnectionPool; -import org.apache.tomcat.lite.http.HttpConnector.RemoteServer; +import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer; +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; import org.apache.tomcat.lite.io.IOChannel; /** @@ -18,30 +19,32 @@ import org.apache.tomcat.lite.io.IOChannel; */ public class IOStatus implements HttpService { - private ConnectionPool pool; + private HttpConnectionPool pool; - public IOStatus(ConnectionPool pool) { + public IOStatus(HttpConnectionPool pool) { this.pool = pool; } @Override public void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException { - ConnectionPool sc = pool; + HttpConnectionPool sc = pool; HttpWriter out = httpRes.getBodyWriter(); httpRes.setContentType("text/plain"); + // TODO: use JMX/DynamicObject to get all public info out.println("hosts=" + sc.getTargetCount()); out.println("waiting=" + sc.getSocketCount()); out.println("closed=" + sc.getClosedSockets()); out.println(); - for (Map.Entry e: sc.hosts.entrySet()) { - out.append(e.getKey()); + for (RemoteServer remote: sc.getServers()) { + out.append(remote.target); out.append("="); - out.println(Integer.toString(e.getValue().connections.size())); + List connections = remote.getConnections(); + out.println(Integer.toString(connections.size())); - for (IOChannel ch: e.getValue().connections) { + for (IOChannel ch: connections) { out.println(ch.getId() + " " + ch.toString()); } diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java index 755bc3e0c..9cd3386e0 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java @@ -233,7 +233,7 @@ public class TestMain { URL url = new URL(path); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - // connection.setReadTimeout(100000); + connection.setReadTimeout(10000); connection.connect(); int rc = connection.getResponseCode(); InputStream is = connection.getInputStream(); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java index 76580a1fd..59b89cba0 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java @@ -27,7 +27,7 @@ public class HttpChannelInMemoryTest extends TestCase { /** * Last http channel created by the connection */ - HttpChannel http; + volatile HttpChannel http; // Input/output for the connection MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel(); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java index 01822499a..923a6ae20 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java @@ -86,7 +86,7 @@ public class LiveHttp1Test extends TestCase { httpClient.requestURI().set("/chunked/foo"); httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); - assertTrue(bodyRecvBuffer.toString().indexOf("AAA") >= 0); + assertTrue(bodyRecvBuffer.toString(), bodyRecvBuffer.toString().indexOf("AAA") >= 0); } // Check waitResponseHead() diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java index 1cd60c60d..8c51fb879 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java @@ -9,17 +9,15 @@ import java.io.InputStream; import junit.framework.TestCase; import org.apache.tomcat.lite.TestMain; +import org.apache.tomcat.lite.http.HttpConnectionPool.RemoteServer; import org.apache.tomcat.lite.io.IOBuffer; -import org.apache.tomcat.lite.http.SpdyConnection.SpdyConnectionManager; public class SpdyTest extends TestCase { HttpConnector http11Con = TestMain.shared().getClient(); - static HttpConnector spdyCon = DefaultHttpConnector.get() - .withConnectionManager(new SpdyConnectionManager()); + static HttpConnector spdyCon = DefaultHttpConnector.get(); - HttpConnector memSpdyCon = - new HttpConnector(null).withConnectionManager(new SpdyConnectionManager()); + HttpConnector memSpdyCon = new HttpConnector(null); public void testClient() throws IOException { HttpRequest req = @@ -43,7 +41,7 @@ public class SpdyTest extends TestCase { IOBuffer iob = new IOBuffer(); iob.append(is); - SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection(); + SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer()); // By default it has a dispatcher buit-in con.serverMode = true; @@ -72,7 +70,7 @@ public class SpdyTest extends TestCase { IOBuffer iob = new IOBuffer(); iob.append(is); - SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection(); + SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer()); // By default it has a dispatcher buit-in con.serverMode = true; @@ -95,8 +93,8 @@ public class SpdyTest extends TestCase { public void testLargeInt() throws Exception { IOBuffer iob = new IOBuffer(); - iob.append(0xFF); - iob.append(0xFF); + iob.append(0x80); + iob.append(0x01); iob.append(0xFF); iob.append(0xFF); @@ -105,12 +103,34 @@ public class SpdyTest extends TestCase { iob.append(0xFF); iob.append(0xFF); - SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection(); + SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer()); con.dataReceived(iob); - assertEquals(0x7FFF, con.currentInFrame.version); + assertEquals(1, con.currentInFrame.version); assertEquals(0xFFFF, con.currentInFrame.type); assertEquals(0xFF, con.currentInFrame.flags); assertEquals(0xFFFFFF, con.currentInFrame.length); } + + // Does int parsing works ? + public void testBad() throws Exception { + + IOBuffer iob = new IOBuffer(); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + + SpdyConnection con = new SpdyConnection(memSpdyCon, new RemoteServer()); + con.dataReceived(iob); + + assertEquals(1, con.streamErrors.get()); + + } + } diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java index ad2e45424..4487bceee 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java @@ -68,175 +68,184 @@ import org.apache.tomcat.util.buf.ByteChunk; * it seems there is a bug as well. */ public class LiveHttpThreadedTest extends TestCase { - HttpConnector clientCon = TestMain.shared().getClient(); - ThreadRunner tr; - static MBeanServer server; - - AtomicInteger ok = new AtomicInteger(); - Object lock = new Object(); - int reqCnt; + HttpConnector clientCon = TestMain.shared().getClient(); + HttpConnector serverCon = TestMain.shared().getTestServer(); + ThreadRunner tr; + static MBeanServer server; - Map active = new HashMap(); - - public void test1000Async() throws Exception { - try { - asyncRequest(10, 100, false); - } finally { - dumpHeap("heapAsync.bin"); - } - - } - - public void test10000Async() throws Exception { - try { - asyncRequest(20, 500, false); - } finally { - dumpHeap("heapAsync.bin"); - } - } - - public void xtest1000AsyncSpdy() throws Exception { - try { - asyncRequest(10, 20, true); - } finally { - dumpHeap("heapAsync.bin"); - } - - } - - public void xtest10000AsyncSpdy() throws Exception { - try { - asyncRequest(20, 500, true); - } finally { - dumpHeap("heapAsync.bin"); - } - } - - public void asyncRequest(int thr, int perthr, boolean spdy) throws Exception { - reqCnt = thr * perthr; - if (spdy) { - // TODO: simpler API ( 'allowSpdy', etc ) - after negotiation is impl - clientCon.withConnectionManager(new SpdyConnection.SpdyConnectionManager()); - } - long t0 = System.currentTimeMillis(); - tr = new ThreadRunner(thr, perthr) { - public void makeRequest(int i) throws Exception { - HttpRequest cstate = clientCon.request("localhost", 8802); - synchronized (active) { - active.put(cstate, cstate); - } - - cstate.requestURI().set("/hello"); - cstate.setCompletedCallback(reqCallback); - - // Send the request, wait response - Thread.currentThread().sleep(20); - cstate.send(); - } - }; - tr.run(); - assertEquals(0, tr.errors.get()); - synchronized (lock) { - lock.wait(reqCnt * 100); - } - assertEquals(reqCnt, ok.get()); - System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0)); - } - - public void testURLRequest() throws Exception { - urlRequest(10, 100); - } + AtomicInteger ok = new AtomicInteger(); + Object lock = new Object(); + int reqCnt; - public void testURLRequest2() throws Exception { - urlRequest(20, 500); + Map active = new HashMap(); - } - - /** - * HttpURLConnection client against lite.http server. - */ - public void urlRequest(int thr, int cnt) throws Exception { - long t0 = System.currentTimeMillis(); - - - try { - HttpConnector testServer = TestMain.getTestServer(); - - tr = new ThreadRunner(thr, cnt) { - - public void makeRequest(int i) throws Exception { - try { - ByteChunk out = new ByteChunk(); - HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out); - if (con.getResponseCode() != 200) { - errors.incrementAndGet(); - } - if (!"Hello world".equals(out.toString())) { - errors.incrementAndGet(); - System.err.println("bad result " + out); - } - } catch(Throwable t) { - t.printStackTrace(); - errors.incrementAndGet(); - } - } - }; - tr.run(); - assertEquals(0, tr.errors.get()); - - System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " + - (System.currentTimeMillis() - t0)); - - //assertEquals(testServer., actual) - } finally { - dumpHeap("heapURLReq.bin"); - } - } - - // TODO: move to a servlet - private void dumpHeap(String file) throws InstanceNotFoundException, - MBeanException, ReflectionException, MalformedObjectNameException { - - if (server == null) { - server = ManagementFactory.getPlatformMBeanServer(); - - } - File f1 = new java.io.File(file); - if (f1.exists()) { - f1.delete(); - } - server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"), - "dumpHeap", - new Object[] {file, Boolean.FALSE /* live */}, - new String[] {String.class.getName(), "boolean"}); - } - - - RequestCompleted reqCallback = new RequestCompleted() { - @Override - public void handle(HttpChannel data, Object extraData) - throws IOException { - String out = data.getIn().copyAll(null).toString(); - if (200 != data.getResponse().getStatus()) { - System.err.println("Wrong status"); - tr.errors.incrementAndGet(); + public void tearDown() throws IOException { + clientCon.cpool.clear(); + } + + public void test1000Async() throws Exception { + try { + asyncRequest(10, 100, false); + } finally { + dumpHeap("heapAsync.bin"); } - if (!"Hello world".equals(out)) { - tr.errors.incrementAndGet(); - System.err.println("bad result " + out); - } - synchronized (active) { - active.remove(data.getRequest()); + + } + + public void test10000Async() throws Exception { + try { + asyncRequest(20, 500, false); + } finally { + dumpHeap("heapAsyncBig.bin"); + } + } + + public void test1000AsyncSpdy() throws Exception { + try { + asyncRequest(10, 100, true); + } finally { + dumpHeap("heapSpdy1000.bin"); + } + + } + + public void test10000AsyncSpdy() throws Exception { + try { + asyncRequest(20, 500, true); + } finally { + dumpHeap("heapSpdy10000.bin"); } - data.release(); - int okres = ok.incrementAndGet(); - if (okres >= reqCnt) { - synchronized (lock) { - lock.notify(); + } + + public void asyncRequest(int thr, int perthr, + final boolean spdy) throws Exception { + reqCnt = thr * perthr; + long t0 = System.currentTimeMillis(); + tr = new ThreadRunner(thr, perthr) { + public void makeRequest(int i) throws Exception { + HttpRequest cstate = clientCon.request("localhost", 8802); + synchronized (active) { + active.put(cstate, cstate); + } + if (spdy) { + // Magic way to force spdy - will be replaced with + // a negotiation. + cstate.setProtocol("SPDY/1.0"); + } + cstate.requestURI().set("/hello"); + cstate.setCompletedCallback(reqCallback); + // no body + cstate.getBody().close(); + // Send the request, wait response + Thread.currentThread().sleep(20); + cstate.send(); + } + }; + tr.run(); + assertEquals(0, tr.errors.get()); + synchronized (lock) { + if (ok.get() < reqCnt) { + lock.wait(reqCnt * 100); } } + assertEquals(reqCnt, ok.get()); + System.err.println(reqCnt + " Async requests: " + (System.currentTimeMillis() - t0)); } - }; - - + + public void testURLRequest1000() throws Exception { + urlRequest(10, 100); + } + + public void testURLRequest10000() throws Exception { + urlRequest(20, 500); + + } + + /** + * HttpURLConnection client against lite.http server. + */ + public void urlRequest(int thr, int cnt) throws Exception { + long t0 = System.currentTimeMillis(); + + + try { + HttpConnector testServer = TestMain.getTestServer(); + + tr = new ThreadRunner(thr, cnt) { + + public void makeRequest(int i) throws Exception { + try { + ByteChunk out = new ByteChunk(); + HttpURLConnection con = TestMain.getUrl("http://localhost:8802/hello", out); + if (con.getResponseCode() != 200) { + errors.incrementAndGet(); + } + if (!"Hello world".equals(out.toString())) { + errors.incrementAndGet(); + System.err.println("bad result " + out); + } + } catch(Throwable t) { + t.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + tr.run(); + assertEquals(0, tr.errors.get()); + + System.err.println(thr + " threads, " + (thr * cnt) + " total blocking URL requests: " + + (System.currentTimeMillis() - t0)); + + //assertEquals(testServer., actual) + } finally { + dumpHeap("heapURLReq.bin"); + } + } + + // TODO: move to a servlet + private void dumpHeap(String file) throws InstanceNotFoundException, + MBeanException, ReflectionException, MalformedObjectNameException { + + if (server == null) { + server = ManagementFactory.getPlatformMBeanServer(); + + } + File f1 = new java.io.File(file); + if (f1.exists()) { + f1.delete(); + } + server.invoke(new ObjectName("com.sun.management:type=HotSpotDiagnostic"), + "dumpHeap", + new Object[] {file, Boolean.FALSE /* live */}, + new String[] {String.class.getName(), "boolean"}); + } + + + RequestCompleted reqCallback = new RequestCompleted() { + @Override + public void handle(HttpChannel data, Object extraData) + throws IOException { + String out = data.getIn().copyAll(null).toString(); + if (200 != data.getResponse().getStatus()) { + System.err.println("Wrong status"); + tr.errors.incrementAndGet(); + } + if (!"Hello world".equals(out)) { + tr.errors.incrementAndGet(); + System.err.println("bad result " + out); + } + synchronized (active) { + active.remove(data.getRequest()); + } + data.release(); + int okres = ok.incrementAndGet(); + if (okres >= reqCnt) { + synchronized (lock) { + lock.notify(); + } + } + } + }; + + } diff --git a/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java b/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java index 8f39d3513..c80f151fa 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogClient.java @@ -33,7 +33,7 @@ import org.w3c.dom.Element; import org.w3c.dom.NodeList; import org.xml.sax.SAXException; -public class WatchdogClient implements Test { +public class WatchdogClient { protected String goldenDir; protected String testMatch; @@ -167,12 +167,10 @@ public class WatchdogClient implements Test { protected String single; WatchdogTestCase singleTest; - @Override public int countTestCases() { return 1; } - @Override public void run(TestResult result) { getSuite(); if (singleTest != null) {