From 0082c718e0e2ae4c06249f6ab832942e8a0925e2 Mon Sep 17 00:00:00 2001 From: costin Date: Fri, 4 Dec 2009 07:16:59 +0000 Subject: [PATCH] One more iteration: - added few more tests - moved the http/1.x code to HttpConnection - easier to test, allows protocol upgrade - added an (experimental, hello-world-style) implementation of spdy ( a new binary protocol and possible replacement for jk ). Tested with chrome and the unit tests - the tricky part seems to work - detecting and 'upgrading' the wire transport. git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@887087 13f79535-47bb-0310-9956-ffa450edef68 --- .../tomcat/lite/http/DefaultHttpConnector.java | 10 +- .../org/apache/tomcat/lite/http/Dispatcher.java | 28 +- .../apache/tomcat/lite/http/Http11Connection.java | 1381 ++++++++++++++++++++ .../java/org/apache/tomcat/lite/http/HttpBody.java | 589 --------- .../org/apache/tomcat/lite/http/HttpChannel.java | 909 ++----------- .../org/apache/tomcat/lite/http/HttpConnector.java | 391 +++--- .../org/apache/tomcat/lite/http/HttpMessage.java | 48 +- .../org/apache/tomcat/lite/http/HttpRequest.java | 126 +- .../org/apache/tomcat/lite/http/HttpResponse.java | 56 +- .../apache/tomcat/lite/http/SpdyConnection.java | 548 ++++++++ .../java/org/apache/tomcat/lite/io/BBuffer.java | 30 +- .../java/org/apache/tomcat/lite/io/CBuffer.java | 14 +- .../org/apache/tomcat/lite/io/DumpChannel.java | 19 +- .../java/org/apache/tomcat/lite/io/IOBuffer.java | 96 +- .../java/org/apache/tomcat/lite/io/IOChannel.java | 8 +- .../org/apache/tomcat/lite/io/IOConnector.java | 3 +- .../java/org/apache/tomcat/lite/io/NioChannel.java | 1 - .../java/org/apache/tomcat/lite/io/NioThread.java | 16 +- .../org/apache/tomcat/lite/io/SocketIOChannel.java | 4 +- .../java/org/apache/tomcat/lite/io/SslChannel.java | 7 +- .../apache/tomcat/lite/proxy/HttpProxyService.java | 4 +- .../tomcat/lite/proxy/StaticContentService.java | 2 - .../org/apache/tomcat/lite/servlet/TomcatLite.java | 3 +- .../apache/coyote/lite/TomcatLiteCoyoteTest.java | 2 +- .../test/org/apache/tomcat/lite/TestMain.java | 26 +- .../org/apache/tomcat/lite/http/ClientTest.java | 60 + .../tomcat/lite/http/HttpChannelInMemoryTest.java | 238 ++-- .../apache/tomcat/lite/http/HttpChannelTest.java | 15 +- .../org/apache/tomcat/lite/http/HttpsTest.java | 22 +- .../org/apache/tomcat/lite/http/LiveHttp1Test.java | 37 +- .../test/org/apache/tomcat/lite/http/SpdyTest.java | 89 ++ .../tomcat/lite/http/services/EchoCallback.java | 3 +- .../tomcat/lite/http/services/SleepCallback.java | 2 +- .../test/org/apache/tomcat/lite/http/spdyreq0 | Bin 0 -> 425 bytes .../test/org/apache/tomcat/lite/io/OneTest.java | 5 +- .../tomcat/lite/load/LiveHttpThreadedTest.java | 9 +- .../org/apache/tomcat/lite/proxy/ProxyTest.java | 2 +- .../apache/tomcat/lite/proxy/SmallProxyTest.java | 15 +- .../lite/servlet/TomcatLiteNoConnectorTest.java | 35 +- .../tomcat/test/watchdog/WatchdogHttpClient.java | 15 +- 40 files changed, 2961 insertions(+), 1907 deletions(-) create mode 100644 modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java delete mode 100644 modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpBody.java create mode 100644 modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java create mode 100644 modules/tomcat-lite/test/org/apache/tomcat/lite/http/ClientTest.java create mode 100644 modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java create mode 100644 modules/tomcat-lite/test/org/apache/tomcat/lite/http/spdyreq0 diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/DefaultHttpConnector.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/DefaultHttpConnector.java index a20aa41a9..d798d8b77 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/DefaultHttpConnector.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/DefaultHttpConnector.java @@ -11,13 +11,13 @@ public class DefaultHttpConnector { } public synchronized static HttpConnector get() { - if (DefaultHttpConnector.defaultHttpConnector == null) { - DefaultHttpConnector.defaultHttpConnector = - new HttpConnector(new SocketConnector()); + if (DefaultHttpConnector.socketConnector == null) { + socketConnector = + new SocketConnector(); } - return DefaultHttpConnector.defaultHttpConnector; + return new HttpConnector(socketConnector); } - private static HttpConnector defaultHttpConnector; + private static SocketConnector socketConnector; } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Dispatcher.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Dispatcher.java index 59a3c7c20..040601952 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Dispatcher.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Dispatcher.java @@ -54,6 +54,10 @@ public class Dispatcher implements HttpService { } public void runService(HttpChannel ch) { + runService(ch, true); + } + + public void runService(HttpChannel ch, boolean recycle) { MappingData mapRes = ch.getRequest().getMappingData(); HttpService h = (HttpService) mapRes.getServiceObject(); try { @@ -61,11 +65,12 @@ public class Dispatcher implements HttpService { h.service(ch.getRequest(), ch.getResponse()); if (!ch.getRequest().isAsyncStarted()) { ch.complete(); - ch.release(); // recycle objects. + if (recycle) { + ch.release(); // recycle objects. + } } else { // Nothing - complete must be called when done. } - } catch (IOException e) { e.printStackTrace(); } catch( Throwable t ) { @@ -75,11 +80,24 @@ public class Dispatcher implements HttpService { @Override public void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException { - service(httpReq, httpRes, false); + service(httpReq, httpRes, false, true); + } + + /** + * Process the request/response in the current thread, without + * release ( recycle ) at the end. + * + * For use by tests and/or in-memory running of servlets. + * + * If no connection is associated with the request - the + * output will remain in the out buffer. + */ + public void run(HttpRequest httpReq, HttpResponse httpRes) throws IOException { + service(httpReq, httpRes, true, false); } - public void service(HttpRequest httpReq, HttpResponse httpRes, boolean noThread) + public void service(HttpRequest httpReq, HttpResponse httpRes, boolean noThread, boolean recycle) throws IOException { long t0 = System.currentTimeMillis(); HttpChannel http = httpReq.getHttpChannel(); @@ -104,7 +122,7 @@ public class Dispatcher implements HttpService { } if (mapRes.service.selectorThread || noThread) { - runService(http); + runService(http, recycle); } else { tp.execute(httpReq.getHttpChannel().dispatcherRunnable); } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java new file mode 100644 index 000000000..c49529a0c --- /dev/null +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/Http11Connection.java @@ -0,0 +1,1381 @@ +/* + */ +package org.apache.tomcat.lite.http; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; +import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes; +import org.apache.tomcat.lite.io.BBucket; +import org.apache.tomcat.lite.io.BBuffer; +import org.apache.tomcat.lite.io.CBuffer; +import org.apache.tomcat.lite.io.FastHttpDateFormat; +import org.apache.tomcat.lite.io.Hex; +import org.apache.tomcat.lite.io.IOBuffer; +import org.apache.tomcat.lite.io.IOChannel; + +public class Http11Connection extends HttpConnection { + public static final String CHUNKED = "chunked"; + + public static final String CLOSE = "close"; + + public static final String KEEPALIVE_S = "keep-alive"; + + public static final String CONNECTION = "connection"; + + public static final String TRANSFERENCODING = "transfer-encoding"; + + + protected static Logger log = Logger.getLogger("Http11Connection"); + static final byte COLON = (byte) ':'; + + // net is the 'socket' connector + + HttpChannel activeHttp; + boolean debug; + BBuffer line = BBuffer.wrapper(); + boolean endSent = false; + + BodyState receiveBodyState = new BodyState(); + BodyState sendBodyState = new BodyState(); + + BBuffer headW = BBuffer.wrapper(); + + boolean headersReceived = false; + boolean bodyReceived = false; + + /** + * Close connection when done writting, no content-length/chunked, + * or no keep-alive ( http/1.0 ) or error. + * + * ServerMode: set if HTTP/0.9 &1.0 || !keep-alive + * ClientMode: not currently used + */ + boolean keepAlive = true; + + protected boolean http11 = true; + protected boolean http10 = false; + protected boolean http09 = false; + + HttpConnection switchedProtocol = null; + + public Http11Connection(HttpConnector httpConnector) { + this.httpConnector = httpConnector; + debug = true; //httpConnector.debug; + } + + public void beforeRequest() { + log.info("Before request"); + activeHttp = null; + endSent = false; + keepAlive = true; + receiveBodyState.recycle(); + sendBodyState.recycle(); + http11 = true; + http09 = false; + http10 = false; + headersReceived = false; + bodyReceived = false; + headRecvBuf.recycle(); + } + + public Http11Connection serverMode() { + serverMode = true; + return this; + } + + private boolean readHead() throws IOException { + while (true) { + int read; + if (headRecvBuf.remaining() < 4) { + // requests have at least 4 bytes - detect protocol + read = net.getIn().read(headRecvBuf, 4); + if (read < 0) { + return closeInHead(); + } + if (read < 4) { + return false; // need more + } + // we have at least 4 bytes + if (headRecvBuf.get(0) == 0x80 && + headRecvBuf.get(1) == 0x01) { + // SPDY signature ( experimental ) + switchedProtocol = new SpdyConnection(httpConnector); + if (serverMode) { + switchedProtocol.serverMode = true; + } + switchedProtocol.withExtraBuffer(headRecvBuf); + // Will also call handleReceived + switchedProtocol.setSink(net); + return false; + } + + } + + // we know we have one + read = net.getIn().readLine(headRecvBuf); + // Remove starting empty lines. + headRecvBuf.skipEmptyLines(); + + // Do we have another full line in the input ? + if (BBuffer.hasLFLF(headRecvBuf)) { + break; // done + } + if (read == 0) { // no more data + return false; + } + if (read < 0) { + return closeInHead(); + } + } + return true; + } + + private boolean closeInHead() throws IOException { + if (debug) { + trace("CLOSE while reading HEAD"); + } + // too early - we don't have the head + abort("Close in head"); + return false; + } + + // Unit tests use this to access the HttpChannel + protected HttpChannel checkHttpChannel() throws IOException { + if (switchedProtocol != null) { + return switchedProtocol.checkHttpChannel(); + } + if (activeHttp == null) { + if (serverMode) { + activeHttp = httpConnector.getServer(); + activeHttp.setConnection(this); + if (httpConnector.defaultService != null) { + activeHttp.setHttpService(httpConnector.defaultService); + } + } else { + } + } + return activeHttp; + } + + @Override + public void handleReceived(IOChannel netx) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.handleReceived(netx); + return; + } + + if (!checkKeepAliveClient()) { + return; // we were in client keep alive mode + } + + if (!headersReceived) { + if (!readHead()) { + return; + } + } + + // We have a header + if (activeHttp == null) { + if (checkHttpChannel() == null) { + return; + } + } + + IOBuffer receiveBody = activeHttp.receiveBody; + + if (!headersReceived) { + headRecvBuf.wrapTo(headW); + parseMessage(activeHttp, headW); + if (serverMode && activeHttp.httpReq.decodedUri.remaining() == 0) { + abort(activeHttp, "Invalid url"); + } + + headersReceived = true; + // Send header callbacks - we process any incoming data + // first, so callbacks have more info + activeHttp.handleHeadersReceived(activeHttp.inMessage); + } + + // any remaining data will be processed as part of the + // body - or left in the channel until endSendReceive() + + if (!bodyReceived) { + // Will close receiveBody when it consummed enough + rawDataReceived(activeHttp, receiveBody, net.getIn()); + // Did we process anything ? + if (receiveBody.getBufferCount() > 0) { + activeHttp.sendHandleReceivedCallback(); // callback + } + + // Receive has marked the body as closed + if (receiveBody.isAppendClosed()) { + activeHttp.handleEndReceive(); + bodyReceived = true; + } + } + + + if (net.getIn().isClosedAndEmpty()) { + // If not already closed. + closeStreamOnEnd("closed after body"); + } + + } + + /** + * We got data while in client keep alive ( no activeHttp ) + * + * @return false if there is an error + */ + private boolean checkKeepAliveClient() throws IOException { + // Client, no active connection ( keep alive ) + if (!serverMode && activeHttp == null) { + if (net.getIn().isClosedAndEmpty() || !net.isOpen()) { + // server disconnected, fine + httpConnector.cpool.stopKeepAlive(this); + return false; + } + if (net.getIn().available() == 0) { + return true; + } + log.warning("Unexpected message from server in client keep alive " + + net.getIn()); + if (net.isOpen()) { + net.close(); + } + return false; + } + return true; + } + + private void processProtocol(CBuffer protocolMB) throws IOException { + http11 = false; + http09 = false; + http10 = false; + + if (protocolMB.equals(HttpChannel.HTTP_11)) { + http11 = true; + } else if (protocolMB.equals(HttpChannel.HTTP_10)) { + http10 = true; + } else if (protocolMB.equals("")) { + http09 = true; + } else { + http11 = true; // hopefully will be backward compat + } + } + + void closeStreamOnEnd(String cause) { + if (debug) + log.info("Not reusing connection because: " + cause); + keepAlive = false; + } + + boolean keepAlive() { + if (httpConnector != null) { + if (serverMode && !httpConnector.serverKeepAlive) { + keepAlive = false; + } + if (!serverMode && !httpConnector.clientKeepAlive) { + keepAlive = false; + } + } + if (http09) { + keepAlive = false; + } + if (!net.isOpen()) { + keepAlive = false; + } + return keepAlive; + } + + @Override + protected void endSendReceive(HttpChannel http) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.endSendReceive(http); + return; + } + + activeHttp = null; + if (!keepAlive()) { + if (debug) { + log.info("--- Close socket, no keepalive " + net); + } + if (net != null) { + net.close(); +// net.getOut().close(); // shutdown output if not done +// net.getIn().close(); // this should close the socket + net.startSending(); + + } + beforeRequest(); + return; + } + + beforeRequest(); // will clear head buffer + + if (serverMode) { + handleReceived(net); // will attempt to read next req + if (debug) { + log.info(">>> server socket KEEP_ALIVE " + net.getTarget() + + " " + net); + } + + } else { + if (debug) { + log.info(">>> client socket KEEP_ALIVE " + net.getTarget() + + " " + net); + } + httpConnector.cpool.returnChannel(this); + } + } + + private void trace(String s) { + if(debug) { + log.info(this.toString() + " " + activeHttp + " " + s); + } + } + + private boolean isDone(BodyState bodys, IOBuffer body) { + if (bodys.noBody) { + return true; + } + if (bodys.isContentDelimited()) { + if (!bodys.chunked && bodys.remaining == 0) { + return true; + } else if (bodys.chunked && body.isAppendClosed()) { + return true; + } + } + return false; + } + + void parseMessage(HttpChannel http, BBuffer headB) throws IOException { + //Parse the response + line.recycle(); + headB.readLine(line); + + HttpMessageBytes msgBytes; + + if (serverMode) { + msgBytes = http.httpReq.getMsgBytes(); + parseRequestLine(line, msgBytes.method(), + msgBytes.url(), + msgBytes.query(), + msgBytes.protocol()); + } else { + msgBytes = http.httpRes.getMsgBytes(); + parseResponseLine(line, msgBytes.protocol(), + msgBytes.status(), msgBytes.message()); + } + + parseHeaders(http, msgBytes, headB); + + http.inMessage.state = HttpMessage.State.BODY_DATA; + + http.inMessage.processReceivedHeaders(); + + // TODO: hook to allow specific charsets ( can be done later ) + processProtocol(http.inMessage.protocol()); + + if (serverMode) { + // requested connection:close/keepAlive and proto + updateKeepAlive(http.getRequest().getMimeHeaders(), true); + + processExpectation(http); + + processContentDelimitation(receiveBodyState, http.getRequest()); + // Spec: + // The presence of a message-body in a request is signaled by the + // inclusion of a Content-Length or Transfer-Encoding header field in + // the request's message-headers + // Server should read - but ignore .. + receiveBodyState.noBody = !receiveBodyState.isContentDelimited(); + + updateCloseOnEnd(receiveBodyState, http, http.receiveBody); + + /* + * The presence of a message-body in a request is signaled by the + * inclusion of a Content-Length or Transfer-Encoding header field in + * the request's message-headers. A message-body MUST NOT be included + * in a request if the specification of the request method + * (section 5.1.1) does not allow sending an entity-body in requests. + * A server SHOULD read and forward a message-body on any request; if the request method does not include defined semantics for an entity-body, then the message-body SHOULD be ignored when handling the request. + */ + if (!receiveBodyState.isContentDelimited()) { + // No body + http.getIn().close(); + } + + } else { + receiveBodyState.noBody = http.getResponse().hasBody(); + + updateKeepAlive(http.getResponse().getMimeHeaders(), false); + + if (statusDropsConnection(http.getResponse().getStatus())) { + closeStreamOnEnd("response status drops connection"); + } + IOBuffer body = http.receiveBody; + processContentDelimitation(receiveBodyState, http.getResponse()); + + if (isDone(receiveBodyState, body)) { + body.close(); + } + + if (!receiveBodyState.isContentDelimited()) { + closeStreamOnEnd("not content delimited"); + } + } + + } + + private void processExpectation(HttpChannel http) throws IOException { + http.expectation = false; + MultiMap headers = http.getRequest().getMimeHeaders(); + + CBuffer expect = headers.getHeader("expect"); + if ((expect != null) + && (expect.indexOf("100-continue") != -1)) { + http.expectation = true; + + // TODO: configure, use the callback or the servlet 'read'. + net.getOut().append("HTTP/1.1 100 Continue\r\n\r\n"); + net.startSending(); + } + } + + + + /** + * Updates chunked, contentLength, remaining - based + * on headers + */ + private void processContentDelimitation(BodyState bodys, + HttpMessage httpMsg) { + + bodys.contentLength = httpMsg.getContentLength(); + if (bodys.contentLength >= 0) { + bodys.remaining = bodys.contentLength; + } + + // TODO: multiple transfer encoding headers, only process the last + String transferEncodingValue = httpMsg.getHeader(TRANSFERENCODING); + if (transferEncodingValue != null) { + int startPos = 0; + int commaPos = transferEncodingValue.indexOf(','); + String encodingName = null; + while (commaPos != -1) { + encodingName = transferEncodingValue.substring + (startPos, commaPos).toLowerCase().trim(); + if ("chunked".equalsIgnoreCase(encodingName)) { + bodys.chunked = true; + } + startPos = commaPos + 1; + commaPos = transferEncodingValue.indexOf(',', startPos); + } + encodingName = transferEncodingValue.substring(startPos) + .toLowerCase().trim(); + if ("chunked".equals(encodingName)) { + bodys.chunked = true; + httpMsg.chunked = true; + } else { + System.err.println("TODO: ABORT 501"); + //return 501; // Currently only chunked is supported for + // transfer encoding. + } + } + + if (bodys.chunked) { + bodys.remaining = 0; + } + } + + /** + * Read the request line. This function is meant to be used during the + * HTTP request header parsing. Do NOT attempt to read the request body + * using it. + * + * @throws IOException If an exception occurs during the underlying socket + * read operations, or if the given buffer is not big enough to accomodate + * the whole line. + */ + boolean parseRequestLine(BBuffer line, + BBuffer methodMB, BBuffer requestURIMB, + BBuffer queryMB, + BBuffer protoMB) + throws IOException { + + line.readToSpace(methodMB); + line.skipSpace(); + + line.readToDelimOrSpace(HttpChannel.QUESTION, requestURIMB); + if (line.remaining() > 0 && line.get(0) == HttpChannel.QUESTION) { + // Has query + line.readToSpace(queryMB); + // don't include '?' + queryMB.position(queryMB.position() + 1); + } else { + queryMB.setBytes(line.array(), line.position(), 0); + } + line.skipSpace(); + + line.readToSpace(protoMB); + + // proto is optional ( for 0.9 ) + return requestURIMB.remaining() > 0; + } + + boolean parseResponseLine(BBuffer line, + BBuffer protoMB, BBuffer statusCode, BBuffer status) + throws IOException { + line.skipEmptyLines(); + + line.readToSpace(protoMB); + line.skipSpace(); + line.readToSpace(statusCode); + line.skipSpace(); + line.wrapTo(status); + + // message may be empty + return statusCode.remaining() > 0; + } + + private void parseHeaders(HttpChannel http, HttpMessageBytes msgBytes, + BBuffer head) + throws IOException { + + head.readLine(line); + + int idx = 0; + while(line.remaining() > 0) { + // not empty.. + idx = msgBytes.addHeader(); + BBuffer nameBuf = msgBytes.getHeaderName(idx); + BBuffer valBuf = msgBytes.getHeaderValue(idx); + parseHeader(http, head, line, nameBuf, valBuf); + + // TODO: process 'interesting' headers here. + } + } + + /** + * Parse one header. + * Line must be populated. On return line will be populated + * with the next header: + * + * @param line current header line, not empty. + */ + int parseHeader(HttpChannel http, BBuffer head, + BBuffer line, BBuffer name, BBuffer value) + throws IOException { + + int newPos = line.readToDelimOrSpace(COLON, name); + line.skipSpace(); + if (line.readByte() != COLON) { + throw new IOException("Missing ':' in header name " + line); + } + line.skipSpace(); + line.read(value); // remaining of the line + + while (true) { + head.readLine(line); + if (line.remaining() == 0) { + break; + } + int first = line.get(0); + if (first != BBuffer.SP && first != BBuffer.HT) { + break; + } + // continuation line - append it to value + value.setEnd(line.getEnd()); + line.position(line.limit()); + } + + // We may want to keep the original and use separate buffer ? + http.normalizeHeader(value); + return 1; + } + + private int receiveDone(HttpChannel http, IOBuffer body, boolean frameError) throws IOException { + // Content-length case, we're done reading + body.close(); + + http.error = frameError; + if (frameError) { + closeStreamOnEnd("frame error"); + } + + return DONE; + } + + /** + * Called when raw body data is received. + * Callback should not consume past the end of the body. + * @param rawReceiveBuffers + * + */ + private void rawDataReceived(HttpChannel http, IOBuffer body, + IOBuffer rawReceiveBuffers) throws IOException { + // TODO: Make sure we don't process more than we need ( eat next req ). + // If we read too much: leave it in readBuf, the finalzation code + // should skip KeepAlive and start processing it. + // we need to read at least something - to detect -1 ( we could + // suspend right away, but seems safer + BodyState bodys = receiveBodyState; + + while (http.inMessage.state == HttpMessage.State.BODY_DATA) { + if (receiveBodyState.noBody) { + receiveDone(http, body, false); + return; + } + if (rawReceiveBuffers.isClosedAndEmpty()) { + if (receiveBodyState.isContentDelimited()) { + if (receiveBodyState.contentLength >= 0 && receiveBodyState.remaining == 0) { + receiveDone(http, body, false); + } else { + // End of input - other side closed, no more data + //log.info("CLOSE while reading " + this); + // they're not supposed to close ! + receiveDone(http, body, true); + } + } else { + receiveDone(http, body, false); // ok + } + // input connection closed ? + closeStreamOnEnd("Closed input"); + return; + } + BBucket rawBuf = rawReceiveBuffers.peekFirst(); + if (rawBuf == null) { + return; // need more data + } + + if (!bodys.isContentDelimited()) { + while (true) { + BBucket first = rawReceiveBuffers.popFirst(); + if (first == null) { + break; // will go back to check if done. + } else { + body.queue(first); + } + } + } else { + + if (bodys.contentLength >= 0 && bodys.remaining == 0) { + receiveDone(http, body, false); + return; + } + + if (bodys.chunked && bodys.remaining == 0) { + int rc = NEED_MORE; + // TODO: simplify, use readLine() + while (rc == NEED_MORE) { + rc = chunk.parseChunkHeader(rawReceiveBuffers); + if (rc == ERROR) { + http.abort("Chunk error"); + receiveDone(http, body, true); + return; + } else if (rc == NEED_MORE) { + return; + } + } + if (rc == 0) { // last chunk + receiveDone(http, body, false); + return; + } else { + bodys.remaining = rc; + } + } + + rawBuf = (BBucket) rawReceiveBuffers.peekFirst(); + if (rawBuf == null) { + return; // need more data + } + + + if (bodys.remaining < rawBuf.remaining()) { + // To buffer has more data than we need. + int lenToConsume = (int) bodys.remaining; + BBucket sb = rawReceiveBuffers.popLen(lenToConsume); + body.queue(sb); + //log.info("Queue received buffer " + this + " " + lenToConsume); + bodys.remaining = 0; + } else { + BBucket first = rawReceiveBuffers.popFirst(); + bodys.remaining -= first.remaining(); + body.queue(first); + //log.info("Queue full received buffer " + this + " RAW: " + rawReceiveBuffers); + } + if (bodys.contentLength >= 0 && bodys.remaining == 0) { + // Content-Length, all done + body.close(); + receiveDone(http, body, false); + } + } + } + } + + + + protected void sendRequest(HttpChannel http) + throws IOException { + if (switchedProtocol != null) { + switchedProtocol.sendRequest(http); + return; + } + + this.activeHttp = http; + + // Update transfer fields based on headers. + processProtocol(http.getRequest().protocol()); + updateKeepAlive(http.getRequest().getMimeHeaders(), true); + + // Update Host header + if (http.getRequest().getMimeHeaders().getHeader("Host") == null) { + String target = http.getTarget(); + if (target == null) { + throw new IOException("Missing host header"); + } + CBuffer hostH = http.getRequest().getMimeHeaders().addValue("Host"); + if (target.endsWith(":80")) { + hostH.set(target.substring(0, target.length() - 3)); + } else { + hostH.set(target); + } + } + + processContentDelimitation(sendBodyState, + http.getRequest()); + + + // 1.0: The presence of an entity body in a request is signaled by + // the inclusion of a Content-Length header field in the request + // message headers. HTTP/1.0 requests containing an entity body + // must include a valid Content-Length header field. + if (http10 && !sendBodyState.isContentDelimited()) { + // Will not close connection - just flush and mark the body + // as sent + sendBodyState.noBody = true; + } + + if (sendBodyState.noBody) { + http.getRequest().getMimeHeaders().remove(HttpChannel.CONTENT_LENGTH); + http.getRequest().getMimeHeaders().remove(TRANSFERENCODING); + http.getOut().close(); + } else { + long contentLength = + http.getRequest().getContentLength(); + if (contentLength < 0) { + http.getRequest().getMimeHeaders().addValue("Transfer-Encoding"). + set(CHUNKED); + } + } + + updateCloseOnEnd(sendBodyState, http, http.sendBody); + + try { + serialize(http.getRequest(), net.getOut()); + if (http.debug) { + http.trace("S: \n" + net.getOut()); + } + + if (http.outMessage.state == HttpMessage.State.HEAD) { + http.outMessage.state = HttpMessage.State.BODY_DATA; + } + + + // TODO: add any body and flush. More body can be added later - + // including 'end'. + + http.startSending(); + } catch (Throwable t) { + log.log(Level.SEVERE, "Error sending request", t); + abort(t.getMessage()); + } + + } + + + /** + * Determine if we must drop the connection because of the HTTP status + * code. Use the same list of codes as Apache/httpd. + */ + private boolean statusDropsConnection(int status) { + return status == 400 /* SC_BAD_REQUEST */ || + status == 408 /* SC_REQUEST_TIMEOUT */ || + status == 411 /* SC_LENGTH_REQUIRED */ || + status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ || + status == 414 /* SC_REQUEST_URI_TOO_LARGE */ || + status == 500 /* SC_INTERNAL_SERVER_ERROR */ || + status == 503 /* SC_SERVICE_UNAVAILABLE */ || + status == 501 /* SC_NOT_IMPLEMENTED */; + } + + protected void sendResponseHeaders(HttpChannel http) + throws IOException { + if (switchedProtocol != null) { + switchedProtocol.sendResponseHeaders(http); + return; + } + + if (!serverMode) { + throw new IOException("Only in server mode"); + } + endSent = false; + IOBuffer sendBody = http.sendBody; + HttpResponse res = http.getResponse(); + if (res.isCommitted()) { + return; + } + res.setCommitted(true); + + sendBodyState.noBody = !res.hasBody(); + + if (statusDropsConnection(res.getStatus())) { + closeStreamOnEnd("status drops connection"); + } + if (http.error) { + closeStreamOnEnd("error"); + } + + MultiMap headers = res.getMimeHeaders(); + + // Add date header + if (headers.getHeader("Date") == null) { + headers.setValue("Date").set(FastHttpDateFormat.getCurrentDate()); + } + + // Add server header + if (http.serverHeader.length() > 0) { + headers.setValue("Server").set(http.serverHeader); + } + + // Decide on a transfer encoding for out. + if (keepAlive()) { // request and user allows keep alive + int cl = res.getContentLength(); + + if (http10) { + if (cl < 0 && !sendBodyState.noBody && + sendBody.isAppendClosed()) { + // We can generate content-lenght + cl = sendBody.available(); + res.setContentLength(cl); + } + if (cl < 0 && !sendBodyState.noBody) { + closeStreamOnEnd("HTTP/1.0 without content length"); + } else { + headers.setValue(CONNECTION).set(KEEPALIVE_S); + } + } else { // http11 + if (!sendBodyState.noBody) { + if (cl < 0) { + res.getMimeHeaders().setValue(TRANSFERENCODING).set(CHUNKED); + } + } + } + } else { + headers.setValue(CONNECTION).set(CLOSE); + // since we close the connection - don't bother with + // transfer encoding + headers.remove(TRANSFERENCODING); + } + + // Update our internal state based on headers we just set. + processContentDelimitation(sendBodyState, res); + updateCloseOnEnd(sendBodyState, http, sendBody); + + + if (http.debug) { + http.trace("Send response headers " + net); + } + if (net != null) { + serialize(res, net.getOut()); + } + + if (http.outMessage.state == HttpMessage.State.HEAD) { + http.outMessage.state = HttpMessage.State.BODY_DATA; + } + + if (isDone(sendBodyState, sendBody)) { + http.getOut().close(); + } + + if (net != null) { + net.startSending(); + } + } + + private void abort(String t) throws IOException { + abort(activeHttp, t); + } + + private void updateCloseOnEnd(BodyState bodys, HttpChannel http, IOBuffer body) { + if (!bodys.isContentDelimited() && !bodys.noBody) { + closeStreamOnEnd("not content delimited"); + } + } + + /** + * Disconnect abruptly - client closed, frame errors, etc + * @param t + * @throws IOException + */ + public void abort(HttpChannel http, String t) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.abort(http, t); + return; + } + keepAlive = false; + if (net != null ) { + if (net.isOpen()) { + net.close(); + net.startSending(); + } + } + if (http != null) { + http.abort(t); + } + } + + /** + * Update keepAlive based on Connection header and protocol. + */ + private void updateKeepAlive(MultiMap headers, boolean request) { + if (http09) { + closeStreamOnEnd("http 0.9"); + return; + } + + // TODO: also need to remove headers matching connection + // ( like 'upgrade') + + CBuffer value = headers.getHeader(CONNECTION); + String conHeader = (value == null) ? null : value.toString(); + if (conHeader != null) { + if (CLOSE.equalsIgnoreCase(conHeader)) { + // 1.1 ( but we accept it for 1.0 too ) + closeStreamOnEnd("connection close"); + } + if (http10 && conHeader.indexOf(KEEPALIVE_S) < 0) { + // Keep-Alive required for http/1.0 + closeStreamOnEnd("connection != keep alive"); + } + // we have connection: keepalive, good + } else { + // no connection header - for 1.1 default is keepAlive, + // for 10 it's close + if (http10) { + closeStreamOnEnd("http1.0 no connection header"); + } + } + } + + @Override + public void startSending() throws IOException { + if (switchedProtocol != null) { + switchedProtocol.startSending(); + return; + } + + } + + @Override + public void startSending(HttpChannel http) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.startSending(http); + return; + } + http.send(); // if needed + + if (net == null) { + return; // not connected yet. + } + + if (net.getOut().isAppendClosed()) { + abort("Net closed"); + } else { + flushToNext(http.sendBody, net.getOut()); + net.startSending(); + } + + } + + protected void outClosed(HttpChannel http) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.outClosed(http); + return; + } + // TODO: move it ? + if (sendBodyState.isContentDelimited() && !http.error) { + if (!sendBodyState.chunked && + sendBodyState.remaining - http.getOut().available() > 0) { + http.abort("CLOSE CALLED WITHOUT FULL LEN"); + } + } + + } + + @Override + public void handleFlushed(IOChannel net) throws IOException { + if (switchedProtocol != null) { + switchedProtocol.handleFlushed(net); + return; + } + if (activeHttp != null) { + activeHttp.flushLock.signal(this); + activeHttp.handleFlushed(this); + if (activeHttp.sendBody.isClosedAndEmpty()) { + activeHttp.handleEndSent(); + } + } + } + + + + private void flushToNext(IOBuffer body, IOBuffer out) throws IOException { + + synchronized (this) { + // TODO: better head support + if (sendBodyState.noBody) { + for (int i = 0; i < body.getBufferCount(); i++) { + Object bc = body.peekBucket(i); + if (bc instanceof BBucket) { + ((BBucket) bc).release(); + } + } + body.clear(); + return; + } + + // TODO: only send < remainingWrite, if buffer + // keeps changing after startWrite() is called (shouldn't) + + if (sendBodyState.chunked) { + sendChunked(sendBodyState, body, out); + } else if (sendBodyState.contentLength >= 0) { + // content-length based + sendContentLen(sendBodyState, body, out); + } else { + sendCloseDelimited(body, out); + } + } + } + + private void sendCloseDelimited(IOBuffer body, IOBuffer out) throws IOException { + // Close delimitation + while (true) { + Object bc = body.popFirst(); + if (bc == null) { + break; + } + out.queue(bc); + } + if (body.isClosedAndEmpty()) { + out.close(); // no content-delimitation + } + } + + /** + * Convert the request to bytes, ready to send. + */ + public static void serialize(HttpRequest req, IOBuffer rawSendBuffers2) throws IOException { + rawSendBuffers2.append(req.method()); + rawSendBuffers2.append(BBuffer.SP); + + // TODO: encode or use decoded + rawSendBuffers2.append(req.requestURI()); + if (req.queryString().length() > 0) { + rawSendBuffers2.append("?"); + rawSendBuffers2.append(req.queryString()); + } + + rawSendBuffers2.append(BBuffer.SP); + rawSendBuffers2.append(req.protocol()); + rawSendBuffers2.append(BBuffer.CRLF_BYTES); + + serializeHeaders(req.getMimeHeaders(), rawSendBuffers2); + } + + /** + * Convert the response to bytes, ready to send. + */ + public static void serialize(HttpResponse res, IOBuffer rawSendBuffers2) throws IOException { + + rawSendBuffers2.append(res.protocol()).append(' '); + String status = Integer.toString(res.getStatus()); + rawSendBuffers2.append(status).append(' '); + if (res.getMessageBuffer().length() > 0) { + rawSendBuffers2.append(res.getMessage()); + } else { + rawSendBuffers2 + .append(res.getMessage(res.getStatus())); + } + rawSendBuffers2.append(BBuffer.CRLF_BYTES); + // Headers + serializeHeaders(res.getMimeHeaders(), rawSendBuffers2); + } + + public static void serializeHeaders(MultiMap mimeHeaders, IOBuffer rawSendBuffers2) throws IOException { + for (int i = 0; i < mimeHeaders.size(); i++) { + CBuffer name = mimeHeaders.getName(i); + CBuffer value = mimeHeaders.getValue(i); + if (name.length() == 0 || value.length() == 0) { + continue; + } + rawSendBuffers2.append(name); + rawSendBuffers2.append(Http11Connection.COLON); + rawSendBuffers2.append(value); + rawSendBuffers2.append(BBuffer.CRLF_BYTES); + } + rawSendBuffers2.append(BBuffer.CRLF_BYTES); + } + + + private boolean sendContentLen(BodyState bodys, IOBuffer body, IOBuffer out) throws IOException { + while (true) { + BBucket bucket = body.peekFirst(); + if (bucket == null) { + break; + } + int len = bucket.remaining(); + if (len <= bodys.remaining) { + bodys.remaining -= len; + bucket = body.popFirst(); + out.queue(bucket); + } else { + // Write over the end of the buffer ! + log.severe("write more than Content-Length"); + len = (int) bodys.remaining; + // data between position and limit + bucket = body.popLen((int) bodys.remaining); + out.queue(bucket); + while (bucket != null) { + bucket = body.popFirst(); + if (bucket != null) { + bucket.release(); + } + } + + // forced close + //close(); + bodys.remaining = 0; + return true; + } + } + if (body.isClosedAndEmpty()) { + //http.rawSendBuffers.queue(IOBrigade.MARK); + if (bodys.remaining > 0) { + closeStreamOnEnd("sent more than content-length"); + log.severe("Content-Length > body"); + } + return true; + } + return false; + } + + private boolean sendChunked(BodyState bodys, IOBuffer body, IOBuffer out) throws IOException { + int len = body.available(); + + if (len > 0) { + ByteBuffer sendChunkBuffer = chunk.prepareChunkHeader(len); + bodys.remaining = len; + out.queue(sendChunkBuffer); + while (bodys.remaining > 0) { + BBucket bc = body.popFirst(); + bodys.remaining -= bc.remaining(); + out.queue(bc); + } + } + + if (body.isClosedAndEmpty()) { + if (!endSent) { + out.append(chunk.endChunk()); + endSent = true; + } + return true; + } else { + return false; + } + } + + // used for chunk parsing/end + ChunkState chunk = new ChunkState(); + static final int NEED_MORE = -1; + static final int ERROR = -4; + static final int DONE = -5; + + + static class ChunkState { + static byte[] END_CHUNK_BYTES = { + (byte) '\r', (byte) '\n', + (byte) '0', + (byte) '\r', (byte) '\n', + (byte) '\r', (byte) '\n'}; + + + int partialChunkLen; + boolean readDigit = false; + boolean trailer = false; + protected boolean needChunkCrlf = false; + + // Buffer used for chunk length conversion. + protected byte[] sendChunkLength = new byte[10]; + + /** End chunk marker - will include chunked end or empty */ + protected BBuffer endSendBuffer = BBuffer.wrapper(); + + public ChunkState() { + sendChunkLength[8] = (byte) '\r'; + sendChunkLength[9] = (byte) '\n'; + } + + void recycle() { + partialChunkLen = 0; + readDigit = false; + trailer = false; + needChunkCrlf = false; + endSendBuffer.recycle(); + } + + /** + * Parse the header of a chunk. + * A chunk header can look like + * A10CRLF + * F23;chunk-extension to be ignoredCRLF + * The letters before CRLF but after the trailer mark, must be valid hex digits, + * we should not parse F23IAMGONNAMESSTHISUP34CRLF as a valid header + * according to spec + */ + int parseChunkHeader(IOBuffer buffer) throws IOException { + if (buffer.peekFirst() == null) { + return NEED_MORE; + } + if (needChunkCrlf) { + // TODO: Trailing headers + int c = buffer.read(); + if (c == BBuffer.CR) { + if (buffer.peekFirst() == null) { + return NEED_MORE; + } + c = buffer.read(); + } + if (c == BBuffer.LF) { + needChunkCrlf = false; + } else { + System.err.println("Bad CRLF " + c); + return ERROR; + } + } + + while (true) { + if (buffer.peekFirst() == null) { + return NEED_MORE; + } + int c = buffer.read(); + + if (c == BBuffer.CR) { + continue; + } else if (c == BBuffer.LF) { + break; + } else if (c == HttpChannel.SEMI_COLON) { + trailer = true; + } else if (c == BBuffer.SP) { + // ignore + } else if (trailer) { + // ignore + } else { + //don't read data after the trailer + if (Hex.DEC[c] != -1) { + readDigit = true; + partialChunkLen *= 16; + partialChunkLen += Hex.DEC[c]; + } else { + //we shouldn't allow invalid, non hex characters + //in the chunked header + log.info("Chunk parsing error1 " + c + " " + buffer); + //http.abort("Chunk error"); + return ERROR; + } + } + } + + if (!readDigit) { + log.info("Chunk parsing error2 " + buffer); + return ERROR; + } + + needChunkCrlf = true; // next time I need to parse CRLF + int result = partialChunkLen; + partialChunkLen = 0; + trailer = false; + readDigit = false; + return result; + } + + + ByteBuffer prepareChunkHeader(int current) { + int pos = 7; // 8, 9 are CRLF + while (current > 0) { + int digit = current % 16; + current = current / 16; + sendChunkLength[pos--] = Hex.HEX[digit]; + } + if (needChunkCrlf) { + sendChunkLength[pos--] = (byte) '\n'; + sendChunkLength[pos--] = (byte) '\r'; + } else { + needChunkCrlf = true; + } + // TODO: pool - this may stay in the queue while we flush more + ByteBuffer chunkBB = ByteBuffer.allocate(16); + chunkBB.put(sendChunkLength, pos + 1, 9 - pos); + chunkBB.flip(); + return chunkBB; + } + + public BBuffer endChunk() { + if (! needChunkCrlf) { + endSendBuffer.setBytes(END_CHUNK_BYTES, 2, + END_CHUNK_BYTES.length - 2); // CRLF + } else { // 0 + endSendBuffer.setBytes(END_CHUNK_BYTES, 0, + END_CHUNK_BYTES.length); + } + return endSendBuffer; + } + } + + static class BodyState { + /** response: HEAD or 1xx, 204, 304 status + * req: missing content-length or transfer-encoding + */ + protected boolean noBody = false; + protected boolean chunked = false; + protected long contentLength = -1; // C-L header + /** Bytes remaining in the current chunk or body ( if CL ) */ + protected long remaining = 0; // both chunked and C-L + + public void recycle() { + chunked = false; + remaining = 0; + contentLength = -1; + } + public boolean isContentDelimited() { + return chunked || contentLength >= 0; + } + + } + + public String toString() { + if (switchedProtocol != null) { + return switchedProtocol.toString(); + } + + return (serverMode ? "S11 " : "C11 ") + + (keepAlive() ? " KA " : ""); + } + +} diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpBody.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpBody.java deleted file mode 100644 index 11e84df33..000000000 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpBody.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - */ -package org.apache.tomcat.lite.http; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.logging.Logger; - -import org.apache.tomcat.lite.io.BBucket; -import org.apache.tomcat.lite.io.BBuffer; -import org.apache.tomcat.lite.io.Hex; -import org.apache.tomcat.lite.io.IOBuffer; -import org.apache.tomcat.lite.io.IOChannel; - -/** - * Transport decoded buffer, representing the body - * of HTTP messages. - * - * Supports: - * - Chunked and Content-Length delimitation - * - "Close" delimitation ( no content delimitation - TCP close ) - * - * TODO: continue support - * TODO: gzip encoding - * - * For sending, data is kept in this buffer until flush() is called. - */ -class HttpBody extends IOBuffer { - protected static Logger log = Logger.getLogger("HttpBody"); - - static int DEFAULT_CHUNK_SIZE = 4096; - - private HttpChannel http; - - protected boolean chunked = false; - protected long contentLength = -1; // C-L header - - /** True: Http/1.x + chunked || C-L - - * False: for http/0.9, connection:close, errors. -> - * close delimited - */ - boolean frameError = false; - - - /** Bytes remaining in the current chunk or body ( if CL ) */ - protected long remaining = 0; // both chunked and C-L - - // used for chunk parsing - ChunkState chunk = new ChunkState(); - - boolean noBody; - - boolean endSent = false; - boolean sendBody = false; - - private HttpMessage httpMsg; - - HttpBody(HttpChannel asyncHttp, boolean sendBody) { - this.http = asyncHttp; - if (sendBody) { - this.sendBody = true; - // For flush and close to work - need to fix - //this.ch = http; - } else { - this.ch = null; - } - } - - public String toString() { - return "{" + super.toString() + " " + - (chunked ? "CNK/" + remaining : "") - + (contentLength >= 0 ? "C-L/" + contentLength + "/" + remaining : "") - + (isAppendClosed() ? ", C" : "") - + "}"; - } - - public void recycle() { - chunked = false; - remaining = 0; - contentLength = -1; - chunk.recycle(); - chunk.recycle(); - super.recycle(); - frameError = false; - noBody = false; - endSent = false; - } - - public boolean isContentDelimited() { - return chunked || contentLength >= 0; - } - - - - /** - * Updates chunked, contentLength, remaining - */ - protected void processContentDelimitation() { - - contentLength = httpMsg.getContentLength(); - if (contentLength >= 0) { - remaining = contentLength; - } - - // TODO: multiple transfer encoding headers, only process the last - String transferEncodingValue = httpMsg.getHeader(HttpChannel.TRANSFERENCODING); - if (transferEncodingValue != null) { - int startPos = 0; - int commaPos = transferEncodingValue.indexOf(','); - String encodingName = null; - while (commaPos != -1) { - encodingName = transferEncodingValue.substring - (startPos, commaPos).toLowerCase().trim(); - if ("chunked".equalsIgnoreCase(encodingName)) { - chunked = true; - } - startPos = commaPos + 1; - commaPos = transferEncodingValue.indexOf(',', startPos); - } - encodingName = transferEncodingValue.substring(startPos) - .toLowerCase().trim(); - if ("chunked".equals(encodingName)) { - chunked = true; - httpMsg.chunked = true; - } else { - System.err.println("TODO: ABORT 501"); - //return 501; // Currently only chunked is supported for - // transfer encoding. - } - } - - if (chunked) { - remaining = 0; - } - } - - void updateCloseOnEnd() { - if (!isContentDelimited() && !noBody) { - http.closeStreamOnEnd("not content delimited"); - } - } - - void processContentEncoding() { - // Content encoding - set it on the buffer, will be processed in blocking - // mode, after transfer encoding. -// MessageBytes contentEncodingValueMB = -// headers.getValue("content-encoding"); - -// if (contentEncodingValueMB != null) { -// if (contentEncodingValueMB.equals("gzip")) { -// buffer.addActiveFilter(gzipIF); -// } -// // TODO: other encoding filters -// // TODO: this should be separate layer -// } - } - - /** - * Determine if we must drop the connection because of the HTTP status - * code. Use the same list of codes as Apache/httpd. - */ - protected boolean statusDropsConnection(int status) { - return status == 400 /* SC_BAD_REQUEST */ || - status == 408 /* SC_REQUEST_TIMEOUT */ || - status == 411 /* SC_LENGTH_REQUIRED */ || - status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ || - status == 414 /* SC_REQUEST_URI_TOO_LARGE */ || - status == 500 /* SC_INTERNAL_SERVER_ERROR */ || - status == 503 /* SC_SERVICE_UNAVAILABLE */ || - status == 501 /* SC_NOT_IMPLEMENTED */; - } - - static final int NEED_MORE = -1; - static final int ERROR = -4; - static final int DONE = -5; - - class ChunkState { - int partialChunkLen; - boolean readDigit = false; - boolean trailer = false; - protected boolean needChunkCrlf = false; - - // Buffer used for chunk length conversion. - protected byte[] sendChunkLength = new byte[10]; - - /** End chunk marker - will include chunked end or empty */ - protected BBuffer endSendBuffer = BBuffer.wrapper(); - - public ChunkState() { - sendChunkLength[8] = (byte) '\r'; - sendChunkLength[9] = (byte) '\n'; - } - - void recycle() { - partialChunkLen = 0; - readDigit = false; - trailer = false; - needChunkCrlf = false; - endSendBuffer.recycle(); - } - - /** - * Parse the header of a chunk. - * A chunk header can look like - * A10CRLF - * F23;chunk-extension to be ignoredCRLF - * The letters before CRLF but after the trailer mark, must be valid hex digits, - * we should not parse F23IAMGONNAMESSTHISUP34CRLF as a valid header - * according to spec - */ - int parseChunkHeader(IOBuffer buffer) throws IOException { - if (buffer.peekFirst() == null) { - return NEED_MORE; - } - if (needChunkCrlf) { - // TODO: Trailing headers - int c = buffer.read(); - if (c == BBuffer.CR) { - if (buffer.peekFirst() == null) { - return NEED_MORE; - } - c = buffer.read(); - } - if (c == BBuffer.LF) { - needChunkCrlf = false; - } else { - System.err.println("Bad CRLF " + c); - return ERROR; - } - } - - while (true) { - if (buffer.peekFirst() == null) { - return NEED_MORE; - } - int c = buffer.read(); - - if (c == BBuffer.CR) { - continue; - } else if (c == BBuffer.LF) { - break; - } else if (c == HttpChannel.SEMI_COLON) { - trailer = true; - } else if (c == BBuffer.SP) { - // ignore - } else if (trailer) { - // ignore - } else { - //don't read data after the trailer - if (Hex.DEC[c] != -1) { - readDigit = true; - partialChunkLen *= 16; - partialChunkLen += Hex.DEC[c]; - } else { - //we shouldn't allow invalid, non hex characters - //in the chunked header - log.info("Chunk parsing error1 " + c + " " + buffer); - http.abort("Chunk error"); - return ERROR; - } - } - } - - if (!readDigit) { - log.info("Chunk parsing error2 " + buffer); - return ERROR; - } - - needChunkCrlf = true; // next time I need to parse CRLF - int result = partialChunkLen; - partialChunkLen = 0; - trailer = false; - readDigit = false; - return result; - } - - - ByteBuffer prepareChunkHeader(int current) { - int pos = 7; // 8, 9 are CRLF - while (current > 0) { - int digit = current % 16; - current = current / 16; - sendChunkLength[pos--] = Hex.HEX[digit]; - } - if (needChunkCrlf) { - sendChunkLength[pos--] = (byte) '\n'; - sendChunkLength[pos--] = (byte) '\r'; - } else { - needChunkCrlf = true; - } - // TODO: pool - this may stay in the queue while we flush more - ByteBuffer chunkBB = ByteBuffer.allocate(16); - chunkBB.put(sendChunkLength, pos + 1, 9 - pos); - chunkBB.flip(); - return chunkBB; - } - - public BBuffer endChunk() { - if (! needChunkCrlf) { - endSendBuffer.setBytes(HttpChannel.END_CHUNK_BYTES, 2, - HttpChannel.END_CHUNK_BYTES.length - 2); // CRLF - } else { // 0 - endSendBuffer.setBytes(HttpChannel.END_CHUNK_BYTES, 0, - HttpChannel.END_CHUNK_BYTES.length); - } - return endSendBuffer; - } - } - - private int receiveDone(boolean frameError) throws IOException { - // Content-length case, we're done reading - close(); - - this.frameError = frameError; - if (frameError) { - http.closeStreamOnEnd("frame error"); - } - - return DONE; - } - - /** - * Called when raw body data is received. - * Callback should not consume past the end of the body. - * @param rawReceiveBuffers - * - */ - void rawDataReceived(IOBuffer rawReceiveBuffers) throws IOException { - // TODO: Make sure we don't process more than we need ( eat next req ). - // If we read too much: leave it in readBuf, the finalzation code - // should skip KeepAlive and start processing it. - // we need to read at least something - to detect -1 ( we could - // suspend right away, but seems safer. - while (http.inMessage.state == HttpMessage.State.BODY_DATA) { - //log.info("RAW DATA: " + this + " RAW: " + rawReceiveBuffers); - if (noBody) { - receiveDone(false); - return; - } - if (rawReceiveBuffers.isClosedAndEmpty()) { - if (isContentDelimited()) { - if (contentLength >= 0 && remaining == 0) { - receiveDone(false); - } else { - // End of input - other side closed, no more data - //log.info("CLOSE while reading " + this); - // they're not supposed to close ! - receiveDone(true); - } - } else { - receiveDone(false); // ok - } - // input connection closed ? - http.closeStreamOnEnd("Closed input"); - return; - } - BBucket rawBuf = rawReceiveBuffers.peekFirst(); - if (rawBuf == null) { - return; // need more data - } - - if (!isContentDelimited()) { - while (true) { - BBucket first = rawReceiveBuffers.popFirst(); - if (first == null) { - break; // will go back to check if done. - } else { - super.queue(first); - } - } - } else { - - if (contentLength >= 0 && remaining == 0) { - receiveDone(false); - return; - } - - if (chunked && remaining == 0) { - int rc = NEED_MORE; - while (rc == NEED_MORE) { - rc = chunk.parseChunkHeader(rawReceiveBuffers); - if (rc == ERROR) { - receiveDone(true); - return; - } else if (rc == NEED_MORE) { - return; - } - } - if (rc == 0) { // last chunk - receiveDone(false); - return; - } else { - remaining = rc; - } - } - - rawBuf = (BBucket) rawReceiveBuffers.peekFirst(); - if (rawBuf == null) { - return; // need more data - } - - - if (remaining < rawBuf.remaining()) { - // To buffer has more data than we need. - int lenToConsume = (int) remaining; - BBucket sb = rawReceiveBuffers.popLen(lenToConsume); - super.queue(sb); - //log.info("Queue received buffer " + this + " " + lenToConsume); - remaining = 0; - } else { - BBucket first = rawReceiveBuffers.popFirst(); - remaining -= first.remaining(); - super.queue(first); - //log.info("Queue full received buffer " + this + " RAW: " + rawReceiveBuffers); - } - if (contentLength >= 0 && remaining == 0) { - // Content-Length, all done - super.close(); - receiveDone(false); - } - } - } - } - - void flushToNext() throws IOException { - http.sendHeaders(); // if needed - - if (getNet() == null) { - return; // not connected yet. - } - - synchronized (this) { - if (noBody) { - for (int i = 0; i < super.getBufferCount(); i++) { - Object bc = super.peekBucket(i); - if (bc instanceof BBucket) { - ((BBucket) bc).release(); - } - } - super.clear(); - return; - } - // TODO: only send < remainingWrite, if buffer - // keeps changing after startWrite() is called (shouldn't) - boolean done = false; - - if (chunked) { - done = sendChunked(); - } else if (contentLength >= 0) { - // content-length based - done = sendContentLen(); - } else { - // Close delimitation - while (true) { - Object bc = popFirst(); - if (bc == null) { - break; - } - getNet().getOut().queue(bc); - } - if (super.isClosedAndEmpty()) { - done = true; - if (getNet() != null) { - getNet().getOut().close(); // no content-delimitation - } - } - } - } - } - - private boolean sendContentLen() throws IOException { - while (true) { - BBucket bucket = super.peekFirst(); - if (bucket == null) { - break; - } - int len = bucket.remaining(); - if (len <= remaining) { - remaining -= len; - bucket = super.popFirst(); - getNet().getOut().queue(bucket); - } else { - // Write over the end of the buffer ! - log.severe(http.dbgName + - ": write more than Content-Length"); - len = (int) remaining; - // data between position and limit - bucket = popLen((int) remaining); - getNet().getOut().queue(bucket); - while (bucket != null) { - bucket = super.popFirst(); - if (bucket != null) { - bucket.release(); - } - } - - // forced close - //close(); - remaining = 0; - return true; - } - } - if (super.isClosedAndEmpty()) { - //http.rawSendBuffers.queue(IOBrigade.MARK); - if (remaining > 0) { - http.closeStreamOnEnd("sent more than content-length"); - log.severe("Content-Length > body"); - } - return true; - } - return false; - } - - public void close() throws IOException { - if (sendBody && !http.error) { - flushToNext(); // will send any remaining data. - } - - if (isContentDelimited() && !http.error) { - if (!chunked && remaining > 0) { - log.severe("CLOSE CALLED WITHOUT FULL LEN"); - // TODO: abort ? - } else { - super.close(); - if (sendBody) { - flushToNext(); // will send '0' - } - } - } else { - super.close(); - if (sendBody) { - flushToNext(); - } - } - } - - private boolean sendChunked() throws IOException { - int len = 0; - int cnt = super.getBufferCount(); - for (int i = 0; i < cnt; i++) { - BBucket iob = super.peekBucket(i); - len += iob.remaining(); - } - - if (len > 0) { - ByteBuffer sendChunkBuffer = chunk.prepareChunkHeader(len); - remaining = len; - getNet().getOut().queue(sendChunkBuffer); - while (cnt > 0) { - Object bc = super.popFirst(); - getNet().getOut().queue(bc); - cnt --; - } - } - - if (super.isClosedAndEmpty()) { - if (!endSent) { - getNet().getOut().append(chunk.endChunk()); - endSent = true; - } - //http.rawSendBuffers.queue(IOBrigade.MARK); - return true; - } else { - return false; - } - } - - boolean isDone() { - if (noBody) { - return true; - } - if (isContentDelimited()) { - if (!chunked && remaining == 0) { - return true; - } else if (chunked && super.isAppendClosed()) { - return true; - } - } - return false; - } - - IOChannel getNet() { - return http.getNet(); - } - - public void setMessage(HttpMessage httpMsg) { - this.httpMsg = httpMsg; - } -} diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java index df8878c6e..170dbaf9e 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpChannel.java @@ -16,20 +16,15 @@ package org.apache.tomcat.lite.http; import java.io.IOException; -import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes; +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; +import org.apache.tomcat.lite.io.BBucket; import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.CBuffer; -import org.apache.tomcat.lite.io.DumpChannel; -import org.apache.tomcat.lite.io.FastHttpDateFormat; -import org.apache.tomcat.lite.io.BBucket; import org.apache.tomcat.lite.io.IOBuffer; import org.apache.tomcat.lite.io.IOChannel; -import org.apache.tomcat.lite.io.IOConnector; /** * HTTP async client and server, based on tomcat NIO/APR connectors @@ -48,46 +43,36 @@ public class HttpChannel extends IOChannel { static AtomicInteger serCnt = new AtomicInteger(); - protected static Logger log = Logger.getLogger("HttpCh"); + public static final String CONTENT_LENGTH= "Content-Length"; - boolean debug = false; - // Used to receive an entity - headers + maybe some body - // read() must first consume extra data from this buffer. - // Next reads will be direct from socket. - protected BBuffer headRecvBuf = BBuffer.allocate(HEADER_SIZE); - BBuffer line = BBuffer.wrapper(); + public static final String HTTP_10 = "HTTP/1.0"; - // ---- Buffers owned by the AsyncHttp object ---- + public static final String HTTP_11 = "HTTP/1.1"; - BBuffer headB = BBuffer.wrapper(); - FutureCallbacks doneLock = new FutureCallbacks(); - ArrayList filters = new ArrayList(); - - // ---------- Body read side ------------ - - // Set if Exect: 100-continue was set on reqest. - // If this is the case - body won't be sent until - // server responds ( client ) and server will only - // read body after ack() - or skip to next request - // without swallowing the body. - protected boolean expectation = false; + /** + * SEMI_COLON. + */ + public static final byte SEMI_COLON = (byte) ';'; - /** Ready for recycle, if send/receive are done */ - protected boolean release = false; + public static final byte QUESTION = (byte) '?'; - protected boolean sendReceiveDone = false; + protected static Logger log = Logger.getLogger("HttpChannel"); + - // ----------- Body write side ------------ + boolean debug = false; + // ---- Callbacks and locks - // TODO: setters + FutureCallbacks doneLock = new FutureCallbacks(); + FutureCallbacks headersReceivedLock = + new FutureCallbacks(); /** * Called when the incoming headers have been received. * ( response for client mode, request for server mode ) * @throws IOException */ - HttpService httpService; + protected HttpService httpService; /** * Called when: * - body sent @@ -101,104 +86,70 @@ public class HttpChannel extends IOChannel { * - returned to the pool. */ private RequestCompleted doneAllCallback; + protected boolean sendReceiveDone = false; + // Will be signalled (open) when the buffer is empty. + FutureCallbacks flushLock = new FutureCallbacks(); - HttpMessage inMessage; - - - HttpMessage outMessage; - - // receive can be for request ( server mode ) or response ( client ) - HttpBody receiveBody = new HttpBody(this, false); - HttpBody sendBody = new HttpBody(this, true); - - private HttpRequest httpReq; - private HttpResponse httpRes; + FutureCallbacks doneFuture; + boolean doneCallbackCalled = false; - boolean headersDone = false; - protected boolean serverMode = false; - // ---------- Client only ------------ - - //protected HttpParser parser = new HttpParser(); - - protected String dbgName = this.getClass().getSimpleName(); - // ----- Pools - sockets, heavy objects ------------- - // If client mode - what host we are connected to. - protected String host; - protected int port; + // ---------- - private HttpConnector httpConnector; - // ------ JMX - protected int ser; // id - for jmx registration and logs - // Server side only - protected String serverHeader = "TomcatLite"; + // Set if Exect: 100-continue was set on reqest. + // If this is the case - body won't be sent until + // server responds ( client ) and server will only + // read body after ack() - or skip to next request + // without swallowing the body. + protected boolean expectation = false; + + /** Ready for recycle, if send/receive are done */ + protected boolean release = false; - protected boolean http11 = false; + // ----------- - protected boolean http09 = false; + protected boolean headersDone = false; protected boolean error = false; protected boolean abortDone = false; - FutureCallbacks doneFuture; - boolean doneCallbackCalled = false; + + + protected int ser; // id - for jmx registration and logs + protected int channelId; /** - * Close connection when done writting, no content-length/chunked, - * or no keep-alive ( http/1.0 ) or error. - * - * ServerMode: set if HTTP/0.9 &1.0 || !keep-alive - * ClientMode: not currently used - */ - boolean keepAlive = true; + * Null after endSendReceive and before sending the request + */ + HttpConnection conn; - // Will be signalled (open) when the buffer is empty. - private FutureCallbacks flushLock = new FutureCallbacks(); + HttpConnector httpConnector; - // -- Lifecycle -- - - Runnable dispatcherRunnable = new Runnable() { - @Override - public void run() { - getConnector().getDispatcher().runService(HttpChannel.this); - } + // Different ways to point to request response (server/client) + HttpRequest httpReq; + HttpResponse httpRes; + HttpMessage inMessage; + HttpMessage outMessage; + // receive can be for request ( server mode ) or response ( client ) + IOBuffer receiveBody = new IOBuffer(); + + // notify us that user called close() + IOBuffer sendBody = new IOBuffer() { + public void close() throws IOException { + if (isAppendClosed()) { + return; + } + super.close(); + outClosed(); + } }; - long ioTimeout = 30 * 60000; // 30 min seems high enough - - public static final String CONTENT_LENGTH= "Content-Length"; - /** - * HTTP/1.0. - */ - public static final String HTTP_10 = "HTTP/1.0"; - - public static final String HTTP_11 = "HTTP/1.1"; - - public static final String CHUNKED = "chunked"; - - public static final String CLOSE = "close"; + // Server side only + protected String serverHeader = "TomcatLite"; - public static final String KEEPALIVE_S = "keep-alive"; - + long ioTimeout = 30 * 60000; // 30 min seems high enough - public static final String CONNECTION = "Connection"; - - public static final String TRANSFERENCODING = "Transfer-Encoding"; - - /** - * SEMI_COLON. - */ - public static final byte SEMI_COLON = (byte) ';'; - - static byte[] END_CHUNK_BYTES = { - (byte) '\r', (byte) '\n', - (byte) '0', - (byte) '\r', (byte) '\n', - (byte) '\r', (byte) '\n'}; - - public static final byte QUESTION = (byte) '?'; - - static final byte COLON = (byte) ':'; + public HttpChannel() { ser = serCnt.incrementAndGet(); httpReq = new HttpRequest(this); @@ -230,38 +181,31 @@ public class HttpChannel extends IOChannel { checkRelease(); trace("abort " + t); log.info("Abort connection " + t); - if (net != null ) { - if (net.isOpen()) { - net.close(); - net.startSending(); - } + if (conn != null) { + conn.abort(this, t); } inMessage.state = HttpMessage.State.DONE; outMessage.state = HttpMessage.State.DONE; sendReceiveDone = true; error = true; - close(); handleEndSendReceive(); } - public HttpChannel addFilterAfter(IOChannel filter) { - filters.add(filter); - return this; - } - - + /** + * If release was called - throw exception, you shouldn't use + * the object again. + * @throws IOException + */ private void checkRelease() throws IOException { if (release && sendReceiveDone) { throw new IOException("Object released"); } } - void closeStreamOnEnd(String cause) { - if (debug) - log.info("Not reusing connection because: " + cause); - keepAlive = false; - } - + /** + * Called when the request is done. Need to send remaining byte. + * + */ public void complete() throws IOException { checkRelease(); if (!getOut().isAppendClosed()) { @@ -295,21 +239,10 @@ public class HttpChannel extends IOChannel { return read; } - public void flushNet() throws IOException { - checkRelease(); - if (net != null) { - net.startSending(); - } - } - public HttpConnector getConnector() { return httpConnector; } - public FutureCallbacks getDoneFuture() { - return doneLock; - } - public boolean getError() { return error; } @@ -329,8 +262,12 @@ public class HttpChannel extends IOChannel { return ioTimeout; } + // TODO: replace with getSocketChannel - used for remote addr, etc public IOChannel getNet() { - return net; + if (conn == null) { + return null; + } + return conn.getSink(); } @@ -350,9 +287,8 @@ public class HttpChannel extends IOChannel { public String getState() { return - (serverMode ? "SRV:" : "") + - (keepAlive() ? " KA " : "") - + "RCV=[" + inMessage.state.toString() + " " + + conn + + "RCV=[" + inMessage.state.toString() + " " + receiveBody.toString() + "] SND=[" + outMessage.state.toString() + " " + sendBody.toString() + "]"; @@ -366,10 +302,10 @@ public class HttpChannel extends IOChannel { public String getTarget() { - if (host == null) { - return ":" + port; + if (target == null) { + return ":0"; // server mode ? } - return host + ":" + port; + return target; } @@ -378,19 +314,13 @@ public class HttpChannel extends IOChannel { * is completed ( or if there is no req body ) * @throws IOException */ - protected void handleEndReceive(boolean frameError) throws IOException { + protected void handleEndReceive() throws IOException { if (inMessage.state == HttpMessage.State.DONE) { return; } if (debug) { - trace("END_RECV " + ((frameError) ? " FRAME_ERROR" : "")); + trace("END_RECV"); } - if (frameError) { - closeStreamOnEnd("frame error"); - // TODO: next read() should throw exception !! - error = true; - } - getIn().close(); inMessage.state = HttpMessage.State.DONE; @@ -419,31 +349,20 @@ public class HttpChannel extends IOChannel { doneCallbackCalled = true; } - if (!keepAlive() && net != null) { - net.getOut().close(); // shutdown output if not done - net.getIn().close(); // this should close the socket - net.startSending(); - } - + getIn().close(); + if (doneAllCallback != null) { doneAllCallback.handle(this, error ? new Throwable() : null); } - // Remove the net object - will be pooled separtely - IOChannel ch = this.net; - if (ch != null && keepAlive()) { - - boolean keepOpen = ch.isOpen(); - - resetBuffers(); // net is now NULL - can't send anything more - if (getConnector() != null) { - getConnector().returnSocket(ch, serverMode, keepOpen); - } + if (conn != null) { + conn.endSendReceive(this); } + conn = null; + if (debug) { trace("END_SEND_RECEIVE" - + (!keepAlive() ? " CLOSE_ON_END " : "") + (release ? " REL" : "")); } @@ -482,67 +401,11 @@ public class HttpChannel extends IOChannel { System.err.println("Error " + type + " " + outMessage.state); } - @Override - public void handleFlushed(IOChannel net) throws IOException { - flushLock.signal(this); - super.handleFlushed(this); - if (sendBody.isClosedAndEmpty()) { - handleEndSent(); - } - } - - /** - * Called when the net has readable data. - */ - @Override - public void handleReceived(IOChannel net) throws IOException { - try { - if (getConnector() == null) { - throw new IOException("Data received after release"); - } - if (net == null) { - return; // connection released - } - if (net.getIn().isClosedAndEmpty()) { - // Close received - closeStreamOnEnd("close on input 2"); - if (inMessage.state == HttpMessage.State.HEAD) { - trace("NET CLOSE WHILE READING HEAD"); - abort(new IOException("Connection closed")); - return; - } else if (inMessage.state == HttpMessage.State.DONE) { - // Close received - make sure we close out - if (sendBody.isClosedAndEmpty()) { - net.getOut().close(); - } - return; - } - } - if (debug) { - trace("Http data received " + inMessage.state + " " + - net.getIn() + " headerDone=" + headersDone); - } - - if (inMessage.state == HttpMessage.State.HEAD) { - headDataReceived(); - if (inMessage.state == HttpMessage.State.HEAD) { - return; // still parsing head - } - if (serverMode && httpReq.decodedUri.remaining() == 0) { - abort("Invalid url"); - } - } - - if (inMessage.state == HttpMessage.State.BODY_DATA) { - if (net != null) { - receiveBody.rawDataReceived(net.getIn()); - } - } - - // Send header callbacks - we process any incoming data - // first, so callbacks have more info - if (httpService != null && !headersDone) { - headersDone = true; + void handleHeadersReceived(HttpMessage in) throws IOException { + if (!headersDone) { + headersDone = true; + headersReceivedLock.signal(this); + if (httpService != null) { try { httpService.service(getRequest(), getResponse()); } catch (Throwable t) { @@ -550,122 +413,11 @@ public class HttpChannel extends IOChannel { abort(t); } } - - // If header callback or previous dataReceived() hasn't consumed all - if (receiveBody.getBufferCount() > 0) { - // Has data - super.sendHandleReceivedCallback(); // callback - } - - // Receive has marked the body as closed - if (receiveBody.isAppendClosed() - && inMessage.state != HttpMessage.State.DONE) { - if (net != null && net.getIn().getBufferCount() > 0) { - if (debug) { - trace("Pipelined requests"); // may be a crlf - } - } - handleEndReceive(receiveBody.frameError); - } - - if (inMessage.state == HttpMessage.State.DONE) { - // TCP end ? - if (net == null || net.getIn() == null) { - trace("NO NET"); - return; - } - if (net.getIn().isClosedAndEmpty()) { - // If not already closed. - closeStreamOnEnd("closed on input 3"); - if (outMessage.state == HttpMessage.State.DONE) { - // need to flush out. - net.getOut().close(); - flushNet(); - } - } else { - // Next request, ignore it. - } - - } - } catch (IOException ex) { - ex.printStackTrace(); - abort(ex); - } - } - - - /** - * Read and process a chunk of head, called from dataReceived() if - * in HEAD mode. - * - * @return <= 0 - still in head mode. > 0 moved to body mode, some - * body chunk may have been received. - */ - protected void headDataReceived() throws IOException { - while (true) { - // we know we have one - int read = net.getIn().readLine(headRecvBuf); - if (read < 0) { - if (debug) { - trace("CLOSE while reading HEAD"); - } - // too early - we don't have the head - abort("Close in head"); - return; - } - // Remove starting empty lines. - headRecvBuf.skipEmptyLines(); - - // Do we have another full line in the input ? - if (BBuffer.hasLFLF(headRecvBuf)) { - break; - } - if (read == 0) { // no more data - return; - } - } - headRecvBuf.wrapTo(headB); - - - parseMessage(headB); - - - if (debug) { - trace("HEAD_RECV " + getRequest().requestURI() + " " + - getResponse().getMsgBytes().status() + " " + net.getIn()); - } - - } - - public void parseMessage(BBuffer headB) throws IOException { - //Parse the response - headB.readLine(line); - - HttpMessageBytes msgBytes; - - if (serverMode) { - msgBytes = httpReq.getMsgBytes(); - parseRequestLine(line, msgBytes.method(), - msgBytes.url(), - msgBytes.query(), - msgBytes.protocol()); - } else { - msgBytes = httpRes.getMsgBytes(); - parseResponseLine(line, msgBytes.protocol(), - msgBytes.status(), msgBytes.message()); } - - parseHeaders(msgBytes, headB); - - inMessage.state = HttpMessage.State.BODY_DATA; - - // TODO: hook to allow specific charsets ( can be done later ) + } - inMessage.processReceivedHeaders(); - } private void init() { - headRecvBuf.recycle(); headersDone = false; sendReceiveDone = false; @@ -673,25 +425,19 @@ public class HttpChannel extends IOChannel { sendBody.recycle(); expectation = false; - http11 = false; - http09 = false; error = false; abortDone = false; getRequest().recycle(); getResponse().recycle(); - host = null; - filters.clear(); - - line.recycle(); - headB.recycle(); + target = null; doneLock.recycle(); + headersReceivedLock.recycle(); flushLock.recycle(); doneCallbackCalled = false; - keepAlive = true; // Will be set again after pool setHttpService(null); doneAllCallback = null; @@ -702,13 +448,6 @@ public class HttpChannel extends IOChannel { return outMessage.state == HttpMessage.State.DONE && inMessage.state == HttpMessage.State.DONE; } - public boolean keepAlive() { - if (http09) { - return false; - } - return keepAlive; - } - /** * Called when all done: * - service finished ( endService was called ) @@ -786,174 +525,6 @@ public class HttpChannel extends IOChannel { } - /** - * Parse one header. - * Line must be populated. On return line will be populated - * with the next header: - * - * @param line current header line, not empty. - */ - public int parseHeader(BBuffer head, - BBuffer line, BBuffer name, BBuffer value) - throws IOException { - - int newPos = line.readToDelimOrSpace(COLON, name); - line.skipSpace(); - if (line.readByte() != COLON) { - throw new IOException("Missing ':' in header name " + line); - } - line.skipSpace(); - line.read(value); // remaining of the line - - while (true) { - head.readLine(line); - if (line.remaining() == 0) { - break; - } - byte first = line.get(0); - if (first != BBuffer.SP && first != BBuffer.HT) { - break; - } - // continuation line - append it to value - value.setEnd(line.getEnd()); - line.position(line.limit()); - } - - // We may want to keep the original and use separate buffer ? - normalizeHeader(value); - return 1; - } - - public void parseHeaders(HttpMessageBytes msgBytes, - BBuffer head) - throws IOException { - - head.readLine(line); - - int idx = 0; - while(line.remaining() > 0) { - // not empty.. - idx = msgBytes.addHeader(); - BBuffer nameBuf = msgBytes.getHeaderName(idx); - BBuffer valBuf = msgBytes.getHeaderValue(idx); - parseHeader(head, line, nameBuf, valBuf); - - // TODO: process 'interesting' headers here. - } - } - - /** - * Read the request line. This function is meant to be used during the - * HTTP request header parsing. Do NOT attempt to read the request body - * using it. - * - * @throws IOException If an exception occurs during the underlying socket - * read operations, or if the given buffer is not big enough to accomodate - * the whole line. - */ - public boolean parseRequestLine(BBuffer line, - BBuffer methodMB, BBuffer requestURIMB, - BBuffer queryMB, - BBuffer protoMB) - throws IOException { - - line.readToSpace(methodMB); - line.skipSpace(); - - line.readToDelimOrSpace(QUESTION, requestURIMB); - if (line.remaining() > 0 && line.get(0) == QUESTION) { - // Has query - line.readToSpace(queryMB); - // don't include '?' - queryMB.position(queryMB.position() + 1); - } else { - queryMB.setBytes(line.array(), line.position(), 0); - } - line.skipSpace(); - - line.readToSpace(protoMB); - - // proto is optional ( for 0.9 ) - return requestURIMB.remaining() > 0; - } - - public boolean parseResponseLine(BBuffer line, - BBuffer protoMB, BBuffer statusCode, BBuffer status) - throws IOException { - line.skipEmptyLines(); - - line.readToSpace(protoMB); - line.skipSpace(); - line.readToSpace(statusCode); - line.skipSpace(); - line.wrapTo(status); - - // message may be empty - return statusCode.remaining() > 0; - } - - /** - * Update keepAlive based on Connection header and protocol. - */ - void processConnectionHeader(MultiMap headers) { - if (http09) { - return; - } - - CBuffer value = headers.getHeader(HttpChannel.CONNECTION); - String conHeader = (value == null) ? null : value.toString(); - if (conHeader != null) { - if (HttpChannel.CLOSE.equalsIgnoreCase(conHeader)) { - closeStreamOnEnd("connection close"); - } - if (!HttpChannel.KEEPALIVE_S.equalsIgnoreCase(conHeader)) { - closeStreamOnEnd("connection != keep alive"); - } - } else { - // no connection header - if (!http11) { - closeStreamOnEnd("http1.0 no connection header"); - } - } - } - - void processExpectation() throws IOException { - expectation = false; - MultiMap headers = getRequest().getMimeHeaders(); - - CBuffer expect = headers.getHeader("expect"); - if ((expect != null) - && (expect.indexOf("100-continue") != -1)) { - expectation = true; - - // TODO: configure, use the callback or the servlet 'read'. - net.getOut().append("HTTP/1.1 100 Continue\r\n\r\n"); - net.startSending(); - } - } - - void processProtocol() throws IOException { - http11 = true; - http09 = false; - - CBuffer protocolMB = getRequest().protocol(); - if (protocolMB.equals(HttpChannel.HTTP_11)) { - http11 = true; - } else if (protocolMB.equals(HttpChannel.HTTP_10)) { - http11 = false; - } else if (protocolMB.equals("")) { - http09 = true; - http11 = false; - } else { - // Unsupported protocol - http11 = false; - error = true; - // Send 505; Unsupported HTTP version - getResponse().setStatus(505); - abort("Invalid protocol"); - } - } - protected void recycle() { if (debug) { trace("RECYCLE"); @@ -983,265 +554,49 @@ public class HttpChannel extends IOChannel { } } - public void resetBuffers() { - if (net != null) { - net.setDataFlushedCallback(null); - net.setDataReceivedCallback(null); - setSink(null); - } - } - - public void sendHeaders() throws IOException { + public void send() throws IOException { checkRelease(); - if (serverMode) { - sendResponseHeaders(); - } else { - sendRequest(); - } - } - - /** - * Can be called from any thread. - * - * @param host - * @param port - * @throws IOException - */ - public void sendRequest() throws IOException { - if (getRequest().isCommitted()) { - return; - } - getRequest().setCommitted(true); - - String target = host + ":" + port; - - if (getRequest().getMimeHeaders().getHeader("Host") == null - && host != null) { - CBuffer hostH = getRequest().getMimeHeaders().addValue("Host"); - hostH.set(host); // TODO: port - } - - outMessage.state = HttpMessage.State.HEAD; - - IOChannel ch = getConnector().cpool.getChannel(target); - - if (ch == null) { - if (debug) { - trace("HTTP_CONNECT: New connection " + target); - } - IOConnector.ConnectedCallback connected = new IOConnector.ConnectedCallback() { - @Override - public void handleConnected(IOChannel ch) throws IOException { - if (httpConnector.debugHttp) { - IOChannel ch1 = new DumpChannel(""); - ch.addFilterAfter(ch1); - ch = ch1; - } - - sendRequestHeaders(ch); - } - }; - getConnector().getIOConnector().connect(host, port, connected); + if (httpReq == inMessage) { + conn.sendResponseHeaders(this); } else { - if (debug) { - trace("HTTP_CONNECT: Reuse connection " + target + " " + this); - } - // TODO retry if closed - sendRequestHeaders(ch); - } - } - - /** - * Used in request mode. - * - * @throws IOException - */ - void sendRequestHeaders(IOChannel ch) throws IOException { - if (getConnector() == null) { - throw new IOException("after release"); - } - if (!ch.isOpen()) { - abort("Closed channel"); - return; - } - setChannel(ch); // register read/write callbacks - - // Update transfer fields based on headers. - processProtocol(); - - processConnectionHeader(getRequest().getMimeHeaders()); - - - // 1.0: The presence of an entity body in a request is signaled by - // the inclusion of a Content-Length header field in the request - // message headers. HTTP/1.0 requests containing an entity body - // must include a valid Content-Length header field. - - if (!sendBody.isContentDelimited()) { - // Will not close connection - just flush and mark the body - // as sent - sendBody.noBody = true; - getOut().close(); - } - - if (sendBody.noBody) { - getRequest().getMimeHeaders().remove(HttpChannel.CONTENT_LENGTH); - getRequest().getMimeHeaders().remove(HttpChannel.TRANSFERENCODING); - } else { - long contentLength = - getRequest().getContentLength(); - if (contentLength < 0) { - getRequest().getMimeHeaders().addValue(HttpChannel.TRANSFERENCODING). - set(HttpChannel.CHUNKED); - } - sendBody.processContentDelimitation(); - } - - sendBody.updateCloseOnEnd(); - - try { - getRequest().serialize(net.getOut()); - if (debug) { - trace("S: \n" + net.getOut()); + if (getRequest().isCommitted()) { + return; } - - } catch (Throwable t) { - log.log(Level.SEVERE, "Error sending request", t); - } - - if (outMessage.state == HttpMessage.State.HEAD) { - outMessage.state = HttpMessage.State.BODY_DATA; + getRequest().setCommitted(true); + + outMessage.state = HttpMessage.State.HEAD; + + getConnector().connectAndSend(this); } - - // TODO: add any body and flush. More body can be added later - - // including 'end'. - - startSending(); - } - - /** - * When committing the response, we have to validate the set of headers, as - * well as setup the response filters. - * Only in server mode. - */ - void sendResponseHeaders() throws IOException { - checkRelease(); - if (!serverMode) { - throw new IOException("Only in server mode"); - } - - if (getResponse().isCommitted()) { - return; - } - getResponse().setCommitted(true); - - sendBody.noBody = !getResponse().hasBody(); - - if (sendBody.statusDropsConnection(getResponse().getStatus())) { - closeStreamOnEnd("status drops connection"); - } - if (error) { - closeStreamOnEnd("error"); - } - - // A header explicitely set. - CBuffer transferEncHeader = - getResponse().getMimeHeaders().getHeader(HttpChannel.TRANSFERENCODING); - if (!sendBody.noBody - && keepAlive()) { - if (getResponse().getContentLength() < 0) { - // Use chunked by default, if no c-l - if (transferEncHeader == null) { - getResponse().getMimeHeaders().addValue(HttpChannel.TRANSFERENCODING).set(HttpChannel.CHUNKED); - } else { - transferEncHeader.set(HttpChannel.CHUNKED); - } - } - } - - sendBody.processContentDelimitation(); - - sendBody.updateCloseOnEnd(); - - MultiMap headers = getResponse().getMimeHeaders(); - - // Add date header - if (headers.getHeader("Date") == null) { - headers.setValue("Date").set(FastHttpDateFormat.getCurrentDate()); - } - - // Add server header - if (serverHeader.length() > 0) { - headers.setValue("Server").set(serverHeader); - } - - // did the user set a connection header that may override what we have ? - processConnectionHeader(headers); - - if (!keepAlive()) { - headers.setValue(HttpChannel.CONNECTION).set(HttpChannel.CLOSE); - } else { - if (!http11 && !http09) { - headers.setValue(HttpChannel.CONNECTION).set(HttpChannel.KEEPALIVE_S); - } - } - if (debug) { - trace("Send response headers " + net); - } - if (net != null) { - getResponse().serialize(net.getOut()); - } - - if (outMessage.state == HttpMessage.State.HEAD) { - outMessage.state = HttpMessage.State.BODY_DATA; - } - - if (sendBody.isDone()) { - getOut().close(); - } - - if (net != null) { - net.startSending(); + /** Called when the outgoing stream is closed: + * - by an explicit call to close() + * - when all content has been sent. + */ + protected void outClosed() throws IOException { + if (conn != null) { + conn.outClosed(this); } } public HttpChannel serverMode(boolean enabled) { if (enabled) { - serverMode = true; - dbgName = "AsyncHttpServer"; httpReq.setBody(receiveBody); httpRes.setBody(sendBody); - sendBody.setMessage(httpRes); - receiveBody.setMessage(httpReq); inMessage = httpReq; outMessage = httpRes; } else { - serverMode = false; - dbgName = "AsyncHttp"; httpReq.setBody(sendBody); httpRes.setBody(receiveBody); - sendBody.setMessage(httpReq); - receiveBody.setMessage(httpRes); inMessage = httpRes; outMessage = httpReq; } if (debug) { - log = Logger.getLogger(dbgName); } return this; } - public void setChannel(IOChannel ch) throws IOException { - for (IOChannel filter: filters) { - ch.addFilterAfter(filter); - ch = filter; - } - - withBuffers(ch); - } - public void setCompletedCallback(RequestCompleted doneAllCallback) throws IOException { this.doneAllCallback = doneAllCallback; @@ -1271,16 +626,16 @@ public class HttpChannel extends IOChannel { ioTimeout = timeout; } - public void setTarget(String host, int port) { - this.host = host; - this.port = port; + + public void setTarget(String host) { + this.target = host; } public void startSending() throws IOException { checkRelease(); - - sendBody.flushToNext(); - flushNet(); + if (conn != null) { + conn.startSending(this); + } } public String toString() { @@ -1305,13 +660,11 @@ public class HttpChannel extends IOChannel { flushLock.waitSignal(timeMs); } - public HttpChannel withBuffers(IOChannel net) { - setSink(net); - net.setDataFlushedCallback(this); - net.setDataReceivedCallback(this); + public HttpChannel setConnection(HttpConnection conn) { + this.conn = conn; return this; } - + /** * Normalize URI. *

@@ -1439,5 +792,13 @@ public class HttpChannel extends IOChannel { void handle(HttpChannel data, Object extraData) throws IOException; } + Runnable dispatcherRunnable = new Runnable() { + @Override + public void run() { + getConnector().getDispatcher().runService(HttpChannel.this); + } + }; + + } \ No newline at end of file diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java index 3bad3b0af..3050bf184 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpConnector.java @@ -15,10 +15,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import org.apache.tomcat.lite.http.HttpChannel.HttpService; +import org.apache.tomcat.lite.http.SpdyConnection.SpdyConnectionManager; +import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.DumpChannel; import org.apache.tomcat.lite.io.BBucket; +import org.apache.tomcat.lite.io.IOBuffer; import org.apache.tomcat.lite.io.IOChannel; import org.apache.tomcat.lite.io.IOConnector; +import org.apache.tomcat.lite.io.IOConnector.DataReceivedCallback; /** * Manages HttpChannels and associated socket pool. @@ -41,6 +45,8 @@ public class HttpConnector { public void onDestroy(HttpChannel ch, HttpConnector con) throws IOException; } + HttpConnectionManager conManager = new HttpConnectionManager(); + private static Logger log = Logger.getLogger("HttpConnector"); private int maxHttpPoolSize = 20; @@ -49,7 +55,7 @@ public class HttpConnector { private Queue httpChannelPool = new ConcurrentLinkedQueue(); - private IOConnector ioConnector; + protected IOConnector ioConnector; boolean debugHttp = false; boolean debug = false; @@ -67,8 +73,13 @@ public class HttpConnector { public AtomicInteger reusedChannels = new AtomicInteger(); public ConnectionPool cpool = new ConnectionPool(); - + // Host + context mapper. + Dispatcher dispatcher; + protected HttpService defaultService; + int port = 8080; + + public HttpConnector(IOConnector ioConnector) { this.ioConnector = ioConnector; dispatcher = new Dispatcher(); @@ -152,20 +163,31 @@ public class HttpConnector { } public HttpChannel get(String host, int port) throws IOException { - HttpChannel http = get(false, host, port); - http.setTarget(host, port); + HttpChannel http = get(false); + http.setTarget(host + ":" + port); return http; } public HttpChannel getServer() { try { - return get(true, null, 0); + return get(true); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } + + public HttpRequest request(String host, int port) throws IOException { + HttpChannel http = get(false); + http.setTarget(host + ":" + port); + return http.getRequest(); + + } + + public HttpRequest request(CharSequence urlString) throws IOException { + return get(urlString).getRequest(); + } /** * Get an existing AsyncHttp object. Since it uses many buffers and @@ -183,15 +205,15 @@ public class HttpConnector { port = secure ? 443: 80; } // TODO: insert SSL filter - HttpChannel http = get(false, host, port); - http.setTarget(host, port); + HttpChannel http = get(false); + http.setTarget(host + ":" + port); String path = url.getFile(); // path + qry // TODO: query string http.getRequest().requestURI().set(path); return http; } - protected HttpChannel get(boolean server, CharSequence host, int port) throws IOException { + protected HttpChannel get(boolean server) throws IOException { HttpChannel processor = null; synchronized (httpChannelPool) { processor = httpChannelPool.poll(); @@ -210,8 +232,7 @@ public class HttpConnector { } processor.serverMode(server); if (debug) { - log.info((reuse ? "REUSE ": "Create ") - + host + ":" + port + + log.info((reuse ? "REUSE ": "Create ") + (server? " S" : "") + " id=" + processor.ser + " " + processor + @@ -238,75 +259,8 @@ public class HttpConnector { boolean keepOpen) throws IOException { // Now handle net - note that we could have reused the async object - if (serverMode) { - BBucket first = ch.getIn().peekFirst(); - if (first != null) { - HttpChannel http = getServer(); - if (debug) { - http.trace("PIPELINED request " + first + " " + http.httpService); - } - http.setChannel(ch); - http.setHttpService(defaultService); - - // In case it was disabled - if (ch != null) { - if (ch.isOpen()) { - ch.readInterest(true); - } - // Notify that data was received. The callback must deal with - // pre-existing data. - ch.sendHandleReceivedCallback(); - } - http.handleReceived(http.getSink()); - return; - } - } - if (serverMode && !serverKeepAlive) { - keepOpen = false; - } - if (!serverMode && !clientKeepAlive) { - keepOpen = false; - } - if (keepOpen) { - // reuse the socket - if (serverMode) { - if (debug) { - log.info(">>> server socket KEEP_ALIVE " + ch.getTarget() + - " " + ch); - } - ch.readInterest(true); - ch.setDataReceivedCallback(receiveCallback); - ch.setDataFlushedCallback(null); - - cpool.returnChannel(ch); - // TODO: timeout event to close it - // ch.setTimer(10000, new Runnable() { - // @Override - // public void run() { - // System.err.println("Keep alive timeout"); - // } - // }); - } else { - if (debug) { - log.info(">>> client socket KEEP_ALIVE " + ch.getTarget() + - " " + ch); - } - ch.readInterest(true); - ch.setDataReceivedCallback(clientReceiveCallback); - ch.setDataFlushedCallback(null); - - cpool.returnChannel(ch); - } - } else { - if (debug) { - log.info("--- Close socket, no keepalive " + ch); - } - if (ch != null) { - ch.close(); - } - } } protected void returnToPool(HttpChannel http) throws IOException { @@ -321,7 +275,7 @@ public class HttpConnector { // No more data - release the object synchronized (httpChannelPool) { - http.resetBuffers(); + http.setConnection(null); http.setConnector(null); if (httpChannelPool.contains(http)) { System.err.println("dup ? "); @@ -341,11 +295,6 @@ public class HttpConnector { return ioConnector; } - // Host + context mapper. - Dispatcher dispatcher; - HttpService defaultService; - int port = 8080; - public void setHttpService(HttpService s) { defaultService = s; @@ -353,7 +302,7 @@ public class HttpConnector { public void start() throws IOException { if (ioConnector != null) { - ioConnector.acceptor(new AcceptorCallback(this, defaultService), + ioConnector.acceptor(new AcceptorCallback(), Integer.toString(port), null); } } @@ -369,85 +318,214 @@ public class HttpConnector { } } - private static class AcceptorCallback implements IOConnector.ConnectedCallback { + protected void connectAndSend(HttpChannel httpCh) throws IOException { + String target = httpCh.getTarget(); + // TODO: SSL + HttpConnection ch = cpool.getChannel(target); + + if (ch == null) { + if (debug) { + httpCh.trace("HTTP_CONNECT: New connection " + target); + } + IOConnector.ConnectedCallback connected = + new HttpConnectedCallback(this, httpCh); + + // will call sendRequestHeaders + String[] hostPort = target.split(":"); + int targetPort = hostPort.length > 1 ? + Integer.parseInt(hostPort[1]) : 80; + getIOConnector().connect(hostPort[0], targetPort, + connected); + } else { + if (debug) { + httpCh.trace("HTTP_CONNECT: Reuse connection " + target + " " + this); + } + // TODO retry if closed + ch.beforeRequest(); + httpCh.setConnection(ch); + ch.sendRequest(httpCh); + } + } + + static class HttpConnectedCallback implements IOConnector.ConnectedCallback { HttpConnector httpCon; - HttpService callback; + HttpChannel httpCh; - public AcceptorCallback(HttpConnector asyncHttpConnector, - HttpService headersReceived) { - this.httpCon = asyncHttpConnector; - this.callback = headersReceived; + public HttpConnectedCallback(HttpConnector httpConnector, + HttpChannel httpCh2) { + this.httpCh = httpCh2; + this.httpCon = httpConnector; } @Override - public void handleConnected(IOChannel accepted) throws IOException { - HttpChannel shttp = httpCon.getServer(); - if (callback != null) { - shttp.setHttpService(callback); - } + public void handleConnected(IOChannel ch) throws IOException { if (httpCon.debugHttp) { - IOChannel ch = new DumpChannel(""); - accepted.addFilterAfter(ch); - shttp.setChannel(ch); - } else { - shttp.setChannel(accepted); + IOChannel ch1 = new DumpChannel(""); + ch.addFilterAfter(ch1); + ch = ch1; } - // TODO: JSSE filter - - - // Will read any data in the channel. - - accepted.handleReceived(accepted); + httpCon.handleConnected(ch, httpCh); } + } + HttpConnection newConnection() { + return conManager.newConnection(this); } - private IOConnector.DataReceivedCallback receiveCallback = - new IOConnector.DataReceivedCallback() { - /** For keepalive - for server - * - * @param peer - * @throws IOException - */ + private class AcceptorCallback implements IOConnector.ConnectedCallback { @Override - public void handleReceived(IOChannel net) throws IOException { - cpool.stopKeepAlive(net); - if (!net.isOpen()) { - return; - } - HttpChannel shttp = getServer(); - shttp.setChannel(net); - shttp.setHttpService(defaultService); - net.sendHandleReceivedCallback(); + public void handleConnected(IOChannel accepted) throws IOException { + System.err.println("ACCEPTED " + accepted); + handleAccepted(accepted); } - }; + } + public HttpConnection handleAccepted(IOChannel accepted) throws IOException { + // TODO: reuse + HttpConnection shttp = newConnection(); + shttp.serverMode = true; - // Sate-less, just closes the net. - private IOConnector.DataReceivedCallback clientReceiveCallback = - new IOConnector.DataReceivedCallback() { - - @Override - public void handleReceived(IOChannel net) throws IOException { - if (!net.isOpen()) { - cpool.stopKeepAlive(net); - return; - } - log.warning("Unexpected message from server in client keep alive " - + net.getIn()); - if (net.isOpen()) { - net.close(); - } + if (debugHttp) { + log.info("Accepted " + accepted.getFirst().getPort(true)); + IOChannel ch = new DumpChannel(""); + accepted.addFilterAfter(ch); + shttp.setSink(ch); + } else { + shttp.setSink(accepted); } + // TODO: JSSE filter - }; + + // Will read any data in the channel. + + accepted.handleReceived(accepted); + return shttp; + } public HttpConnector setPort(int port2) { this.port = port2; return this; } + public void handleConnected(IOChannel net, HttpChannel httpCh) + throws IOException { + if (!net.isOpen()) { + httpCh.abort("Can't connect"); + return; + } + HttpConnection httpStream = newConnection(); + httpStream.setSink(net); + + // TODO: add it to the cpool + httpCh.setConnection(httpStream); + + httpStream.sendRequest(httpCh); + } + + public static class HttpConnectionManager { + public HttpConnection newConnection(HttpConnector con) { + return new Http11Connection(con); + } + + public HttpConnection getFromPool(RemoteServer t) { + return t.connections.remove(t.connections.size() - 1); + } + } + + /** + * Actual HTTP/1.1 wire protocol. + * + */ + public static class HttpConnection extends IOChannel + implements DataReceivedCallback + { + protected HttpConnector httpConnector; + protected boolean serverMode; + + protected BBuffer headRecvBuf = BBuffer.allocate(8192); + + + @Override + public void handleReceived(IOChannel ch) throws IOException { + dataReceived(ch.getIn()); + } + + protected HttpChannel checkHttpChannel() throws IOException { + return null; + } + + /** + * Called before a new request is sent, on a channel that is + * reused. + */ + public void beforeRequest() { + } + + public void setSink(IOChannel ch) throws IOException { + this.net = ch; + ch.setDataReceivedCallback(this); + ch.setDataFlushedCallback(this); + // we may have data in the buffer; + handleReceived(ch); + } + + + /** + * Incoming data. + */ + public void dataReceived(IOBuffer iob) throws IOException { + + } + + /** + * Framing error, client interrupt, etc. + */ + public void abort(HttpChannel http, String t) throws IOException { + } + + protected void sendRequest(HttpChannel http) + throws IOException { + } + + protected void sendResponseHeaders(HttpChannel http) + throws IOException { + } + + public void startSending(HttpChannel http) throws IOException { + } + + @Override + public IOBuffer getIn() { + return net.getIn(); + } + + @Override + public IOBuffer getOut() { + return net.getOut(); + } + + @Override + public void startSending() throws IOException { + } + + /** Called when the outgoing stream is closed: + * - by an explicit call to close() + * - when all content has been sent. + */ + protected void outClosed(HttpChannel http) throws IOException { + } + + protected void endSendReceive(HttpChannel httpChannel) throws IOException { + return; + } + + public void withExtraBuffer(BBuffer received) { + return; + } + + } + /** * Connections for one remote host. * This should't be restricted by IP:port or even hostname, @@ -455,10 +533,9 @@ public class HttpConnector { */ public static class RemoteServer { public ConnectionPool pool; - public ArrayList connections = new ArrayList(); + public ArrayList connections = new ArrayList(); } - // TODO: add timeouts, limits per host/total, expire old entries // TODO: discover apr and use it @@ -510,7 +587,7 @@ public class HttpConnector { * are connected to equivalent servers ( LB ) * @throws IOException */ - public IOChannel getChannel(CharSequence key) throws IOException { + public HttpConnection getChannel(CharSequence key) throws IOException { RemoteServer t = null; synchronized (hosts) { t = hosts.get(key); @@ -519,32 +596,23 @@ public class HttpConnector { return null; } } - IOChannel res = null; + HttpConnection res = null; synchronized (t) { if (t.connections.size() == 0) { misses.incrementAndGet(); - hosts.remove(key); return null; } // one may be added - no harm. - res = t.connections.remove(t.connections.size() - 1); - - if (t.connections.size() == 0) { - hosts.remove(key); - } - if (res == null) { - log.fine("Null connection ?"); - misses.incrementAndGet(); - return null; - } + + res = conManager.getFromPool(t); if (!res.isOpen()) { res.setDataReceivedCallback(null); res.close(); log.fine("Already closed " + res); - //res.keepAliveServer = null; res = null; + misses.incrementAndGet(); + return null; } - waitingSockets.decrementAndGet(); } hits.incrementAndGet(); @@ -557,7 +625,7 @@ public class HttpConnector { /** * Must be called in IOThread for the channel */ - public void returnChannel(IOChannel ch) + public void returnChannel(HttpConnection ch) throws IOException { CharSequence key = ch.getTarget(); if (key == null) { @@ -595,10 +663,7 @@ public class HttpConnector { ch.ts = System.currentTimeMillis(); synchronized (t) { - // sdata.keepAliveServer = t; t.connections.add(ch); - //sdata.ch.setDataCallbacks(readable, null, cc); - ch.readInterest(true); } } @@ -622,4 +687,10 @@ public class HttpConnector { } } } + + public HttpConnector withConnectionManager( + HttpConnectionManager connectionManager) { + this.conManager = connectionManager; + return this; + } } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpMessage.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpMessage.java index eb81f0701..5daddfe78 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpMessage.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpMessage.java @@ -9,6 +9,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.tomcat.lite.http.HttpChannel.RequestCompleted; +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.BufferedIOReader; import org.apache.tomcat.lite.io.CBuffer; @@ -346,11 +348,6 @@ public abstract class HttpMessage { public void setCommitted(boolean b) { commited = b; } - - // Not used in coyote connector ( hack ) - - public void sendHead() throws IOException { - } public HttpChannel getHttpChannel() { return httpCh; @@ -382,6 +379,27 @@ public abstract class HttpMessage { return reader; } + public BBuffer readAll(BBuffer chunk, long to) throws IOException { + return httpCh.readAll(chunk, to); + } + + public BBuffer readAll() throws IOException { + return httpCh.readAll(null, httpCh.ioTimeout); + } + + /** + * We're done with this object, it can be recycled. + * Any use after this should throw exception or affect an + * unrelated request. + */ + public void release() throws IOException { + httpCh.release(); + } + + public void setCompletedCallback(RequestCompleted doneAllCallback) throws IOException { + httpCh.setCompletedCallback(doneAllCallback); + } + /** * Returns a buffered reader. */ @@ -395,26 +413,6 @@ public abstract class HttpMessage { return writer; } - // - public abstract void serialize(IOBuffer out) throws IOException; - - - public void serializeHeaders(IOBuffer rawSendBuffers2) throws IOException { - MultiMap mimeHeaders = getMimeHeaders(); - - for (int i = 0; i < mimeHeaders.size(); i++) { - CBuffer name = mimeHeaders.getName(i); - CBuffer value = mimeHeaders.getValue(i); - if (name.length() == 0 || value.length() == 0) { - continue; - } - rawSendBuffers2.append(name); - rawSendBuffers2.append(HttpChannel.COLON); - rawSendBuffers2.append(value); - rawSendBuffers2.append(BBuffer.CRLF_BYTES); - } - rawSendBuffers2.append(BBuffer.CRLF_BYTES); - } protected void processMimeHeaders() { for (int idx = 0; idx < getMsgBytes().headerCount; idx++) { diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpRequest.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpRequest.java index 398d60d90..2f126c077 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpRequest.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpRequest.java @@ -7,6 +7,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import org.apache.tomcat.lite.http.HttpChannel.HttpService; import org.apache.tomcat.lite.http.MultiMap.Entry; import org.apache.tomcat.lite.io.BBuffer; import org.apache.tomcat.lite.io.CBuffer; @@ -61,6 +62,8 @@ public class HttpRequest extends HttpMessage { boolean ssl = false; boolean async = false; + + CBuffer requestURL = CBuffer.newInstance(); private Map attributes = new HashMap(); @@ -97,6 +100,7 @@ public class HttpRequest extends HttpMessage { schemeMB.recycle(); methodMB.set("GET"); requestURI.recycle(); + requestURL.recycle(); queryMB.recycle(); decodedUriMB.recycle(); @@ -298,6 +302,30 @@ public class HttpRequest extends HttpMessage { return requestURI; } + public CBuffer requestURL() { + CBuffer url = requestURL; + url.recycle(); + + String scheme = getScheme(); + int port = getServerPort(); + if (port < 0) + port = 80; // Work around java.net.URL bug + + url.append(scheme); + url.append("://"); + url.append(getServerName()); + if ((scheme.equals("http") && (port != 80)) + || (scheme.equals("https") && (port != 443))) { + url.append(':'); + url.append(port); + } + // Decoded !! + url.append(getRequestURI()); + + return (url); + + } + /** * Not decoded - %xx as in original. * @return @@ -427,29 +455,33 @@ public class HttpRequest extends HttpMessage { this.localPort = port; } - public void sendHead() throws IOException { - httpCh.sendRequestHeaders(httpCh); + public HttpResponse waitResponse() throws IOException { + return waitResponse(httpCh.ioTimeout); } - /** - * Convert the request to bytes, ready to send. - */ - public void serialize(IOBuffer rawSendBuffers2) throws IOException { - rawSendBuffers2.append(method()); - rawSendBuffers2.append(BBuffer.SP); - - // TODO: encode or use decoded - rawSendBuffers2.append(requestURI()); - if (queryString().length() > 0) { - rawSendBuffers2.append("?"); - rawSendBuffers2.append(queryString()); + public void send(HttpService headersCallback, long timeout) throws IOException { + if (headersCallback != null) { + httpCh.setHttpService(headersCallback); } - rawSendBuffers2.append(BBuffer.SP); - rawSendBuffers2.append(protocol()); - rawSendBuffers2.append(BBuffer.CRLF_BYTES); + httpCh.send(); + } + + public void send(HttpService headersCallback) throws IOException { + send(headersCallback, httpCh.ioTimeout); + } + + public void send() throws IOException { + send(null, httpCh.ioTimeout); + } + + public HttpResponse waitResponse(long timeout) throws IOException { + // TODO: close out if post + httpCh.send(); - super.serializeHeaders(rawSendBuffers2); + httpCh.headersReceivedLock.waitSignal(timeout); + + return httpCh.getResponse(); } /** @@ -467,10 +499,15 @@ public class HttpRequest extends HttpMessage { } BBuffer valueBC = hostHF.valueB; + if (valueBC == null) { + valueBC = BBuffer.allocate(); + hostHF.getValue().toAscii(valueBC); + } byte[] valueB = valueBC.array(); int valueL = valueBC.getLength(); int valueS = valueBC.getStart(); - int colonPos = -1; + + int colonPos = valueBC.indexOf(':', 0); serverNameMB.recycle(); @@ -492,10 +529,8 @@ public class HttpRequest extends HttpMessage { if (colonPos < 0) { if (!ssl) { - // 80 - Default HTTP port setServerPort(80); } else { - // 443 - Default HTTPS port setServerPort(443); } } else { @@ -823,6 +858,7 @@ public class HttpRequest extends HttpMessage { // URL decode and normalize decodedUri.append(getMsgBytes().url()); + getURLDecoder().urlDecode(decodedUri, false); // Need to normalize again - %decoding may decode / @@ -833,47 +869,8 @@ public class HttpRequest extends HttpMessage { } decodedURI().set(decodedUri); - httpCh.processProtocol(); - // default response protocol httpCh.getResponse().protocol().set(getMsgBytes().protocol()); - - // requested connection:close/keepAlive and proto - httpCh.processConnectionHeader(getMimeHeaders()); - - httpCh.processExpectation(); - - httpCh.receiveBody.processContentDelimitation(); - // Spec: - // The presence of a message-body in a request is signaled by the - // inclusion of a Content-Length or Transfer-Encoding header field in - // the request's message-headers - // Server should read - but ignore - - httpCh.receiveBody.noBody = !httpCh.receiveBody.isContentDelimited(); - - httpCh.receiveBody.updateCloseOnEnd(); - - /* - * The presence of a message-body in a request is signaled by the - * inclusion of a Content-Length or Transfer-Encoding header field in - * the request's message-headers. A message-body MUST NOT be included - * in a request if the specification of the request method - * (section 5.1.1) does not allow sending an entity-body in requests. - * A server SHOULD read and forward a message-body on any request; if the request method does not include defined semantics for an entity-body, then the message-body SHOULD be ignored when handling the request. - */ - if (!httpCh.receiveBody.isContentDelimited()) { - // No body - httpCh.getIn().close(); - } - - CBuffer valueMB = getMimeHeaders().getHeader("host"); - // Check host header -// if (httpCh.http11 && (valueMB == null)) { -// httpCh.error = true; -// // 400 - Bad request -// httpCh.getResponse().setStatus(400); -// } } @@ -977,4 +974,13 @@ public class HttpRequest extends HttpMessage { return bb; } + public String toString() { + IOBuffer out = new IOBuffer(); + try { + Http11Connection.serialize(this, out); + return out.readAll(null).toString(); + } catch (IOException e) { + return "Invalid request"; + } + } } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpResponse.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpResponse.java index d4e8c738a..fd38abf28 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpResponse.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/HttpResponse.java @@ -64,29 +64,6 @@ public class HttpResponse extends HttpMessage { return status; } - public void sendHead() throws IOException { - httpCh.sendHeaders(); - } - - /** - * Convert the response to bytes, ready to send. - */ - public void serialize(IOBuffer rawSendBuffers2) throws IOException { - - rawSendBuffers2.append(protocol()).append(' '); - String status = Integer.toString(getStatus()); - rawSendBuffers2.append(status).append(' '); - if (getMessageBuffer().length() > 0) { - rawSendBuffers2.append(getMessage()); - } else { - rawSendBuffers2 - .append(getMessage(getStatus())); - } - rawSendBuffers2.append(BBuffer.CRLF_BYTES); - // Headers - super.serializeHeaders(rawSendBuffers2); - } - public HttpRequest getRequest() { return getHttpChannel().getRequest(); } @@ -96,36 +73,12 @@ public class HttpResponse extends HttpMessage { protocol().set(getMsgBytes().protocol()); message.set(getMsgBytes().message()); processMimeHeaders(); - - // TODO: if protocol == 1.0 and we requested 1.1, downgrade getHttpChannel().pro - int status = 500; try { status = getStatus(); } catch (Throwable t) { getHttpChannel().log.warning("Invalid status " + getMsgBytes().status() + " " + getMessage()); } - HttpBody body = (HttpBody) getBody(); - body.noBody = !hasBody(); - - // Will parse 'connection:close', set close on end - getHttpChannel().processConnectionHeader(getMimeHeaders()); - - body.processContentDelimitation(); - - if (body.statusDropsConnection(status)) { - getHttpChannel().closeStreamOnEnd("response status drops connection"); - } - - if (body.isDone()) { - body.close(); - } - - if (!body.isContentDelimited()) { - getHttpChannel().closeStreamOnEnd("not content delimited"); - } - - } /** @@ -159,7 +112,7 @@ public class HttpResponse extends HttpMessage { * Common messages are cached. * */ - private BBucket getMessage( int status ) { + BBucket getMessage( int status ) { // method from Response. // Does HTTP requires/allow international messages or @@ -174,10 +127,15 @@ public class HttpResponse extends HttpMessage { case 404: return st_404; } - return stats.get(status); + BBucket bb = stats.get(status); + if (bb == null) { + return st_unknown; + } + return bb; } + static BBucket st_unknown = BBuffer.wrapper("No Message"); static BBucket st_200 = BBuffer.wrapper("OK"); static BBucket st_302= BBuffer.wrapper("Moved Temporarily"); static BBucket st_400= BBuffer.wrapper("Bad Request"); diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java new file mode 100644 index 000000000..10436157d --- /dev/null +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/http/SpdyConnection.java @@ -0,0 +1,548 @@ +/* + */ +package org.apache.tomcat.lite.http; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; +import org.apache.tomcat.lite.http.HttpConnector.RemoteServer; +import org.apache.tomcat.lite.http.HttpMessage.HttpMessageBytes; +import org.apache.tomcat.lite.io.BBucket; +import org.apache.tomcat.lite.io.BBuffer; +import org.apache.tomcat.lite.io.CBuffer; +import org.apache.tomcat.lite.io.IOBuffer; + +/* + * TODO: expectations ? + * Fix docs - order matters + * Crashes in chrome + */ + +public class SpdyConnection extends HttpConnector.HttpConnection { + + public static class SpdyConnectionManager + extends HttpConnector.HttpConnectionManager { + @Override + public HttpConnection newConnection(HttpConnector con) { + return new SpdyConnection(con); + } + + @Override + public HttpConnection getFromPool(RemoteServer t) { + // TODO: we may initiate multiple SPDY connections with each server + // Sending frames is synchronized, receiving is muxed + return t.connections.get(0); + } + + } + + + protected static Logger log = Logger.getLogger("SpdyConnection"); + + /** + * @param spdyConnector + */ + SpdyConnection(HttpConnector spdyConnector) { + this.httpConnector = spdyConnector; + } + + AtomicInteger lastInStream = new AtomicInteger(); + AtomicInteger lastOutStream = new AtomicInteger(); + + // TODO: use int map + Map channels = new HashMap(); + + SpdyConnection.Frame currentInFrame = null; + + SpdyConnection.Frame lastFrame = null; // for debug + + BBuffer outFrameBuffer = BBuffer.allocate(); + BBuffer inFrameBuffer = BBuffer.allocate(); + + BBuffer headW = BBuffer.wrapper(); + + // TODO: detect if it's spdy or http based on bit 8 + + @Override + public void withExtraBuffer(BBuffer received) { + inFrameBuffer = received; + } + + @Override + public void dataReceived(IOBuffer iob) throws IOException { + int avail = iob.available(); + while (avail > 0) { + if (currentInFrame == null) { + if (inFrameBuffer.remaining() + avail < 8) { + return; + } + if (inFrameBuffer.remaining() < 8) { + int headRest = 8 - inFrameBuffer.remaining(); + int rd = iob.read(inFrameBuffer, headRest); + avail -= rd; + } + currentInFrame = new SpdyConnection.Frame(); // TODO: reuse + currentInFrame.parse(inFrameBuffer); + } + if (avail < currentInFrame.length) { + return; + } + // We have a full frame. Process it. + onFrame(iob); + + // TODO: extra checks, make sure the frame is correct and + // it consumed all data. + avail -= currentInFrame.length; + currentInFrame = null; + } + } + + /** + * Frame received. Must consume all data for the frame. + * + * @param iob + * @throws IOException + */ + protected void onFrame(IOBuffer iob) throws IOException { + // TODO: make sure we have enough data. + lastFrame = currentInFrame; + + if (currentInFrame.c) { + if (currentInFrame.type == SpdyConnection.Frame.TYPE_HELO) { + // receivedHello = currentInFrame; + } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_STREAM) { + HttpChannel ch = new HttpChannel(); // TODO: reuse + ch.channelId = SpdyConnection.readInt(iob); + ch.setConnection(this); + ch.httpConnector = this.httpConnector; + if (serverMode) { + ch.serverMode(true); + } + if (this.httpConnector.defaultService != null) { + ch.setHttpService(this.httpConnector.defaultService); + } + + channels.put(ch.channelId, ch); + + // pri and unused + SpdyConnection.readShort(iob); + + HttpMessageBytes reqBytes = ch.getRequest().getMsgBytes(); + + BBuffer head = processHeaders(iob, ch, reqBytes); + + ch.getRequest().processReceivedHeaders(); + + ch.handleHeadersReceived(ch.getRequest()); + + if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) { + ch.getIn().close(); + ch.handleEndReceive(); + } + } else if (currentInFrame.type == SpdyConnection.Frame.TYPE_SYN_REPLY) { + int chId = SpdyConnection.readInt(iob); + HttpChannel ch = channels.get(chId); + + SpdyConnection.readShort(iob); + + HttpMessageBytes resBytes = ch.getResponse().getMsgBytes(); + + BBuffer head = processHeaders(iob, ch, resBytes); + + ch.getResponse().processReceivedHeaders(); + + ch.handleHeadersReceived(ch.getResponse()); + + if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) { + ch.getIn().close(); + ch.handleEndReceive(); + } + } else { + log.warning("Unknown frame type " + currentInFrame.type); + iob.advance(currentInFrame.length); + } + } else { + // data frame - part of an existing stream + HttpChannel ch = channels.get(currentInFrame.streamId); + if (ch == null) { + log.warning("Unknown stream "); + net.close(); + net.startSending(); + return; + } + int len = currentInFrame.length; + while (len > 0) { + BBucket bb = iob.peekFirst(); + if (len > bb.remaining()) { + ch.getIn().append(bb); + len += bb.remaining(); + bb.position(bb.limit()); + } else { + ch.getIn().append(bb, len); + bb.position(bb.position() + len); + len = 0; + } + } + ch.sendHandleReceivedCallback(); + + if ((currentInFrame.flags & SpdyConnection.Frame.FLAG_HALF_CLOSE) != 0) { + ch.getIn().close(); + ch.handleEndReceive(); + } + } + } + + private BBuffer processHeaders(IOBuffer iob, HttpChannel ch, + HttpMessageBytes reqBytes) throws IOException { + int nvCount = SpdyConnection.readShort(iob); + int read = 8; + + iob.read(headRecvBuf, currentInFrame.length - 8); + + // Wrapper - so we don't change position in head + headRecvBuf.wrapTo(headW); + + BBuffer nameBuf = BBuffer.wrapper(); + BBuffer valBuf = BBuffer.wrapper(); + + for (int i = 0; i < nvCount; i++) { + + int nameLen = SpdyConnection.readShort(headW); + + nameBuf + .setBytes(headW.array(), headW.position(), + nameLen); + headW.advance(nameLen); + + int valueLen = SpdyConnection.readShort(headW); + valBuf + .setBytes(headW.array(), headW.position(), + valueLen); + headW.advance(valueLen); + + // TODO: no need to send version, method if default + + if (nameBuf.equals("method")) { + valBuf.wrapTo(reqBytes.method()); + } else if (nameBuf.equals("version")) { + valBuf.wrapTo(reqBytes.protocol()); + } else if (nameBuf.equals("url")) { + valBuf.wrapTo(reqBytes.url()); + // TODO: spdy uses full URL, we may want to trim + // also no host header + } else { + int idx = reqBytes.addHeader(); + nameBuf.wrapTo(reqBytes.getHeaderName(idx)); + valBuf.wrapTo(reqBytes.getHeaderValue(idx)); + } + + // TODO: repeated values are separated by a 0 + // pretty weird... + read += nameLen + valueLen + 4; + } + return headW; + } + + @Override + protected void sendRequest(HttpChannel http) throws IOException { + if (serverMode) { + throw new IOException("Only in client mode"); + } + + MultiMap mimeHeaders = http.getRequest().getMimeHeaders(); + BBuffer headBuf = BBuffer.allocate(); + + SpdyConnection.appendShort(headBuf, mimeHeaders.size() + 3); + + serializeMime(mimeHeaders, headBuf); + + // TODO: url - with host prefix , method + // optimize... + SpdyConnection.appendAsciiHead(headBuf, "version"); + SpdyConnection.appendAsciiHead(headBuf, "HTTP/1.1"); + + SpdyConnection.appendAsciiHead(headBuf, "method"); + SpdyConnection.appendAsciiHead(headBuf, http.getRequest().getMethod()); + + SpdyConnection.appendAsciiHead(headBuf, "url"); + // TODO: url + SpdyConnection.appendAsciiHead(headBuf, http.getRequest().requestURL()); + + + BBuffer out = BBuffer.allocate(); + // Syn-reply + out.putByte(0x80); + out.putByte(0x01); + out.putByte(0x00); + out.putByte(0x01); + + if (http.getOut().isAppendClosed()) { + out.putByte(0x01); // closed + } else { + out.putByte(0x00); + } + SpdyConnection.append24(out, headBuf.remaining() + http.getOut().available() + 4); + + if (serverMode) { + http.channelId = 2 * lastOutStream.incrementAndGet(); + } else { + http.channelId = 2 * lastOutStream.incrementAndGet() + 1; + } + SpdyConnection.appendInt(out, http.channelId); + + channels.put(http.channelId, http); + + out.putByte(0x00); // no priority + out.putByte(0x00); + + sendFrame(out, headBuf); + + // Any existing data + sendData(http); + } + + @Override + protected void sendResponseHeaders(HttpChannel http) throws IOException { + if (!serverMode) { + throw new IOException("Only in server mode"); + } + + if (http.getResponse().isCommitted()) { + return; + } + http.getResponse().setCommitted(true); + + MultiMap mimeHeaders = http.getResponse().getMimeHeaders(); + + BBuffer headBuf = BBuffer.allocate(); + + SpdyConnection.appendInt(headBuf, http.channelId); + headBuf.putByte(0); + headBuf.putByte(0); + + //mimeHeaders.remove("content-length"); + + SpdyConnection.appendShort(headBuf, mimeHeaders.size() + 2); + + // chrome will crash if we don't send the header + serializeMime(mimeHeaders, headBuf); + + // Must be at the end + SpdyConnection.appendAsciiHead(headBuf, "status"); + SpdyConnection.appendAsciiHead(headBuf, + Integer.toString(http.getResponse().getStatus())); + + SpdyConnection.appendAsciiHead(headBuf, "version"); + SpdyConnection.appendAsciiHead(headBuf, "HTTP/1.1"); + + + BBuffer out = BBuffer.allocate(); + // Syn-reply + out.putByte(0x80); // Control + out.putByte(0x01); // version + out.putByte(0x00); // 00 02 - SYN_REPLY + out.putByte(0x02); + + // It seems piggibacking data is not allowed + out.putByte(0x00); + + SpdyConnection.append24(out, headBuf.remaining()); + + sendFrame(out, headBuf); + } + + + public void startSending(HttpChannel http) throws IOException { + http.send(); // if needed + + if (net != null) { + sendData(http); + net.startSending(); + } + } + + private void sendData(HttpChannel http) throws IOException { + int avail = http.getOut().available(); + boolean closed = http.getOut().isAppendClosed(); + if (avail > 0 || closed) { + sendDataFrame(http.getOut(), avail, + http.channelId, closed); + if (avail > 0) { + getOut().advance(avail); + } + } + if (closed) { + http.handleEndSent(); + } + } + + private BBuffer serializeMime(MultiMap mimeHeaders, BBuffer headBuf) + throws IOException { + + // TODO: duplicated headers not allowed + for (int i = 0; i < mimeHeaders.size(); i++) { + CBuffer name = mimeHeaders.getName(i); + CBuffer value = mimeHeaders.getValue(i); + if (name.length() == 0 || value.length() == 0) { + continue; + } + SpdyConnection.appendShort(headBuf, name.length()); + name.toAscii(headBuf); + SpdyConnection.appendShort(headBuf, value.length()); + value.toAscii(headBuf); + } + return headBuf; + } + + + private synchronized void sendFrame(BBuffer out, BBuffer headBuf) + throws IOException { + if (net == null) { + return; // unit test + } + net.getOut().append(out); + if (headBuf != null) { + net.getOut().append(headBuf); + } + net.startSending(); + } + + public synchronized void sendDataFrame(IOBuffer out2, int avail, + int channelId, boolean last) throws IOException { + if (net == null) { + return; // unit test + } + outFrameBuffer.recycle(); + SpdyConnection.appendInt(outFrameBuffer, channelId); // first bit 0 ? + if (last) { + outFrameBuffer.putByte(0x01); // closed + } else { + outFrameBuffer.putByte(0x00); + } + + // TODO: chunk if too much data ( at least at 24 bits) + SpdyConnection.append24(outFrameBuffer, avail); + + net.getOut().append(outFrameBuffer); + if (avail > 0) { + net.getOut().append(out2, avail); + } + net.startSending(); + } + + static void appendInt(BBuffer headBuf, int length) throws IOException { + headBuf.putByte((length & 0xFF000000) >> 24); + headBuf.putByte((length & 0xFF0000) >> 16); + headBuf.putByte((length & 0xFF00) >> 8); + headBuf.putByte((length & 0xFF)); + } + + static void append24(BBuffer headBuf, int length) throws IOException { + headBuf.putByte((length & 0xFF0000) >> 16); + headBuf.putByte((length & 0xFF00) >> 8); + headBuf.putByte((length & 0xFF)); + } + + static void appendAsciiHead(BBuffer headBuf, CBuffer s) throws IOException { + appendShort(headBuf, s.length()); + for (int i = 0; i < s.length(); i++) { + headBuf.append(s.charAt(i)); + } + } + + static void appendShort(BBuffer headBuf, int length) throws IOException { + if (length > 0xFFFF) { + throw new IOException("Too long"); + } + headBuf.putByte((length & 0xFF00) >> 8); + headBuf.putByte((length & 0xFF)); + } + + static void appendAsciiHead(BBuffer headBuf, String s) throws IOException { + SpdyConnection.appendShort(headBuf, s.length()); + for (int i = 0; i < s.length(); i++) { + headBuf.append(s.charAt(i)); + } + } + + static int readShort(BBuffer iob) throws IOException { + int res = iob.readByte(); + return res << 8 | iob.readByte(); + } + + static int readShort(IOBuffer iob) throws IOException { + int res = iob.read(); + return res << 8 | iob.read(); + } + + static int readInt(IOBuffer iob) throws IOException { + int res = 0; + for (int i = 0; i < 4; i++) { + int b0 = iob.read(); + res = res << 8 | b0; + } + return res; + } + + public static class Frame { + int flags; + + int length; + + boolean c; // for control + + int version; + + int type; + + int streamId; // for data + + static int TYPE_HELO = 4; + + static int TYPE_SYN_STREAM = 1; + + static int TYPE_SYN_REPLY = 2; + + static int FLAG_HALF_CLOSE = 1; + + public void parse(BBuffer iob) throws IOException { + int b0 = iob.read(); + if (b0 < 128) { + c = false; + streamId = b0; + for (int i = 0; i < 3; i++) { + b0 = iob.read(); + streamId = streamId << 8 | b0; + } + } else { + c = true; + b0 -= 128; + version = ((b0 << 8) | iob.read()); + b0 = iob.read(); + type = ((b0 << 8) | iob.read()); + } + + flags = iob.read(); + for (int i = 0; i < 3; i++) { + b0 = iob.read(); + length = length << 8 | b0; + } + + iob.recycle(); + } + + } + + /** + * Framing error, client interrupt, etc. + */ + public void abort(HttpChannel http, String t) throws IOException { + // TODO: send interrupt signal + } + + +} \ No newline at end of file diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/BBuffer.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/BBuffer.java index a83c5fbde..3ceb5da0b 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/BBuffer.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/BBuffer.java @@ -330,11 +330,11 @@ public class BBuffer implements Cloneable, Serializable, return true; } - public byte get(int off) { + public int get(int off) { if (start + off >= end) { throw new ArrayIndexOutOfBoundsException(); } - return buff[start + off]; + return buff[start + off] & 0xFF; } /** @@ -495,7 +495,10 @@ public class BBuffer implements Cloneable, Serializable, // } // return true; // } - + public int indexOf(String src) { + return indexOf(src, 0, src.length(), 0); + } + public int indexOf(String src, int srcOff, int srcLen, int myOff) { char first = src.charAt(srcOff); @@ -635,7 +638,9 @@ public class BBuffer implements Cloneable, Serializable, return start; } - + public void advance(int len) { + start += len; + } @Override public void position(int newStart) { @@ -647,6 +652,11 @@ public class BBuffer implements Cloneable, Serializable, buff[end++] = b; } + public void putByte(int b) { + makeSpace(1); + buff[end++] = (byte) b; + } + public int read(BBuffer res) { res.setBytes(buff, start, remaining()); end = start; @@ -899,19 +909,17 @@ public class BBuffer implements Cloneable, Serializable, } } - public int substract() { - - if ((end - start) == 0) { + public int read() { + if (end == start) { return -1; } - return (buff[start++] & 0xFF); } public int substract(BBuffer src) { - if ((end - start) == 0) { + if (end == start) { return -1; } @@ -945,7 +953,7 @@ public class BBuffer implements Cloneable, Serializable, public String toString(String enc) { if (null == buff) { return null; - } else if (end - start == 0) { + } else if (end == start) { return ""; } @@ -1188,6 +1196,6 @@ public class BBuffer implements Cloneable, Serializable, super.setBytesInternal(b, off, len); } } - + } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/CBuffer.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/CBuffer.java index bfc36f94a..d94b451bc 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/CBuffer.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/CBuffer.java @@ -162,6 +162,12 @@ public class CBuffer extends CBucket implements Cloneable, return this; } + public CBuffer append(int i) { + // TODO: can be optimizeed... + append(Integer.toString(i)); + return this; + } + /** * Add data to the buffer */ @@ -261,7 +267,13 @@ public class CBuffer extends CBucket implements Cloneable, return this; } - + + public void toAscii(BBuffer bb) { + for (int i = start; i < end; i++) { + bb.append(value[i]); + } + } + /** * Append and advance CharBuffer. * diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/DumpChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/DumpChannel.java index bb8ce5f10..b3eee1c24 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/DumpChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/DumpChannel.java @@ -2,7 +2,10 @@ */ package org.apache.tomcat.lite.io; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; // TODO: dump to a file, hex, etc. /** @@ -12,6 +15,7 @@ public class DumpChannel extends IOChannel { IOBuffer in = new IOBuffer(this); IOBuffer out = new IOBuffer(this); + static final boolean dumpToFile = false; public DumpChannel(String id) { this.id = id; @@ -65,7 +69,9 @@ public class DumpChannel extends IOChannel { } } - private void out(String dir, BBucket first, boolean closed) { + static int did = 0; + + protected void out(String dir, BBucket first, boolean closed) { // Dump if (first != null) { String hd = Hex.getHexDump(first.array(), first.position(), @@ -76,9 +82,18 @@ public class DumpChannel extends IOChannel { hd); } else { System.err.println("\n" + dir + ": " + id + " " + - (closed ? "CLS" : "") + + (closed ? "CLS " : "") + "END\n"); } + if (dumpToFile && first != null) { + try { + OutputStream os = new FileOutputStream("dmp" + did++); + os.write(first.array(), first.position(), first.remaining()); + os.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } @Override diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java index 135059c14..e645a6577 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOBuffer.java @@ -63,7 +63,6 @@ public class IOBuffer { // ===== Buffer access ===== - BBucket first; /** * Return first non-empty buffer. @@ -88,7 +87,6 @@ public class IOBuffer { buffers.removeFirst(); o = (buffers.size() == 0) ? null : buffers.getFirst(); } else { - first = o; return o; } } @@ -103,7 +101,18 @@ public class IOBuffer { public void advance(int len) { - first.position(first.position() + len); + while (len > 0) { + BBucket first = peekFirst(); + if (first == null) { + return; + } + if (len > first.remaining()) { + len -= first.remaining(); + first.position(first.limit()); + } else { + first.position(first.position() + len); + } + } } public void queue(String s) throws IOException { @@ -250,16 +259,21 @@ public class IOBuffer { // =================== Helper methods ================== /** - * Non-blocking. + * Non-blocking read. + * + * @return -1 if EOF, -2 if no data available, or 0..255 for normal read. */ public int read() throws IOException { + if (isClosedAndEmpty()) { + return -1; + } BBucket bucket = peekFirst(); if (bucket == null) { - return -1; + return -2; } int res = bucket.array()[bucket.position()]; bucket.position(bucket.position() + 1); - return res; + return res & 0xFF; } public int peek() throws IOException { @@ -348,6 +362,16 @@ public class IOBuffer { } + public int read(BBuffer bb, int len) throws IOException { + bb.makeSpace(len); + int rd = read(bb.array(), bb.limit(), len); + if (rd < 0) { + return rd; + } + bb.limit(bb.limit() + rd); + return rd; + } + /** * Non-blocking read. */ @@ -478,23 +502,35 @@ public class IOBuffer { cs.remaining()); } + /** + * Append a buffer. The buffer will not be modified. + */ public IOBuffer append(BBucket cs) throws IOException { append(cs.array(), cs.position(), cs.remaining()); return this; } - + + /** + * Append a buffer. The buffer will not be modified. + */ + public IOBuffer append(BBucket cs, int len) throws IOException { + append(cs.array(), cs.position(), len); + return this; + } + public IOBuffer append(IOBuffer cs) throws IOException { for (int i = 0; i < cs.getBufferCount(); i++) { - Object o = cs.peekBucket(i); - if (o instanceof BBucket) { - append((BBucket)o); - } else if (o instanceof ByteBuffer) { - append((ByteBuffer) o); - } else if (o instanceof CharSequence) { - append((CharSequence) o); - } else { - throw new IOException("Unknown type " + o); - } + BBucket o = cs.peekBucket(i); + append(o); + } + + return this; + } + + public IOBuffer append(IOBuffer cs, int len) throws IOException { + for (int i = 0; i < cs.getBufferCount(); i++) { + BBucket o = cs.peekBucket(i); + append(o); } return this; @@ -505,7 +541,7 @@ public class IOBuffer { append(data, 0, data.length); return this; } - + public IOBuffer append(char c) throws IOException { ByteBuffer bb = getWriteBuffer(); bb.put((byte) c); @@ -571,16 +607,14 @@ public class IOBuffer { if (closeQueued) { throw new IOException("Closed"); } - synchronized (buffers) { - BBucket last = (buffers.size() == 0) ? - null : buffers.getLast(); - if (last == null || last != appendable || - last.array().length - last.limit() < 16) { - last = BBuffer.allocate(ALLOC_SIZE); - } - appending = true; - appendable = (BBuffer) last; + BBucket last = (buffers.size() == 0) ? + null : buffers.getLast(); + if (last == null || last != appendable || + last.array().length - last.limit() < 16) { + last = BBuffer.allocate(ALLOC_SIZE); } + appending = true; + appendable = (BBuffer) last; if (writeBuffer == null || writeBuffer.array() != appendable.array()) { writeBuffer = ByteBuffer.wrap(appendable.array()); @@ -592,10 +626,10 @@ public class IOBuffer { } public void releaseWriteBuffer(int read) throws IOException { - if (!appending) { - throw new IOException("Not appending"); - } synchronized (buffers) { + if (!appending) { + throw new IOException("Not appending"); + } if (writeBuffer != null) { if (read > 0) { appendable.limit(writeBuffer.position()); @@ -607,8 +641,8 @@ public class IOBuffer { notifyDataAvailable(appendable); } } + appending = false; } - appending = false; } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java index 23b642bf5..6b0fc635f 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOChannel.java @@ -158,7 +158,7 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived } } - public void setSink(IOChannel previous) { + public void setSink(IOChannel previous) throws IOException { this.net = previous; } @@ -170,8 +170,9 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived /** * Called to add an filter _after_ the current channel. + * @throws IOException */ - public IOChannel addFilterAfter(IOChannel next) { + public IOChannel addFilterAfter(IOChannel next) throws IOException { this.app = next; app.setSink(this); @@ -182,6 +183,9 @@ public abstract class IOChannel implements ByteChannel, IOConnector.DataReceived dataReceivedCallback = null; dataFlushedCallback = null; + + // we may have data in our buffers + next.handleReceived(this); return this; } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOConnector.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOConnector.java index 1bb33b30e..1505cc35a 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOConnector.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/IOConnector.java @@ -33,8 +33,9 @@ public abstract class IOConnector { throws IOException; // TODO: failures ? + // TODO: use String target or url public abstract void connect(String host, int port, - IOConnector.ConnectedCallback sc) throws IOException; + IOConnector.ConnectedCallback sc) throws IOException; public void stop() { diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioChannel.java index c136b8b7d..4a0e65765 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioChannel.java @@ -79,7 +79,6 @@ public class NioChannel implements ByteChannel { .append(readInterest ? "R/" : "") .append(outClosed ? "Out-CLOSE/" : "") .append(inClosed ? "In-CLOSE/" : "") - .append(selKey == null ? -1 : ((SelectionKey) (selKey)).interestOps()) .append("/") .append(channel.toString()); diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioThread.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioThread.java index 95e34e740..2391246e9 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioThread.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/NioThread.java @@ -104,7 +104,7 @@ public class NioThread implements Runnable { // time events. private long minSleep = 100; - boolean daemon = true; + boolean daemon = false; // TODO: trace log - record all events with timestamps, replay @@ -408,7 +408,7 @@ public class NioThread implements Runnable { ch.callback.handleConnected(ch); } } catch (Throwable t) { - log.warning("Error in connect, closing "); + log.warning("Error in connect, closing " + t); close(ch, t); try { if (ch.callback != null) { @@ -463,6 +463,9 @@ public class NioThread implements Runnable { if (channel instanceof SocketChannel) { SocketChannel sc = (SocketChannel) channel; if (sc.isOpen() && sc.isConnected()) { + if (debug) { + log.info("Half shutdown " + ch); + } sc.socket().shutdownOutput(); // TCP end to the other side } } @@ -499,7 +502,9 @@ public class NioThread implements Runnable { SocketChannel sc = (SocketChannel) channel; if (sc.isConnected()) { - //System.err.println("Close socket, opened=" + o); + if (debug) { + log.info("Close socket, opened=" + o); + } try { sc.socket().shutdownInput(); } catch(IOException io1) { @@ -526,8 +531,7 @@ public class NioThread implements Runnable { } } } catch (IOException ex2) { - log.severe("SelectorThread: Error closing socket " + ex2); - ex2.printStackTrace(); + log.log(Level.SEVERE, "SelectorThread: Error closing socket ", ex2); } } @@ -559,7 +563,7 @@ public class NioThread implements Runnable { selectorData.zeroReads = 0; } else if (done < 0) { if (debug) { - log.info("SelectorThread: EOF while reading"); + log.info("SelectorThread: EOF while reading " + selectorData); } } else { // need more... diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SocketIOChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SocketIOChannel.java index 785f4e02e..ec3f5155a 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SocketIOChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SocketIOChannel.java @@ -144,6 +144,7 @@ public class SocketIOChannel extends IOChannel implements NioChannelCallback { try { synchronized(in) { // data between 0 and position + int total = 0; while (true) { if (in.isAppendClosed()) { // someone closed me ? ch.inputClosed(); // remove read interest. @@ -165,9 +166,9 @@ public class SocketIOChannel extends IOChannel implements NioChannelCallback { } if (read < 0) { - ch.inputClosed(); // mark the in buffer as closed in.close(); + ch.inputClosed(); sendHandleReceivedCallback(); return; } @@ -177,6 +178,7 @@ public class SocketIOChannel extends IOChannel implements NioChannelCallback { } return; } + total += read; newData = true; } } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SslChannel.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SslChannel.java index 7c5b708cc..618f68643 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SslChannel.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/io/SslChannel.java @@ -91,7 +91,7 @@ public class SslChannel extends IOChannel implements Runnable { @Override - public void setSink(IOChannel net) { + public void setSink(IOChannel net) throws IOException { try { initSsl(); super.setSink(net); @@ -223,13 +223,15 @@ public class SslChannel extends IOChannel implements Runnable { SSLEngineResult wrap = sslEngine.wrap(EMPTY, myNetOutData); myNetOutData.flip(); + if (wrap.getStatus() != Status.CLOSED) { + System.err.println("Unexpected status " + wrap); + } net.getOut().write(myNetOutData); // TODO: timer to close socket if we don't get // clean close handshake } private synchronized void startRealSending() throws IOException { - IOBuffer netOut = net.getOut(); while (true) { myAppOutData.compact(); @@ -411,6 +413,7 @@ public class SslChannel extends IOChannel implements Runnable { @Override public void handleReceived(IOChannel ch) throws IOException { processInput(net.getIn(), in); + // Maybe we don't have data - that's fine. sendHandleReceivedCallback(); } diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/HttpProxyService.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/HttpProxyService.java index 0fe686cc2..be8f185db 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/HttpProxyService.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/HttpProxyService.java @@ -106,7 +106,6 @@ public class HttpProxyService implements HttpService { serverNet.getOut().queue(OK); serverNet.startSending(); - serverHttp.resetBuffers(); // no buffers serverHttp.release(); // no longer used } } @@ -230,7 +229,7 @@ public class HttpProxyService implements HttpService { serverHttp.setDataReceivedCallback(copy); copy.handleReceived(serverHttp); - httpClient.sendRequest(); + httpClient.send(); //serverHttp.handleReceived(serverHttp.getSink()); @@ -280,7 +279,6 @@ public class HttpProxyService implements HttpService { clientHttpReq.getHttpChannel().setDataReceivedCallback(copy); copy.handleReceived(clientHttpReq.getHttpChannel()); - serverHttp.sendHeaders(); serverHttp.startSending(); diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/StaticContentService.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/StaticContentService.java index 36dcf105d..dac28eb66 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/StaticContentService.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/proxy/StaticContentService.java @@ -104,8 +104,6 @@ public class StaticContentService implements HttpService { } res.setContentType(contentType); - res.sendHead(); - if (chunked) { res.getBody() .queue(BBuffer.wrapper(mb, 0, mb.remaining())); diff --git a/modules/tomcat-lite/java/org/apache/tomcat/lite/servlet/TomcatLite.java b/modules/tomcat-lite/java/org/apache/tomcat/lite/servlet/TomcatLite.java index ac55b4f5a..933bdb7d5 100644 --- a/modules/tomcat-lite/java/org/apache/tomcat/lite/servlet/TomcatLite.java +++ b/modules/tomcat-lite/java/org/apache/tomcat/lite/servlet/TomcatLite.java @@ -41,10 +41,9 @@ import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.HttpResponse; import org.apache.tomcat.lite.http.MappingData; import org.apache.tomcat.lite.http.HttpChannel.HttpService; -import org.apache.tomcat.lite.io.WrappedException; import org.apache.tomcat.lite.io.CBuffer; import org.apache.tomcat.lite.io.MemoryIOConnector; -import org.apache.tomcat.lite.io.CBuffer; +import org.apache.tomcat.lite.io.WrappedException; /** * Helper allowing to run servlets using Tomcat lite http server. diff --git a/modules/tomcat-lite/test/org/apache/coyote/lite/TomcatLiteCoyoteTest.java b/modules/tomcat-lite/test/org/apache/coyote/lite/TomcatLiteCoyoteTest.java index 07efc2a33..f79c27537 100644 --- a/modules/tomcat-lite/test/org/apache/coyote/lite/TomcatLiteCoyoteTest.java +++ b/modules/tomcat-lite/test/org/apache/coyote/lite/TomcatLiteCoyoteTest.java @@ -64,7 +64,7 @@ public class TomcatLiteCoyoteTest extends TestCase { HttpConnector clientCon = DefaultHttpConnector.get(); HttpChannel ch = clientCon.get("localhost", 8885); ch.getRequest().setRequestURI("/examples/servlets/servlet/HelloWorldExample"); - ch.sendRequest(); + ch.getRequest().send(); BBuffer res = ch.readAll(null, 0); assertTrue(res.toString().indexOf("Hello World!") >= 0); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java index e2c8d91e2..7c67b2200 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/TestMain.java @@ -19,6 +19,7 @@ import org.apache.tomcat.lite.http.DefaultHttpConnector; import org.apache.tomcat.lite.http.Dispatcher; import org.apache.tomcat.lite.http.HttpChannel; import org.apache.tomcat.lite.http.HttpConnector; +import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.BaseMapper.ContextMapping; import org.apache.tomcat.lite.http.HttpConnector.HttpChannelEvents; import org.apache.tomcat.lite.http.services.EchoCallback; @@ -48,12 +49,12 @@ public class TestMain { public static HttpConnector testClient = DefaultHttpConnector.get(); public static HttpConnector testServer = new HttpConnector(serverCon); public static HttpConnector testProxy = new HttpConnector(serverCon); - + static Dispatcher mcb; static HttpProxyService proxy; - public static HttpConnector getTestServer() { + public static HttpConnector initTestEnv() { if (defaultServer == null) { defaultServer = new TestMain(); defaultServer.run(); @@ -61,11 +62,17 @@ public class TestMain { return defaultServer.testServer; } + public static HttpConnector getClientAndInit() { + if (defaultServer == null) { + defaultServer = new TestMain(); + defaultServer.run(); + } + return defaultServer.testClient; + } + public static void initTestCallback(Dispatcher d) { BaseMapper.ContextMapping mCtx = d.addContext(null, "", null, null, null, null); -// testServer.setDebugHttp(true); -// testServer.setDebug(true); d.addWrapper(mCtx, "/", new StaticContentService() .setContentType("text/html") @@ -98,8 +105,8 @@ public class TestMain { BBuffer out = BBuffer.allocate(); - HttpChannel aclient = DefaultHttpConnector.get().get(url); - aclient.sendRequest(); + HttpRequest aclient = DefaultHttpConnector.get().request(url); + aclient.send(); aclient.readAll(out, //Long.MAX_VALUE);// 2000000); @@ -194,6 +201,8 @@ public class TestMain { testProxy.setPort(port); // testProxy.setDebugHttp(true); // testProxy.setDebug(true); +// testClient.setDebug(true); +// testClient.setDebugHttp(true); // dispatcher rejects 'http://' testProxy.setHttpService(proxy); @@ -214,8 +223,9 @@ public class TestMain { e.printStackTrace(); } - port = basePort + 443; - +// testServer.setDebugHttp(true); +// testServer.setDebug(true); + } Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/ClientTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/ClientTest.java new file mode 100644 index 000000000..4bf0a0272 --- /dev/null +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/ClientTest.java @@ -0,0 +1,60 @@ +/* + */ +package org.apache.tomcat.lite.http; + +import java.io.BufferedReader; +import java.io.IOException; + +import org.apache.tomcat.lite.TestMain; + +import junit.framework.TestCase; + +/** + * Examples and tests for Tomcat-lite in client mode. + * + */ +public class ClientTest extends TestCase { + + /** + * All connectors created this way will share a single + * IO thread. Each connector will have its keep-alive + * pool - so it's better to share them. + * + * Since I want to test keep-alive works, I use a static one + */ + static HttpConnector httpCon = DefaultHttpConnector.get(); + + /** + * Start a http server, runs on 8802 - shared by all tests. + * Will use /echo handler. + */ + static HttpConnector testServer = TestMain.initTestEnv(); + + + public void testSimpleBlocking() throws IOException { + HttpRequest req = httpCon.request("http://localhost:8802/echo/test1"); + HttpResponse res = req.waitResponse(); + + assertEquals(200, res.getStatus()); + //assertEquals("", res.getHeader("")); + + BufferedReader reader = res.getReader(); + String line1 = reader.readLine(); + assertEquals("REQ HEAD:", line1); + } + + public void testSimpleCallback() throws IOException { + + } + + public void testGetParams() throws IOException { + } + + public void testPostParams() throws IOException { + } + + public void testPostBody() throws IOException { + } + + +} diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java index 5ebbf92a4..7f50b5787 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelInMemoryTest.java @@ -17,48 +17,41 @@ import org.apache.tomcat.lite.io.IOConnector; import org.apache.tomcat.lite.io.MemoryIOConnector; import org.apache.tomcat.lite.io.MemoryIOConnector.MemoryIOChannel; +// TODO: rename to Http11ConnectionTest public class HttpChannelInMemoryTest extends TestCase { - - MemoryIOConnector memoryServerConnector = new MemoryIOConnector(); - MemoryIOConnector memoryClientConnector = - new MemoryIOConnector().withServer(memoryServerConnector); + /** + * Connection under test + */ + Http11Connection conn; + /** + * Last http channel created by the connection + */ + HttpChannel http; - // Used for pipelined requests - after the first request is - // processed, a new HttpChannel is used ( first may still be - // in use ) - HttpChannel lastServer; - - // The server channel will use this for I/O... + // Input/output for the connection MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel(); - HttpConnector serverConnector = new HttpConnector(memoryServerConnector) { - @Override - public HttpChannel get(CharSequence target) throws IOException { - throw new IOException(); - } - public HttpChannel getServer() { - lastServer = new HttpChannel().serverMode(true); - lastServer.withBuffers(net); - lastServer.setConnector(this); - //lastServer.withIOConnector(memoryServerConnector); - return lastServer; - } - }; - - HttpConnector httpClient = new HttpConnector(memoryClientConnector); + HttpConnector serverConnector = new HttpConnector(null); + // Callback results for callback tests boolean hasBody = false; boolean bodyDone = false; boolean bodySentDone = false; boolean headersDone = false; boolean allDone = false; - - HttpChannel http = serverConnector.getServer(); - public void setUp() throws IOException { + // Requests will not be serviced - you must manually generate + // the response. serverConnector.setHttpService(null); + + conn = new Http11Connection(serverConnector) { + protected HttpChannel checkHttpChannel() throws IOException { + return http = super.checkHttpChannel(); + } + }.serverMode(); + conn.setSink(net); } @@ -80,6 +73,7 @@ public class HttpChannelInMemoryTest extends TestCase { "\r\n"; net.getIn().append(req); + //http = lastServer.get(0); assertTrue(http.getRequest().method().equals("GET")); assertTrue(http.getRequest().protocol().equals("HTTP/1.1")); assertEquals(http.getRequest().getMimeHeaders().size(), 4); @@ -96,7 +90,7 @@ public class HttpChannelInMemoryTest extends TestCase { // now second response must be in. // the connector will create a new http channel - http = lastServer; + //http = lastServer.get(1); assertTrue(http.getRequest().method().equals("HEAD")); assertTrue(http.getRequest().protocol().equals("HTTP/1.1")); @@ -105,12 +99,101 @@ public class HttpChannelInMemoryTest extends TestCase { .equals("Foo.com")); } + public void testHttp11Close() throws IOException { + String req = "GET /index.html?q=b&c=d HTTP/1.1\r\n" + + "Host: Foo.com\n" + + "Connection: close\n" + + "\n"; + net.getIn().append(req); + + assertTrue(http.getRequest().method().equals("GET")); + assertTrue(http.getRequest().protocol().equals("HTTP/1.1")); + + http.getOut().append("Response1"); + http.getOut().close(); + http.startSending(); + http.release(); + + assertTrue(net.out.indexOf("connection:close") > 0); + assertFalse(net.isOpen()); + } + + public void testHttp10Close() throws IOException { + String req = "GET /index.html?q=b&c=d HTTP/1.0\r\n" + + "Host: Foo.com \n" + + "\r\n"; + net.getIn().append(req); + + assertTrue(http.getRequest().method().equals("GET")); + assertTrue(http.getRequest().protocol().equals("HTTP/1.0")); + + http.getOut().append("Response1"); + http.getOut().close(); + http.startSending(); + + assertTrue(net.out.indexOf("connection:close") > 0); + assertFalse(net.isOpen()); + } + + public void testHttp10KA() throws IOException { + String req = "GET /index.html?q=b&c=d HTTP/1.0\r\n" + + "Connection: Keep-Alive\n" + + "Host: Foo.com \n" + + "\r\n"; + net.getIn().append(req); + + assertTrue(http.getRequest().method().equals("GET")); + assertTrue(http.getRequest().protocol().equals("HTTP/1.0")); + + http.getOut().append("Hi"); + http.getOut().close(); + http.startSending(); + + // after request + assertEquals(conn.activeHttp, null); + + assertTrue(net.out.indexOf("connection:keep-alive") > 0); + assertTrue(net.isOpen()); + // inserted since we can calculate the response + assertEquals(http.getResponse().getHeader("Content-Length"), + "2"); + } + + public void testHttp10KANoCL() throws IOException { + String req = "GET /index.html?q=b&c=d HTTP/1.0\r\n" + + "Connection: Keep-Alive\n" + + "Host: Foo.com \n" + + "\r\n"; + net.getIn().append(req); + + assertTrue(http.getRequest().method().equals("GET")); + assertTrue(http.getRequest().protocol().equals("HTTP/1.0")); + + http.getOut().append("Hi"); + http.startSending(); + + http.getOut().append("After"); + http.getOut().close(); + http.startSending(); + + // after request + assertEquals(conn.activeHttp, null); + + assertFalse(net.out.indexOf("connection:keep-alive") > 0); + assertFalse(net.isOpen()); + // inserted since we can calculate the response + assertEquals(http.getResponse().getHeader("Content-Length"), + null); + assertEquals(http.getResponse().getHeader("Transfer-Encoding"), + null); + } + public void testMultiLineHead() throws IOException { - http.getNet().getIn().append("GET / HTTP/1.0\n" + + net.getIn().append("GET / HTTP/1.0\n" + "Cookie: 1234\n" + " 456 \n" + "Connection: Close\n\n"); - http.getNet().getIn().close(); + net.getIn().close(); MultiMap headers = http.getRequest().getMimeHeaders(); CBuffer cookie = headers.getHeader("Cookie"); @@ -118,20 +201,20 @@ public class HttpChannelInMemoryTest extends TestCase { assertEquals(conn.toString(), "Close"); assertEquals(cookie.toString(), "1234 456"); - assertEquals(http.headRecvBuf.toString(), + assertEquals(http.conn.headRecvBuf.toString(), "GET / HTTP/1.0\n" + "Cookie: 1234 456 \n" + // \n -> trailing space "Connection: Close\n\n"); } public void testCloseSocket() throws IOException { - http.getNet().getIn().append("GET / HTTP/1.1\n" + net.getIn().append("GET / HTTP/1.1\n" + "Host: localhost\n" + "\n"); - assertTrue(http.keepAlive()); + assertTrue(((Http11Connection)http.conn).keepAlive()); - http.getNet().getIn().close(); - assertFalse(http.keepAlive()); + net.getIn().close(); + assertFalse(((Http11Connection)http.conn).keepAlive()); } public void test2ReqByte2Byte() throws IOException { @@ -166,7 +249,6 @@ public class HttpChannelInMemoryTest extends TestCase { http.release(); // now second response must be in - http = lastServer; assertTrue(http.getRequest().method().equals("HEAD")); assertTrue(http.getRequest().protocol().equals("HTTP/1.1")); assertTrue(http.getRequest().getMimeHeaders().size() == 2); @@ -182,14 +264,16 @@ public class HttpChannelInMemoryTest extends TestCase { } public void testEndWithoutFlushCallbacks() throws IOException { + + net.getIn().append(POST); + + net.getIn().close(); http.setCompletedCallback(new RequestCompleted() { public void handle(HttpChannel data, Object extra) throws IOException { allDone = true; } }); - http.getNet().getIn().append(POST); - http.getNet().getIn().close(); http.sendBody.queue("Hi"); http.getOut().close(); @@ -200,41 +284,48 @@ public class HttpChannelInMemoryTest extends TestCase { } public void testCallbacks() throws IOException { - http.setCompletedCallback(new RequestCompleted() { - public void handle(HttpChannel data, Object extra) - throws IOException { - allDone = true; - } - }); - http.setHttpService(new HttpService() { + // already accepted - will change + serverConnector.setHttpService(new HttpService() { public void service(HttpRequest httpReq, HttpResponse httpRes) - throws IOException { + throws IOException { + headersDone = true; - } - }); - http.setDataReceivedCallback(new IOConnector.DataReceivedCallback() { - @Override - public void handleReceived(IOChannel ch) throws IOException { - if (ch.getIn().isAppendClosed()) { - bodyDone = true; - } - } - }); - http.setDataFlushedCallback(new IOConnector.DataFlushedCallback() { - @Override - public void handleFlushed(IOChannel ch) throws IOException { - if (ch.getOut().isAppendClosed()) { - bodySentDone = true; - } + HttpChannel http = httpReq.getHttpChannel(); + + http.setCompletedCallback(new RequestCompleted() { + public void handle(HttpChannel data, Object extra) + throws IOException { + allDone = true; + } + }); + http.setDataReceivedCallback(new IOConnector.DataReceivedCallback() { + @Override + public void handleReceived(IOChannel ch) throws IOException { + if (ch.getIn().isAppendClosed()) { + bodyDone = true; + } + } + }); + http.setDataFlushedCallback(new IOConnector.DataFlushedCallback() { + @Override + public void handleFlushed(IOChannel ch) throws IOException { + if (ch.getOut().isAppendClosed()) { + bodySentDone = true; + } + } + }); } }); // Inject the request - http.getNet().getIn().append(POST); + net.getIn().append("POST / HTTP/1.0\n" + + "Connection: Close\n" + + "Content-Length: 4\n\n" + + "1"); assertTrue(headersDone); - http.getNet().getIn().append("1234"); + net.getIn().append("234"); - http.getNet().getIn().close(); + net.getIn().close(); assertTrue(bodyDone); @@ -253,17 +344,16 @@ public class HttpChannelInMemoryTest extends TestCase { "1234"; public void testClose() throws IOException { - http.getNet().getIn().append(POST); - http.getNet().getIn().close(); - - HttpBody receiveBody = http.receiveBody; + net.getIn().append(POST); + net.getIn().close(); + + IOBuffer receiveBody = http.receiveBody; IOBuffer appData = receiveBody; BBuffer res = BBuffer.allocate(1000); appData.readAll(res); assertEquals(res.toString(), "1234"); - assertFalse(http.keepAlive()); - assertFalse(http.keepAlive()); + assertFalse(((Http11Connection)http.conn).keepAlive()); http.sendBody.queue(res); http.getOut().close(); @@ -275,13 +365,13 @@ public class HttpChannelInMemoryTest extends TestCase { } public void testReadLine() throws Exception { - http.getNet().getIn().append("POST / HTTP/1.0\n" + + net.getIn().append("POST / HTTP/1.0\n" + "Content-Length: 28\n\n" + "Line 1\n" + "Line 2\r\n" + "Line 3\r" + "Line 4"); - http.getNet().getIn().close(); + net.getIn().close(); BufferedReader r = http.getRequest().getReader(); assertEquals("Line 1", r.readLine()); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelTest.java index 5cda35f29..0296e85a8 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpChannelTest.java @@ -4,14 +4,14 @@ package org.apache.tomcat.lite.http; import java.io.IOException; -import org.apache.tomcat.lite.io.BBuffer; -import org.apache.tomcat.lite.io.CBuffer; - import junit.framework.TestCase; +import org.apache.tomcat.lite.io.BBuffer; + public class HttpChannelTest extends TestCase { HttpChannel ch = new HttpChannel().serverMode(true); + Http11Connection con = new Http11Connection(null).serverMode(); HttpRequest req = ch.getRequest(); @@ -39,7 +39,6 @@ public class HttpChannelTest extends TestCase { BBuffer f3 = BBuffer.wrapper("GET /a?b HTTP/1.0\na:b\r\r"); BBuffer f4 = BBuffer.wrapper("GET /a?b HTTP/1.0\na:b\r\n\r"); - public void reqTest(String lineS, String method, String req, String qry, String proto) throws IOException { BBuffer line = BBuffer.wrapper(lineS); @@ -47,7 +46,7 @@ public class HttpChannelTest extends TestCase { protoB.recycle(); requestB.recycle(); methodB.recycle(); - ch.parseRequestLine(line, methodB, requestB, queryB, protoB); + con.parseRequestLine(line, methodB, requestB, queryB, protoB); assertEquals(proto, protoB.toString()); assertEquals(req, requestB.toString()); assertEquals(qry, queryB.toString()); @@ -62,7 +61,7 @@ public class HttpChannelTest extends TestCase { private MultiMap processQry(String qry) throws IOException { BBuffer head = BBuffer.wrapper("GET /a?" + qry + " HTTP/1.0\n" + "Host: a\n\n"); - ch.parseMessage(head); + con.parseMessage(ch, head); MultiMap params = req.getParameters(); return params; } @@ -82,7 +81,7 @@ public class HttpChannelTest extends TestCase { String expLine, String expRest) throws IOException { head = BBuffer.wrapper(headS); head.readLine(line); - ch.parseHeader(head, line, name, value); + con.parseHeader(ch, head, line, name, value); assertEquals(expName, name.toString()); assertEquals(expValue, value.toString()); @@ -111,7 +110,7 @@ public class HttpChannelTest extends TestCase { statusB.recycle(); msgB.recycle(); BBuffer line = BBuffer.wrapper(lineS); - ch.parseResponseLine(line, + con.parseResponseLine(line, protoB, statusB, msgB); assertEquals(proto, protoB.toString()); assertEquals(status, statusB.toString()); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpsTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpsTest.java index 5eb28008f..eb2f9fed8 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpsTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/HttpsTest.java @@ -94,13 +94,12 @@ public class HttpsTest extends TestCase { } - private void checkResponse(HttpConnector httpClient) throws Exception { - HttpChannel ch = httpClient.get("localhost", port); - ch.getRequest().setRequestURI("/hello"); - ch.getRequest().setProtocol("HTTP/1.0"); - // problems with keep alive !!! - ch.sendRequest(); - BBuffer res = ch.readAll(null, 1000000); + private void checkResponse(HttpConnector httpCon) throws Exception { + HttpRequest ch = httpCon.request("localhost", port); + ch.setRequestURI("/hello"); + ch.setProtocol("HTTP/1.0"); + ch.send(); + BBuffer res = ch.readAll(); assertTrue(res.toString().indexOf("Hello") >= 0); } @@ -120,12 +119,11 @@ public class HttpsTest extends TestCase { public void testSimpleRequestGoogle() throws Exception { SslConnector sslCon = new SslConnector(); httpClient = new HttpConnector(sslCon); - HttpChannel client = httpClient.get("www.google.com", 443); - client.getRequest().setRequestURI("/accounts/ServiceLogin"); - client.sendRequest(); + HttpRequest client = httpClient.request("www.google.com", 443); + client.setRequestURI("/accounts/ServiceLogin"); + client.send(); - BBuffer res = BBuffer.allocate(10000); - client.readAll(res, 1000000); + BBuffer res = client.readAll(); assertTrue(res.toString().indexOf("Google Accounts") > 0); } diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java index 25c8ab488..74c70771e 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/LiveHttp1Test.java @@ -22,25 +22,24 @@ import junit.framework.TestCase; import org.apache.tomcat.lite.TestMain; import org.apache.tomcat.lite.io.BBuffer; -import org.apache.tomcat.lite.io.CBuffer; import org.apache.tomcat.lite.io.SocketConnector; public class LiveHttp1Test extends TestCase { // Proxy tests extend this class, run same tests via proxy on 8903 protected int clientPort = 8802; - HttpChannel httpClient; + HttpRequest httpClient; BBuffer bodyRecvBuffer = BBuffer.allocate(1024); - int to = 1000; + int to = 1000000; public void setUp() throws IOException { // DefaultHttpConnector.get().setDebug(true); // DefaultHttpConnector.get().setDebugHttp(true); - TestMain.getTestServer(); + TestMain.initTestEnv(); - httpClient = DefaultHttpConnector.get().get("localhost", clientPort); + httpClient = DefaultHttpConnector.get().request("localhost", clientPort); bodyRecvBuffer.recycle(); } @@ -53,9 +52,9 @@ public class LiveHttp1Test extends TestCase { } public void testSimpleRequest() throws Exception { - httpClient.getRequest().requestURI().set("/hello"); + httpClient.requestURI().set("/hello"); - httpClient.sendRequest(); + httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); assertEquals("Hello world", bodyRecvBuffer.toString()); } @@ -75,18 +74,18 @@ public class LiveHttp1Test extends TestCase { } public void testSimpleChunkedRequest() throws Exception { - httpClient.getRequest().requestURI().set("/chunked/foo"); - httpClient.sendRequest(); + httpClient.requestURI().set("/chunked/foo"); + httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); assertTrue(bodyRecvBuffer.toString().indexOf("AAA") >= 0); } // Check waitResponseHead() public void testRequestHead() throws Exception { - httpClient.getRequest().requestURI().set("/echo/foo"); + httpClient.requestURI().set("/echo/foo"); // Send the request, wait response - httpClient.sendRequest(); + httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); assertTrue(bodyRecvBuffer.toString().indexOf("GET /echo/foo") > 0); @@ -109,19 +108,19 @@ public class LiveHttp1Test extends TestCase { } public void notFound() throws Exception { - httpClient.getRequest().requestURI().set("/foo"); - httpClient.sendRequest(); + httpClient.requestURI().set("/foo"); + httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); } // compression not implemented public void testGzipRequest() throws Exception { - httpClient.getRequest().requestURI().set("/hello"); - httpClient.getRequest().setHeader("accept-encoding", + httpClient.requestURI().set("/hello"); + httpClient.setHeader("accept-encoding", "gzip"); // Send the request, wait response - httpClient.sendRequest(); + httpClient.send(); // cstate.waitResponseHead(10000); // headers are received // ByteChunk data = new ByteChunk(1024); // acstate.serializeResponse(acstate.res, data); @@ -133,10 +132,10 @@ public class LiveHttp1Test extends TestCase { } public void testWrongPort() throws Exception { - httpClient = DefaultHttpConnector.get().get("localhost", 18904); - httpClient.getRequest().requestURI().set("/hello"); + httpClient = DefaultHttpConnector.get().request("localhost", 18904); + httpClient.requestURI().set("/hello"); - httpClient.sendRequest(); + httpClient.send(); httpClient.readAll(bodyRecvBuffer, to); assertEquals(0, bodyRecvBuffer.remaining()); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java new file mode 100644 index 000000000..e98f41df2 --- /dev/null +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/SpdyTest.java @@ -0,0 +1,89 @@ +/* + */ +package org.apache.tomcat.lite.http; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +import junit.framework.TestCase; + +import org.apache.tomcat.lite.TestMain; +import org.apache.tomcat.lite.io.IOBuffer; +import org.apache.tomcat.lite.http.SpdyConnection.SpdyConnectionManager; + +public class SpdyTest extends TestCase { + HttpConnector http11Con = TestMain.getClientAndInit(); + + static HttpConnector spdyCon = DefaultHttpConnector.get() + .withConnectionManager(new SpdyConnectionManager()); + + HttpConnector memSpdyCon = + new HttpConnector(null).withConnectionManager(new SpdyConnectionManager()); + + public void testClient() throws IOException { + HttpRequest req = + spdyCon.request("http://localhost:8802/echo/test1"); + + HttpResponse res = req.waitResponse(); + + assertEquals(200, res.getStatus()); + //assertEquals("", res.getHeader("")); + + BufferedReader reader = res.getReader(); + String line1 = reader.readLine(); + //assertEquals("", line1); + } + + // Initial frame generated by Chrome + public void testParse() throws IOException { + InputStream is = + getClass().getClassLoader().getResourceAsStream("org/apache/tomcat/lite/http/spdyreq0"); + + IOBuffer iob = new IOBuffer(); + iob.append(is); + + SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection(); + + // By default it has a dispatcher buit-in + con.serverMode = true; + + con.dataReceived(iob); + + HttpChannel spdyChannel = con.channels.get(1); + + assertEquals(1, con.lastFrame.version); + assertEquals(1, con.lastFrame.type); + assertEquals(1, con.lastFrame.flags); + + assertEquals(417, con.lastFrame.length); + + // TODO: test req, headers + HttpRequest req = spdyChannel.getRequest(); + assertTrue(req.getHeader("accept").indexOf("application/xml") >= 0); + + } + + // Does int parsing works ? + public void testLargeInt() throws Exception { + + IOBuffer iob = new IOBuffer(); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + iob.append(0xFF); + + SpdyConnection con = (SpdyConnection) memSpdyCon.newConnection(); + con.dataReceived(iob); + assertEquals(0x7FFF, con.currentInFrame.version); + assertEquals(0xFFFF, con.currentInFrame.type); + assertEquals(0xFF, con.currentInFrame.flags); + assertEquals(0xFFFFFF, con.currentInFrame.length); + + } +} diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/EchoCallback.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/EchoCallback.java index d7b2c3480..1c29ec492 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/EchoCallback.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/EchoCallback.java @@ -18,6 +18,7 @@ package org.apache.tomcat.lite.http.services; import java.io.IOException; import java.util.logging.Logger; +import org.apache.tomcat.lite.http.Http11Connection; import org.apache.tomcat.lite.http.HttpChannel; import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.HttpResponse; @@ -43,7 +44,7 @@ public class EchoCallback implements HttpService { res.setContentType(contentType); IOBuffer tmp = new IOBuffer(null); - req.serialize(tmp); + Http11Connection.serialize(req, tmp); sproc.getOut().append("REQ HEAD:\n"); sproc.getOut().append(tmp.readAll(null)); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/SleepCallback.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/SleepCallback.java index b0cce6f9d..12657d114 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/SleepCallback.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/services/SleepCallback.java @@ -61,7 +61,7 @@ public class SleepCallback extends StaticContentService { } res.setContentType(contentType); - res.sendHead(); + res.flush(); Thread.currentThread().sleep(t2); diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/http/spdyreq0 b/modules/tomcat-lite/test/org/apache/tomcat/lite/http/spdyreq0 new file mode 100644 index 0000000000000000000000000000000000000000..4df3aa94a1e2508a43e4d7eeb7e82a7a2b666f28 GIT binary patch literal 425 zcmY+A-A=+l5QT@3An^j@3+P@VZo3pH7Kj(d7%)II0iyA4U50kUcDJ@$L%8x)d=nqd z@)Hx2xj84_%$c085|oPh27uBJKou%n2uv)z@gz}Fa4U^wn^@6*I>b((A8S@ZXqC?tXjlJhX?U3P!h^HA^xOkjBIYHp}9FOL9@8!Yri@;XK^Rgf4NVxJ8;RMfArguEV^Dgc@pRF{5~V1S&Dw$Sk2ee3(NyPZeB7 z)+T+%lo4D-CbRt@Xto%N$S{kUh0{F4)G3Z>3uU6H$@H+;eOv9!!@w_Kmp$D{dlLWz=VBtsNyb4Bi>)Wpa-Zqsdbdamb@8DH^K V9#-Jw6Vt3{9;&1H{F!;K_X|*hfHD98 literal 0 HcmV?d00001 diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/io/OneTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/io/OneTest.java index fedc2db8f..9cfce1117 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/io/OneTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/io/OneTest.java @@ -12,12 +12,9 @@ import org.apache.tomcat.lite.io.MemoryIOConnector.MemoryIOChannel; import junit.framework.TestCase; public class OneTest extends TestCase { - MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel(); - HttpChannel http = new HttpChannel() - .serverMode(true).withBuffers(net); public void setUp() throws Exception { - TestMain.getTestServer(); + TestMain.initTestEnv(); } public void tearDown() throws IOException { diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java index 64cfc4ac0..fc3108d97 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/load/LiveHttpThreadedTest.java @@ -25,11 +25,12 @@ import org.apache.tomcat.lite.TestMain; import org.apache.tomcat.lite.http.DefaultHttpConnector; import org.apache.tomcat.lite.http.HttpChannel; import org.apache.tomcat.lite.http.HttpConnector; +import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.HttpChannel.HttpService; import org.apache.tomcat.lite.http.HttpChannel.RequestCompleted; public class LiveHttpThreadedTest extends TestCase { - HttpConnector staticMain = TestMain.getTestServer(); + HttpConnector staticMain = TestMain.initTestEnv(); int tCount = 1; @@ -104,13 +105,13 @@ public class LiveHttpThreadedTest extends TestCase { }; void makeRequest(int i, boolean block) throws Exception { - HttpChannel cstate = DefaultHttpConnector.get().get("localhost", 8802); + HttpRequest cstate = DefaultHttpConnector.get().request("localhost", 8802); - cstate.getRequest().requestURI().set("/hello"); + cstate.requestURI().set("/hello"); cstate.setCompletedCallback(reqCallback); // Send the request, wait response - cstate.sendRequest(); + cstate.send(); } } diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/ProxyTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/ProxyTest.java index 1c40a2dfc..e59883ea5 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/ProxyTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/ProxyTest.java @@ -28,7 +28,7 @@ public class ProxyTest extends TestCase { String resStr; public void setUp() throws Exception { - TestMain.getTestServer(); + TestMain.initTestEnv(); } public void tearDown() throws IOException { diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/SmallProxyTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/SmallProxyTest.java index 31801218b..83ac8a974 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/SmallProxyTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/proxy/SmallProxyTest.java @@ -8,6 +8,7 @@ import junit.framework.TestCase; import org.apache.tomcat.lite.http.HttpChannel; import org.apache.tomcat.lite.http.HttpConnector; +import org.apache.tomcat.lite.http.HttpConnector.HttpConnection; import org.apache.tomcat.lite.io.MemoryIOConnector; import org.apache.tomcat.lite.io.MemoryIOConnector.MemoryIOChannel; @@ -20,14 +21,13 @@ public class SmallProxyTest extends TestCase { new MemoryIOConnector().withServer(memoryServerConnector); - HttpConnector httpPool = new HttpConnector(memoryServerConnector) { + HttpConnector httpCon = new HttpConnector(memoryServerConnector) { @Override public HttpChannel get(CharSequence target) throws IOException { throw new IOException(); } public HttpChannel getServer() { lastServer = new HttpChannel().serverMode(true); - lastServer.withBuffers(net); lastServer.setConnector(this); //lastServer.withIOConnector(memoryServerConnector); return lastServer; @@ -65,9 +65,12 @@ public class SmallProxyTest extends TestCase { MemoryIOConnector.MemoryIOChannel net = new MemoryIOChannel(); HttpChannel http; + + HttpConnection serverConnection; public void setUp() throws IOException { - http = httpPool.getServer(); + http = httpCon.getServer(); + serverConnection = httpCon.handleAccepted(net); } /** @@ -75,13 +78,13 @@ public class SmallProxyTest extends TestCase { * @throws IOException */ public void testProxy() throws IOException { - http.setHttpService(new HttpProxyService() + httpCon.setHttpService(new HttpProxyService() .withSelector(memoryClientConnector) .withHttpClient(httpClient)); - http.getNet().getIn().append("GET http://www.cyberluca.com/ HTTP/1.0\n" + + net.getIn().append("GET http://www.apache.org/ HTTP/1.0\n" + "Connection: Close\n\n"); - http.getNet().getIn().close(); + net.getIn().close(); // lastClient.rawSendBuffers has the request sent by proxy lastClient.getNet().getIn() diff --git a/modules/tomcat-lite/test/org/apache/tomcat/lite/servlet/TomcatLiteNoConnectorTest.java b/modules/tomcat-lite/test/org/apache/tomcat/lite/servlet/TomcatLiteNoConnectorTest.java index a6621e34b..88f0ecd6a 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/lite/servlet/TomcatLiteNoConnectorTest.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/lite/servlet/TomcatLiteNoConnectorTest.java @@ -25,9 +25,6 @@ import org.apache.tomcat.lite.http.HttpConnector; import org.apache.tomcat.lite.http.HttpRequest; import org.apache.tomcat.lite.http.HttpResponse; import org.apache.tomcat.lite.io.BBuffer; -import org.apache.tomcat.lite.io.MemoryIOConnector; -import org.apache.tomcat.lite.io.MemoryIOConnector.MemoryIOChannel; -import org.apache.tomcat.lite.servlet.TomcatLite; /** * Example of testing servlets without using sockets. @@ -37,12 +34,10 @@ import org.apache.tomcat.lite.servlet.TomcatLite; public class TomcatLiteNoConnectorTest extends TestCase { TomcatLite lite; - MemoryIOConnector net; HttpConnector con; public void setUp() throws Exception { - net = new MemoryIOConnector(); - con = new HttpConnector(net); + con = new HttpConnector(null); lite = new TomcatLite(); lite.setHttpConnector(con); @@ -55,39 +50,21 @@ public class TomcatLiteNoConnectorTest extends TestCase { lite.stop(); } - public void testSimpleRequest() throws Exception { - MemoryIOConnector.MemoryIOChannel ch = new MemoryIOChannel(); - HttpChannel httpCh = con.getServer(); - httpCh.withBuffers(ch); HttpRequest req = httpCh.getRequest(); req.setURI("/test1/1stTest"); HttpResponse res = httpCh.getResponse(); - lite.getHttpConnector().getDispatcher().service(req, res, true); - // req/res will be recycled - - // parse out to a response - BBuffer out = ch.out; - MemoryIOChannel clientCh = new MemoryIOChannel(); - clientCh.getIn().append(out); - - HttpChannel client = con.get("localhost", 80); - client.withBuffers(clientCh); - clientCh.handleReceived(clientCh); - - - HttpResponse cres = client.getResponse(); - assertEquals(res.getStatus(), 200); + lite.getHttpConnector().getDispatcher().service(req, res, true, false); - BBuffer resBody = BBuffer.allocate(200); - cres.getBody().readAll(resBody); + BBuffer resBody = res.getBody().readAll(null); assertEquals("Hello world", resBody.toString()); - assertEquals(cres.getHeader("Foo"), "Bar"); - assertEquals(cres.getStatus(), 200); + + assertEquals(res.getHeader("Foo"), "Bar"); + assertEquals(res.getStatus(), 200); } // diff --git a/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogHttpClient.java b/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogHttpClient.java index f49dd75ee..3100737ac 100644 --- a/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogHttpClient.java +++ b/modules/tomcat-lite/test/org/apache/tomcat/test/watchdog/WatchdogHttpClient.java @@ -52,7 +52,7 @@ public class WatchdogHttpClient { System.out.println( " Socket Exception: " + ex ); return; } - //socket.setSoTimeout(10000); + //socket.setSoTimeout(2000); //socket obtained, rebuild the request. rebuildRequest(client, client.request, socket); @@ -144,7 +144,6 @@ public class WatchdogHttpClient { // write the request out.write( reqbytes, 0, reqbytes.length ); out.flush(); - reqbuf = null; } catch ( Exception ex1 ) { System.out.println( " Error writing request " + ex1 ); if ( debug > 0 ) { @@ -385,9 +384,9 @@ public class WatchdogHttpClient { * @exception IOException if an error occurs */ private void fill() throws IOException { - if (markpos < 0) + if (markpos < 0) { pos = 0; /* no mark: throw away the buffer */ - else if (pos >= buf.length) /* no room left in buffer */ + } else if (pos >= buf.length) {/* no room left in buffer */ if (markpos > 0) { /* can throw away early part of the buffer */ int sz = pos - markpos; System.arraycopy(buf, markpos, buf, 0, sz); @@ -404,10 +403,12 @@ public class WatchdogHttpClient { System.arraycopy(buf, 0, nbuf, 0, pos); buf = nbuf; } - count = pos; - int n = in.read(buf, pos, buf.length - pos); - if (n > 0) + } + count = pos; + int n = in.read(buf, pos, buf.length - pos); + if (n > 0) { count = n + pos; + } } } -- 2.11.0