Implement non blocking read on HTTP requests.
authorfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 18 Oct 2006 23:24:52 +0000 (23:24 +0000)
committerfhanik <fhanik@13f79535-47bb-0310-9956-ffa450edef68>
Wed, 18 Oct 2006 23:24:52 +0000 (23:24 +0000)
A common scalability problem when it comes to HTTP is the fact that there are slow clients, that will block a server resources while sending a HTTP request. Especially when you have larger request headers.

On FreeBSD the kernel has a built in http filter to not wake up the application socket handle until the entire request has been received, however on other platforms this is not available.

With the Tomcat connectors, there is an obvious problem when it comes to slow clients, if the client sends up a partial request, Tomcat will block the thread until the client has finished sending the request. For example, if the client has 10 headers it sends up the first 5 headers, then the next 5 in a sequential batch, the tomcat thread is locked in a blocking read
I've tried to fix that problem by making the NIO connector be non blocking. The only time the NIO connector will block now is when the servlet asks for data, usually the request body, as we don't have a way to suspend a thread, like continuations.
Once we have continuations(that can truly remember thread stack data), we can have a truly non blocking server, but we are not there yet.

I believe this code could be easily ported to APR connector with very little effort.
When you review this code, please note that I have not attemtped to rewrite the header parse logic, I might do that in a later stage as this got a little messy, but I wanted the proof of concept done first and reuse as much code as possible.

Please feel free to review and even flame me if needed, at least that means this got some attention :)

git-svn-id: https://svn.apache.org/repos/asf/tomcat/tc6.0.x/trunk@465417 13f79535-47bb-0310-9956-ffa450edef68

java/org/apache/coyote/http11/Http11NioProcessor.java
java/org/apache/coyote/http11/InternalNioInputBuffer.java

index 122b8e7..d781149 100644 (file)
@@ -820,7 +820,7 @@ public class Http11NioProcessor implements ActionHook {
 
         boolean keptAlive = false;
         boolean openSocket = false;
-
+        boolean recycle = true;
         while (!error && keepAlive && !comet) {
 
             // Parsing the request header
@@ -829,8 +829,7 @@ public class Http11NioProcessor implements ActionHook {
                     socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                if (!inputBuffer.parseRequestLine
-                        (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) {
+                if (!inputBuffer.parseRequestLine(keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) {
                     // This means that no data is available right now
                     // (long keepalive), so that the processor should be recycled
                     // and the method should return true
@@ -839,13 +838,18 @@ public class Http11NioProcessor implements ActionHook {
                     socket.getPoller().add(socket);
                     break;
                 }
-                request.setStartTime(System.currentTimeMillis());
                 keptAlive = true;
-                if (!disableUploadTimeout) {
+                if ( !inputBuffer.parseHeaders() ) {
+                    openSocket = true;
+                    socket.getPoller().add(socket);
+                    recycle = false;
+                    break;
+                }
+                request.setStartTime(System.currentTimeMillis());
+                if (!disableUploadTimeout) { //only for body, not for request headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                inputBuffer.parseHeaders();
             } catch (IOException e) {
                 error = true;
                 break;
@@ -934,7 +938,7 @@ public class Http11NioProcessor implements ActionHook {
                 return SocketState.LONG;
             }
         } else {
-            recycle();
+            if ( recycle ) recycle();
             return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
         }
 
index 4698b4b..43da6c7 100644 (file)
@@ -45,9 +45,10 @@ public class InternalNioInputBuffer implements InputBuffer {
 
     // -------------------------------------------------------------- Constants
 
-
+    enum HeaderParseStatus {DONE, HAVE_MORE_HEADERS, NEED_MORE_DATA}
+    enum HeaderParsePosition {HEADER_START, HEADER_NAME, HEADER_VALUE, HEADER_MULTI_LINE}
     // ----------------------------------------------------------- Constructors
-
+    
 
     /**
      * Alternate constructor.
@@ -72,6 +73,9 @@ public class InternalNioInputBuffer implements InputBuffer {
         lastActiveFilter = -1;
 
         parsingHeader = true;
+        parsingRequestLine = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        headerData.recycle();
         swallowInput = true;
 
         if (readTimeout < 0) {
@@ -112,6 +116,8 @@ public class InternalNioInputBuffer implements InputBuffer {
      * State.
      */
     protected boolean parsingHeader;
+    protected boolean parsingRequestLine;
+    protected HeaderParsePosition headerParsePos;
 
 
     /**
@@ -286,6 +292,9 @@ public class InternalNioInputBuffer implements InputBuffer {
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -325,6 +334,9 @@ public class InternalNioInputBuffer implements InputBuffer {
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -360,6 +372,9 @@ public class InternalNioInputBuffer implements InputBuffer {
     public boolean parseRequestLine(boolean useAvailableData)
         throws IOException {
 
+        //check state
+        if ( !parsingRequestLine ) return true;
+        
         int start = 0;
 
         //
@@ -375,7 +390,7 @@ public class InternalNioInputBuffer implements InputBuffer {
                     return false;
                 }
                 if (readTimeout == -1) {
-                    if (!fill()) //request line parsing
+                    if (!fill(false,true)) //request line parsing
                         throw new EOFException(sm.getString("iib.eof.error"));
                 } else {
                     // Do a simple read with a short timeout
@@ -397,8 +412,8 @@ public class InternalNioInputBuffer implements InputBuffer {
                 return false;
             }
             if (readTimeout == -1) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(false,false)) //request line parsing
+                    return false;
             } else {
                 // Do a simple read with a short timeout
                 if ( !readSocket(true, false) ) return false;
@@ -416,8 +431,8 @@ public class InternalNioInputBuffer implements InputBuffer {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -445,8 +460,8 @@ public class InternalNioInputBuffer implements InputBuffer {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -489,8 +504,8 @@ public class InternalNioInputBuffer implements InputBuffer {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //reques line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //reques line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.CR) {
@@ -510,7 +525,7 @@ public class InternalNioInputBuffer implements InputBuffer {
         } else {
             request.protocol().setString("");
         }
-
+        parsingRequestLine = false;
         return true;
 
     }
@@ -552,7 +567,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             } else if ( !block ) {
                 return false;
             } else {
-                timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
+                timedOut = timeout && (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout);
                 if ( !timedOut && nRead == 0 )  {
                     try {
                         final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
@@ -604,15 +619,20 @@ public class InternalNioInputBuffer implements InputBuffer {
     /**
      * Parse the HTTP headers.
      */
-    public void parseHeaders()
+    public boolean parseHeaders()
         throws IOException {
-
-        while (parseHeader()) {
+        HeaderParseStatus status = HeaderParseStatus.HAVE_MORE_HEADERS;
+        
+        do {
+            status = parseHeader();
+        } while ( status == HeaderParseStatus.HAVE_MORE_HEADERS );
+        if (status == HeaderParseStatus.DONE) {
+            parsingHeader = false;
+            end = pos;
+            return true;
+        } else {
+            return false;
         }
-
-        parsingHeader = false;
-        end = pos;
-
     }
 
 
@@ -622,7 +642,7 @@ public class InternalNioInputBuffer implements InputBuffer {
      * @return false after reading a blank line (which indicates that the
      * HTTP header parsing is done
      */
-    public boolean parseHeader()
+    public HeaderParseStatus parseHeader()
         throws IOException {
 
         //
@@ -630,12 +650,14 @@ public class InternalNioInputBuffer implements InputBuffer {
         //
 
         byte chr = 0;
-        while (true) {
+        while (headerParsePos == HeaderParsePosition.HEADER_START) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header 
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
@@ -643,7 +665,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             if ((chr == Constants.CR) || (chr == Constants.LF)) {
                 if (chr == Constants.LF) {
                     pos++;
-                    return false;
+                    return HeaderParseStatus.DONE;
                 }
             } else {
                 break;
@@ -653,28 +675,31 @@ public class InternalNioInputBuffer implements InputBuffer {
 
         }
 
-        // Mark the current buffer position
-        int start = pos;
+        if ( headerParsePos == HeaderParsePosition.HEADER_START ) {
+            // Mark the current buffer position
+            headerData.start = pos;
+            headerParsePos = HeaderParsePosition.HEADER_NAME;
+        }    
 
         //
         // Reading the header name
         // Header name is always US-ASCII
         //
+        
+        
 
-        boolean colon = false;
-        MessageBytes headerValue = null;
-
-        while (!colon) {
+        while (headerParsePos == HeaderParsePosition.HEADER_NAME) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) { //parse header 
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             if (buf[pos] == Constants.COLON) {
-                colon = true;
-                headerValue = headers.addValue(buf, start, pos - start);
+                headerParsePos = HeaderParsePosition.HEADER_VALUE;
+                headerData.headerValue = headers.addValue(buf, headerData.start, pos - headerData.start);
             }
             chr = buf[pos];
             if ((chr >= Constants.A) && (chr <= Constants.Z)) {
@@ -682,97 +707,121 @@ public class InternalNioInputBuffer implements InputBuffer {
             }
 
             pos++;
-
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) { 
+                // Mark the current buffer position
+                headerData.start = pos;
+                headerData.realPos = pos;
+            }
         }
 
-        // Mark the current buffer position
-        start = pos;
-        int realPos = pos;
-
+        
         //
         // Reading the header value (which can be spanned over multiple lines)
         //
 
         boolean eol = false;
-        boolean validLine = true;
 
-        while (validLine) {
+        while (headerParsePos == HeaderParsePosition.HEADER_VALUE ||
+               headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE) {
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) {
+            
+                boolean space = true;
 
-            boolean space = true;
+                // Skipping spaces
+                while (space) {
 
-            // Skipping spaces
-            while (space) {
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE, should already be set
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
+                    }
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
+                        pos++;
+                    } else {
+                        space = false;
+                    }
 
-                if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
-                    pos++;
-                } else {
-                    space = false;
                 }
 
-            }
-
-            int lastSignificantChar = realPos;
+                headerData.lastSignificantChar = headerData.realPos;
 
-            // Reading bytes until the end of the line
-            while (!eol) {
+                // Reading bytes until the end of the line
+                while (!eol) {
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
 
-                if (buf[pos] == Constants.CR) {
-                } else if (buf[pos] == Constants.LF) {
-                    eol = true;
-                } else if (buf[pos] == Constants.SP) {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                } else {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                    lastSignificantChar = realPos;
-                }
+                    }
 
-                pos++;
+                    if (buf[pos] == Constants.CR) {
+                    } else if (buf[pos] == Constants.LF) {
+                        eol = true;
+                    } else if (buf[pos] == Constants.SP) {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                    } else {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                        headerData.lastSignificantChar = headerData.realPos;
+                    }
 
-            }
+                    pos++;
 
-            realPos = lastSignificantChar;
+                }
 
-            // Checking the first character of the new line. If the character
-            // is a LWS, then it's a multiline header
+                headerData.realPos = headerData.lastSignificantChar;
 
+                // Checking the first character of the new line. If the character
+                // is a LWS, then it's a multiline header
+                headerParsePos = HeaderParsePosition.HEADER_MULTI_LINE;
+            }
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header
+                    
+                    //HEADER_MULTI_LINE
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
-            if ((chr != Constants.SP) && (chr != Constants.HT)) {
-                validLine = false;
-            } else {
-                eol = false;
-                // Copying one extra space in the buffer (since there must
-                // be at least one space inserted between the lines)
-                buf[realPos] = chr;
-                realPos++;
+            if ( headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE ) {
+                if ( (chr != Constants.SP) && (chr != Constants.HT)) {
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                } else {
+                    eol = false;
+                    // Copying one extra space in the buffer (since there must
+                    // be at least one space inserted between the lines)
+                    buf[headerData.realPos] = chr;
+                    headerData.realPos++;
+                }
             }
-
         }
-
         // Set the header value
-        headerValue.setBytes(buf, start, realPos - start);
-
-        return true;
-
+        headerData.headerValue.setBytes(buf, headerData.start, headerData.realPos - headerData.start);
+        headerData.recycle();
+        return HeaderParseStatus.HAVE_MORE_HEADERS;
+    }
+    
+    protected HeaderParseData headerData = new HeaderParseData();
+    public static class HeaderParseData {
+        int start = 0;
+        int realPos = 0;
+        int lastSignificantChar = 0;
+        MessageBytes headerValue = null;
+        public void recycle() {
+            start = 0;
+            realPos = 0;
+            lastSignificantChar = 0;
+            headerValue = null;
+        }
     }
 
 
@@ -795,14 +844,13 @@ public class InternalNioInputBuffer implements InputBuffer {
 
     // ------------------------------------------------------ Protected Methods
 
-
     /**
      * Fill the internal buffer using data from the undelying input stream.
      * 
      * @return false if at end of stream
      */
-    protected boolean fill()
-        throws IOException {
+    protected boolean fill(boolean timeout, boolean block)
+        throws IOException, EOFException {
 
         boolean read = false;
 
@@ -814,7 +862,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(true,true);
+            read = readSocket(timeout,block);
         } else {
 
             if (buf.length - end < 4500) {
@@ -827,7 +875,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(true, true);
+            read = readSocket(timeout, block);
         }
         return read;
     }
@@ -851,7 +899,7 @@ public class InternalNioInputBuffer implements InputBuffer {
             throws IOException {
 
             if (pos >= lastValid) {
-                if (!fill()) //read body
+                if (!fill(true,true)) //read body, must be blocking, as the thread is inside the app
                     return -1;
             }