import java.io.IOException;
import java.util.logging.Logger;
+import org.apache.tomcat.lite.http.HttpChannel.HttpService;
import org.apache.tomcat.lite.io.CBuffer;
import org.apache.tomcat.lite.io.FileConnector;
import org.apache.tomcat.lite.io.BBucket;
/**
* Context associated with this wrapper, used for wrapper mapping.
*/
- public BaseMapper.ContextMapping contextMapElement = new BaseMapper.ContextMapping();
+ public BaseMapper.Context contextMapElement = new BaseMapper.Context(this);
/**
* Set context, used for wrapper mapping (request dispatcher).
* @param resources Static resources of the context
* @param ctxService
*/
- public BaseMapper.ContextMapping addContext(String hostName, String path, Object context,
+ public BaseMapper.Context addContext(String hostName, String path, Object context,
String[] welcomeResources, FileConnector resources,
HttpChannel.HttpService ctxService) {
int slashCount = slashCount(path);
synchronized (host) {
- BaseMapper.ContextMapping[] contexts = host.contextList.contexts;
+ BaseMapper.Context[] contexts = host.contextList.contexts;
// Update nesting
if (slashCount > host.contextList.nesting) {
host.contextList.nesting = slashCount;
}
- BaseMapper.ContextMapping[] newContexts = new BaseMapper.ContextMapping[contexts.length + 1];
- BaseMapper.ContextMapping newContext = new BaseMapper.ContextMapping();
+ for (int i = 0; i < contexts.length; i++) {
+ if (path.equals(contexts[i].name)) {
+ return contexts[i];
+ }
+ }
+ BaseMapper.Context[] newContexts = new BaseMapper.Context[contexts.length + 1];
+ BaseMapper.Context newContext = new BaseMapper.Context(this);
newContext.name = path;
newContext.object = context;
if (welcomeResources != null) {
public void removeContext(String hostName, String path) {
Host host = getHost(hostName);
synchronized (host) {
- BaseMapper.ContextMapping[] contexts = host.contextList.contexts;
+ BaseMapper.Context[] contexts = host.contextList.contexts;
if( contexts.length == 0 ){
return;
}
- BaseMapper.ContextMapping[] newContexts = new BaseMapper.ContextMapping[contexts.length - 1];
+ BaseMapper.Context[] newContexts = new BaseMapper.Context[contexts.length - 1];
if (removeMap(contexts, newContexts, path)) {
host.contextList.contexts = newContexts;
// Recalculate nesting
public void addWrapper(String hostName, String contextPath, String path,
Object wrapper, boolean jspWildCard) {
Host host = getHost(hostName);
- BaseMapper.ContextMapping[] contexts = host.contextList.contexts;
+ BaseMapper.Context[] contexts = host.contextList.contexts;
int pos2 = find(contexts, contextPath);
if( pos2<0 ) {
logger.severe("No context found: " + contextPath );
return;
}
- BaseMapper.ContextMapping context = contexts[pos2];
+ BaseMapper.Context context = contexts[pos2];
if (context.name.equals(contextPath)) {
addWrapper(context, path, wrapper, jspWildCard);
}
}
- public void addWrapper(BaseMapper.ContextMapping context, String path, Object wrapper) {
+ public void addWrapper(BaseMapper.Context context, String path, Object wrapper) {
addWrapper(context, path, wrapper, false);
}
* @param jspWildCard true if the wrapper corresponds to the JspServlet
* and the mapping path contains a wildcard; false otherwise
*/
- protected void addWrapper(BaseMapper.ContextMapping context, String path, Object wrapper,
+ protected void addWrapper(BaseMapper.Context context, String path, Object wrapper,
boolean jspWildCard) {
synchronized (context) {
public void removeWrapper(String hostName, String contextPath,
String path) {
Host host = getHost(hostName);
- BaseMapper.ContextMapping[] contexts = host.contextList.contexts;
+ BaseMapper.Context[] contexts = host.contextList.contexts;
int pos2 = find(contexts, contextPath);
if (pos2 < 0) {
return;
}
- BaseMapper.ContextMapping context = contexts[pos2];
+ BaseMapper.Context context = contexts[pos2];
if (context.name.equals(contextPath)) {
removeWrapper(context, path);
}
}
- protected void removeWrapper(BaseMapper.ContextMapping context, String path) {
+ protected void removeWrapper(BaseMapper.Context context, String path) {
synchronized (context) {
if (path.endsWith("/*")) {
// Wildcard wrapper
private final void internalMap(CBuffer host, CBuffer uri,
MappingData mappingData)
throws Exception {
- BaseMapper.ContextMapping[] contexts = null;
- BaseMapper.ContextMapping context = null;
+ BaseMapper.Context[] contexts = null;
+ BaseMapper.Context context = null;
int nesting = 0;
// Virtual host mapping
* Wrapper mapping, using servlet rules.
*/
protected final void internalMapWrapper(
- BaseMapper.ContextMapping context,
+ BaseMapper.Context context,
CBuffer url,
MappingData mappingData)
- throws Exception {
+ throws Exception {
boolean noServletPath = false;
if (url.length() < context.name.length()) {
}
try {
-
- mappingData.tmpServletPath.set(url,
- context.name.length(),
- url.length() - context.name.length());
- if (mappingData.tmpServletPath.length() == 0) {
- mappingData.tmpServletPath.append('/');
- noServletPath = true;
- }
+ // Set the servlet path.
+ mappingData.tmpServletPath.set(url,
+ context.name.length(),
+ url.length() - context.name.length());
+
+ if (mappingData.tmpServletPath.length() == 0) {
+ mappingData.tmpServletPath.append('/');
+ // This is just the context /example or /
+ if (!context.name.equals("/")) {
+ noServletPath = true;
+ }
+ }
- mapAfterContext(context, url, mappingData.tmpServletPath, mappingData,
- noServletPath);
+ mapAfterContext(context, url, mappingData.tmpServletPath, mappingData,
+ noServletPath);
} catch (ArrayIndexOutOfBoundsException ex) {
System.err.println(1);
}
}
- void mapAfterContext(BaseMapper.ContextMapping context,
+ void mapAfterContext(BaseMapper.Context context,
CBuffer url, CBuffer urlNoContext,
MappingData mappingData, boolean noServletPath)
throws Exception {
* if pathStr corresponds to a directory, we'll need to redirect with /
* at end.
*/
- protected void mapDefaultServlet(BaseMapper.ContextMapping context,
+ protected void mapDefaultServlet(BaseMapper.Context context,
CBuffer path,
MappingData mappingData,
CBuffer url,
* Filesystem dependent method:
* check if a resource exists in filesystem.
*/
- protected void mapWelcomResource(BaseMapper.ContextMapping context, CBuffer path,
+ protected void mapWelcomResource(BaseMapper.Context context, CBuffer path,
MappingData mappingData,
BaseMapper.ServiceMapping[] extensionWrappers, String pathStr) {
// Shared among host aliases.
protected static final class ContextList {
- public BaseMapper.ContextMapping[] contexts = new BaseMapper.ContextMapping[0];
+ public BaseMapper.Context[] contexts = new BaseMapper.Context[0];
public int nesting = 0;
}
- public static final class ContextMapping extends BaseMapper.Mapping {
+ public static final class Context extends BaseMapper.Mapping {
+ Context(BaseMapper mapper) {
+ this.mapper = mapper;
+ }
+ public BaseMapper mapper;
public String[] welcomeResources = new String[0];
public FileConnector resources = null;
public BaseMapper.ServiceMapping[] extensionWrappers = new BaseMapper.ServiceMapping[0];
public int nesting = 0;
+ public void addWrapper(String path, HttpService service) {
+ mapper.addWrapper(this, path, service);
+ }
+
}
public Object object = null;
public String toString() {
+ if (name == null || "".equals(name)) {
+ return "DEFAULT";
+ }
return name;
}
}
import java.util.concurrent.Executors;
import java.util.logging.Logger;
-import org.apache.tomcat.lite.http.BaseMapper.ContextMapping;
import org.apache.tomcat.lite.http.HttpChannel.HttpService;
import org.apache.tomcat.lite.http.HttpChannel.RequestCompleted;
import org.apache.tomcat.lite.io.CBuffer;
import org.apache.tomcat.lite.io.FileConnector;
-import org.apache.tomcat.lite.io.CBuffer;
-import org.apache.tomcat.lite.io.UrlEncoding;
/**
* This class has several functions:
* - decide if the request should be run in the selector thread
* or in a thread pool
* - finalizes the request ( close / flush )
- * - detectsif the request is complete or set callbacks
+ * - detects if the request is complete or set callbacks
* for receive/flush/done.
*
*/
MappingData mapRes = ch.getRequest().getMappingData();
HttpService h = (HttpService) mapRes.getServiceObject();
try {
- //log.info("Service ");
h.service(ch.getRequest(), ch.getResponse());
if (!ch.getRequest().isAsyncStarted()) {
ch.complete();
}
};
- public BaseMapper.ContextMapping addContext(String hostname, String ctxPath,
+ public BaseMapper.Context addContext(String hostname, String ctxPath,
Object ctx, String[] welcomeResources, FileConnector resources,
HttpService ctxService) {
return mapper.addContext(hostname, ctxPath, ctx, welcomeResources, resources,
ctxService);
}
+
+ public BaseMapper.Context addContext(String ctxPath) {
+ return mapper.addContext(null, ctxPath, null, null, null,
+ null);
+ }
public void map(CBuffer hostMB, CBuffer urlMB, MappingData md) {
try {
}
}
- public void map(BaseMapper.ContextMapping ctx,
+ public void map(BaseMapper.Context ctx,
CBuffer uri, MappingData md) {
try {
mapper.internalMapWrapper(ctx, uri, md);
}
}
- public void addWrapper(BaseMapper.ContextMapping ctx, String path,
+ public void addWrapper(BaseMapper.Context ctx, String path,
HttpService service) {
mapper.addWrapper(ctx, path, service);
}
public void setDefaultService(HttpService service) {
- BaseMapper.ContextMapping mCtx =
+ BaseMapper.Context mCtx =
mapper.addContext(null, "/", null, null, null, null);
mapper.addWrapper(mCtx, "/", service);
}
+++ /dev/null
-/*
- */
-package org.apache.tomcat.lite.http;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-
-/**
- * Support for blocking calls and callbacks.
- *
- * Unlike FutureTask, it is possible to reuse this and hopefully
- * easier to extends. Also has callbacks.
- *
- * @author Costin Manolache
- */
-public class FutureCallbacks<V> implements Future<V> {
-
- // Other options: ReentrantLock uses AbstractQueueSynchronizer,
- // more complex. Same for CountDownLatch
- // FutureTask - uses Sync as well, ugly interface with
- // Callable, can't be recycled.
- // Mina: simple object lock, doesn't extend java.util.concurent.Future
-
- private Sync sync = new Sync();
-
- private V value;
-
- public static interface Callback<V> {
- public void run(V param);
- }
-
- private List<Callback<V>> callbacks = new ArrayList();
-
- public FutureCallbacks() {
- }
-
- /**
- * Unlocks the object if it was locked. Should be called
- * when the object is reused.
- *
- * Callbacks will not be invoked.
- */
- public void reset() {
- sync.releaseShared(0);
- sync.reset();
- }
-
- public void recycle() {
- callbacks.clear();
- sync.releaseShared(0);
- sync.reset();
- }
-
- /**
- * Unlocks object and calls the callbacks.
- * @param v
- *
- * @throws IOException
- */
- public void signal(V v) throws IOException {
- sync.releaseShared(0);
- onSignal(v);
- }
-
- protected boolean isSignaled() {
- return true;
- }
-
- /**
- * Override to call specific callbacks
- */
- protected void onSignal(V v) {
- for (Callback<V> cb: callbacks) {
- if (cb != null) {
- cb.run(v);
- }
- }
- }
-
- /**
- * Set the response. Will cause the callback to be called and lock to be
- * released.
- *
- * @param value
- * @throws IOException
- */
- public void setValue(V value) throws IOException {
- synchronized (this) {
- this.value = value;
- signal(value);
- }
- }
-
- public void waitSignal(long to) throws IOException {
- try {
- get(to, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e1) {
- throw new IOException(e1.getMessage());
- } catch (TimeoutException e1) {
- throw new IOException(e1.getMessage());
- } catch (ExecutionException e) {
- throw new IOException(e.getMessage());
- }
- }
-
- @Override
- public V get() throws InterruptedException, ExecutionException {
- sync.acquireSharedInterruptibly(0);
- return value;
- }
-
- @Override
- public V get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException, TimeoutException {
- if (!sync.tryAcquireSharedNanos(0, unit.toNanos(timeout))) {
- throw new TimeoutException();
- }
- return value;
- }
-
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return sync.isSignaled();
- }
-
- private class Sync extends AbstractQueuedSynchronizer {
-
- static final int DONE = 1;
- static final int BLOCKED = 0;
- Object result;
- Throwable t;
-
- @Override
- protected int tryAcquireShared(int ignore) {
- return getState() == DONE ? 1 : -1;
- }
-
- @Override
- protected boolean tryReleaseShared(int ignore) {
- setState(DONE);
- return true;
- }
-
- public void reset() {
- setState(BLOCKED);
- }
-
- boolean isSignaled() {
- return getState() == DONE;
- }
- }
-}
import org.apache.tomcat.lite.io.BBucket;
import org.apache.tomcat.lite.io.BBuffer;
import org.apache.tomcat.lite.io.CBuffer;
-import org.apache.tomcat.lite.io.IOConnector;
import org.apache.tomcat.lite.io.DumpChannel;
import org.apache.tomcat.lite.io.FastHttpDateFormat;
import org.apache.tomcat.lite.io.Hex;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
-import org.apache.tomcat.lite.io.SslChannel;
+import org.apache.tomcat.lite.io.IOConnector;
public class Http11Connection extends HttpConnection
implements IOConnector.ConnectedCallback {
}
if (body.isClosedAndEmpty()) {
- if (!endSent) {
- out.append(chunk.endChunk());
- endSent = true;
+ synchronized(this) {
+ if (!endSent) {
+ out.append(chunk.endChunk());
+ endSent = true;
+ }
}
return true;
} else {
@Override
public void handleConnected(IOChannel net) throws IOException {
-
HttpChannel httpCh = activeHttp;
+
+ if (!net.isOpen()) {
+ httpCh.abort(net.lastException());
+ return;
+ }
+
boolean ssl = httpCh.getRequest().isSecure();
if (ssl) {
String[] hostPort = httpCh.getTarget().split(":");
- SslChannel ch1 = httpConnector.sslConnector.channel(
+ IOChannel ch1 = httpConnector.sslProvider.channel(net,
hostPort[0], Integer.parseInt(hostPort[1]));
- ch1.setSink(net);
- net.addFilterAfter(ch1);
+ //net.setHead(ch1);
net = ch1;
}
if (httpConnector.debugHttp) {
- IOChannel ch1 = new DumpChannel("");
- net.addFilterAfter(ch1);
- net = ch1;
- }
-
- if (!net.isOpen()) {
- httpCh.abort(net.lastException());
- return;
+ net = DumpChannel.wrap("Http-Client-", net);
}
setSink(net);
import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
import org.apache.tomcat.lite.io.BBucket;
import org.apache.tomcat.lite.io.BBuffer;
+import org.apache.tomcat.lite.io.FutureCallbacks;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
import org.apache.tomcat.lite.io.IOConnector;
try {
checkRelease();
trace("abort " + t);
- log.info("Abort connection " + t);
if (conn != null) {
conn.abort(this, t);
}
}
}
+ @Override
public void waitFlush(long timeMs) throws IOException {
if (getOut().getBufferCount() == 0) {
return;
--- /dev/null
+/*
+ */
+package org.apache.tomcat.lite.http;
+
+import org.apache.tomcat.lite.io.SocketConnector;
+import org.apache.tomcat.lite.io.SslProvider;
+import org.apache.tomcat.lite.io.jsse.JsseSslProvider;
+
+/**
+ * Entry point for http client code.
+ *
+ * ( initial version after removing 'integration', will add settings,
+ * defaults, helpers )
+ */
+public class HttpClient {
+ static SslProvider sslConC = new JsseSslProvider();
+
+ public synchronized static HttpConnector newClient() {
+ return new HttpConnector(new SocketConnector()).withSsl(sslConC);
+ }
+
+}
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
import org.apache.tomcat.lite.io.IOConnector;
-import org.apache.tomcat.lite.io.SslChannel;
-import org.apache.tomcat.lite.io.SslConnector;
+import org.apache.tomcat.lite.io.SslProvider;
import org.apache.tomcat.lite.io.IOConnector.DataReceivedCallback;
/**
protected IOConnector ioConnector;
// for https connections
- protected SslConnector sslConnector = new SslConnector();
+ protected SslProvider sslProvider;
boolean debugHttp = false;
boolean debug = false;
private Timer timer;
boolean compression = true;
+
+ boolean serverSSL = false;
private static Timer defaultTimer = new Timer(true);
this.debugHttp = b;
}
+ public HttpConnector withSsl(SslProvider ssl) {
+ sslProvider = ssl;
+ return this;
+ }
+
+ HttpConnector setServerSsl(boolean b) {
+ serverSSL = b;
+ return this;
+ }
+
+ public SslProvider getSslProvider() {
+ return sslProvider;
+ }
+
/**
* Allow or disable compression for this connector.
* Compression is enabled by default.
// TODO: reuse
HttpConnection shttp = cpool.accepted(accepted);
shttp.serverMode = true;
-
+
+ IOChannel head = accepted;
+ IOChannel ch;
+
+ String id = null;
if (debugHttp) {
- log.info("Accepted " + accepted.getFirst().getPort(true));
- IOChannel ch = new DumpChannel("");
- accepted.addFilterAfter(ch);
- shttp.setSink(ch);
- } else {
- shttp.setSink(accepted);
+ id = port + "-" + accepted.getFirst().getAttribute(IOChannel.ATT_REMOTE_PORT);
+ log.info("Accepted " + id);
+ head = DumpChannel.wrap("SSL-" + id, head);
}
- // TODO: JSSE filter
-
- // Will read any data in the channel.
+ // TODO: seems cleaner this way...
+ if (serverSSL) {
+ ch = sslProvider.serverChannel(head);
+ head.setHead(ch);
+ head = ch;
+
+ if (debugHttp) {
+ head = DumpChannel.wrap("CLEAR-" + id, head);
+ }
+ }
+
+ shttp.setSink(head);
+ // Will read any data in the channel, notify data available up
accepted.handleReceived(accepted);
return shttp;
}
public HttpMessage(HttpChannel httpCh) {
this.httpCh = httpCh;
- out = new IOOutputStream(httpCh.getOut(), this);
+ out = new IOOutputStream(httpCh.getOut(), httpCh);
conv = new IOWriter(httpCh);
writer = new HttpWriter(this, out, conv);
httpCh.setCompletedCallback(doneAllCallback);
}
+ public void setReadTimeout(long to) {
+ reader.setTimeout(to);
+ }
+
/**
* Returns a buffered reader.
*/
import org.apache.tomcat.lite.io.IOChannel;
import org.apache.tomcat.lite.io.IOReader;
import org.apache.tomcat.lite.io.IOWriter;
-import org.apache.tomcat.lite.io.SocketIOChannel;
import org.apache.tomcat.lite.io.UrlEncoding;
public class HttpRequest extends HttpMessage {
if (remoteAddrMB.length() == 0) {
HttpChannel asyncHttp = getHttpChannel();
IOChannel iochannel = asyncHttp.getNet().getFirst();
- if (iochannel instanceof SocketIOChannel) {
- SocketIOChannel channel = (SocketIOChannel) iochannel;
-
- String addr = (channel == null) ?
- "127.0.0.1" :
- channel.getAddress(true).getHostAddress();
-
- remoteAddrMB.set(addr);
- }
+ remoteAddrMB.set((String)
+ iochannel.getAttribute(IOChannel.ATT_REMOTE_ADDRESS));
}
return remoteAddrMB;
}
if (remoteHostMB.length() == 0) {
HttpChannel asyncHttp = getHttpChannel();
IOChannel iochannel = asyncHttp.getNet().getFirst();
- if (iochannel instanceof SocketIOChannel) {
- SocketIOChannel channel = (SocketIOChannel) iochannel;
- String addr = (channel == null) ?
- "127.0.0.1" :
- channel.getAddress(true).getCanonicalHostName();
-
- remoteHostMB.set(addr);
- }
+ remoteHostMB.set((String)
+ iochannel.getAttribute(IOChannel.ATT_REMOTE_HOSTNAME));
}
return remoteHostMB;
}
if (remotePort == -1) {
HttpChannel asyncHttp = getHttpChannel();
IOChannel iochannel = asyncHttp.getNet().getFirst();
- if (iochannel instanceof SocketIOChannel) {
- SocketIOChannel channel = (SocketIOChannel) iochannel;
- remotePort = (channel == null) ?
- 0 : channel.getPort(true);
- }
+ remotePort = (Integer) iochannel.getAttribute(IOChannel.ATT_REMOTE_PORT);
}
return remotePort;
}
if (localPort == -1) {
HttpChannel asyncHttp = getHttpChannel();
IOChannel iochannel = asyncHttp.getNet().getFirst();
- if (iochannel instanceof SocketIOChannel) {
- SocketIOChannel channel = (SocketIOChannel) iochannel;
- localPort = (channel == null) ?
- 0 : channel.getPort(false);
- }
+ localPort = (Integer) iochannel.getAttribute(IOChannel.ATT_LOCAL_PORT);
}
return localPort;
}
--- /dev/null
+/*
+ */
+package org.apache.tomcat.lite.http;
+
+import org.apache.tomcat.lite.io.SocketConnector;
+import org.apache.tomcat.lite.io.SslProvider;
+import org.apache.tomcat.lite.io.jsse.JsseSslProvider;
+
+/**
+ * Main entry point for HTTP server code.
+ *
+ * ( initial draft - will replace statics, add helpers, etc )
+ */
+public class HttpServer {
+ static SslProvider sslConC = new JsseSslProvider();
+
+ public synchronized static HttpConnector newServer(int port) {
+ return new HttpConnector(new SocketConnector()).
+ withSsl(sslConC).setPort(port);
+ }
+
+ public synchronized static HttpConnector newSslServer(int port) {
+ // DHE broken in harmony - will replace with a flag
+ // SslConnector.setEnabledCiphers(new String[] {
+ // "TLS_RSA_WITH_3DES_EDE_CBC_SHA"
+ // });
+ // -cipher DES-CBC3-SHA
+
+ SslProvider sslCon = new JsseSslProvider();
+
+ return new HttpConnector(new SocketConnector()).
+ withSsl(sslCon).setPort(port).setServerSsl(true);
+ }
+
+}
public Object context = null; // ServletContextImpl
- public BaseMapper.ContextMapping contextMap;
+ public BaseMapper.Context contextMap;
public BaseMapper.ServiceMapping service = null;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
import org.apache.tomcat.lite.io.IOConnector;
-import org.apache.tomcat.lite.io.SslChannel;
/*
* TODO: expectations ?
synchronized (remoteHost) {
httpCh = remoteHost.pending.peek();
}
- secure = httpCh.getRequest().isSecure();
- if (secure) {
- if (httpConnector.debugHttp) {
- IOChannel ch1 = new DumpChannel("NET-IN");
- net.addFilterAfter(ch1);
+ if (httpCh != null) {
+ secure = httpCh.getRequest().isSecure();
+ if (secure) {
+ if (httpConnector.debugHttp) {
+ net = DumpChannel.wrap("SPDY-SSL", net);
+ }
+ String[] hostPort = httpCh.getTarget().split(":");
+
+ IOChannel ch1 = httpConnector.sslProvider.channel(net,
+ hostPort[0], Integer.parseInt(hostPort[1]));
+ //net.setHead(ch1);
net = ch1;
}
- String[] hostPort = httpCh.getTarget().split(":");
-
- SslChannel ch1 = httpConnector.sslConnector.channel(
- hostPort[0], Integer.parseInt(hostPort[1]));
- ch1.setSink(net);
- net.addFilterAfter(ch1);
- net = ch1;
- }
-
+ }
if (httpConnector.debugHttp) {
- IOChannel ch1 = new DumpChannel("");
- net.addFilterAfter(ch1);
- net = ch1;
+ net = DumpChannel.wrap("SPDY", net);
}
setSink(net);
/**
* Holds raw data. Similar interface with a ByteBuffer in 'channel write'
- * or 'read mode'. Data is between position and limit - allways.
+ * or 'read mode'. Data is between position and limit - there is no
+ * switching.
*
* TODO: FileBucket, DirectBufferBucket, CharBucket, ...
*
package org.apache.tomcat.lite.io;
import java.io.IOException;
-import java.io.Serializable;
import java.nio.CharBuffer;
import java.io.OutputStream;
// TODO: dump to a file, hex, etc.
+
/**
* For debug - will print all bytes that go trough the channel
*/
IOBuffer in = new IOBuffer(this);
IOBuffer out = new IOBuffer(this);
static final boolean dumpToFile = false;
+ static int idCnt = 0;
+
+ DumpChannel(String id) {
+ this.id = id + idCnt++;
+ }
- public DumpChannel(String id) {
- this.id = id;
+ public static IOChannel wrap(String id, IOChannel net) throws IOException {
+ if (id == null) {
+ id = "";
+ }
+ DumpChannel dmp = new DumpChannel(id + idCnt++);
+ net.setHead(dmp);
+ return dmp;
}
public String toString() {
--- /dev/null
+/*
+ */
+package org.apache.tomcat.lite.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+
+
+/**
+ * Support for blocking calls and callbacks.
+ *
+ * Unlike FutureTask, it is possible to reuse this and hopefully
+ * easier to extends. Also has callbacks.
+ *
+ * @author Costin Manolache
+ */
+public class FutureCallbacks<V> implements Future<V> {
+
+ // Other options: ReentrantLock uses AbstractQueueSynchronizer,
+ // more complex. Same for CountDownLatch
+ // FutureTask - uses Sync as well, ugly interface with
+ // Callable, can't be recycled.
+ // Mina: simple object lock, doesn't extend java.util.concurent.Future
+
+ private Sync sync = new Sync();
+
+ private V value;
+
+ public static interface Callback<V> {
+ public void run(V param);
+ }
+
+ private List<Callback<V>> callbacks = new ArrayList();
+
+ public FutureCallbacks() {
+ }
+
+ /**
+ * Unlocks the object if it was locked. Should be called
+ * when the object is reused.
+ *
+ * Callbacks will not be invoked.
+ */
+ public void reset() {
+ sync.releaseShared(0);
+ sync.reset();
+ }
+
+ public void recycle() {
+ callbacks.clear();
+ sync.releaseShared(0);
+ sync.reset();
+ }
+
+ /**
+ * Unlocks object and calls the callbacks.
+ * @param v
+ *
+ * @throws IOException
+ */
+ public void signal(V v) throws IOException {
+ sync.releaseShared(0);
+ onSignal(v);
+ }
+
+ protected boolean isSignaled() {
+ return true;
+ }
+
+ /**
+ * Override to call specific callbacks
+ */
+ protected void onSignal(V v) {
+ for (Callback<V> cb: callbacks) {
+ if (cb != null) {
+ cb.run(v);
+ }
+ }
+ }
+
+ /**
+ * Set the response. Will cause the callback to be called and lock to be
+ * released.
+ *
+ * @param value
+ * @throws IOException
+ */
+ public void setValue(V value) throws IOException {
+ synchronized (this) {
+ this.value = value;
+ signal(value);
+ }
+ }
+
+ public void waitSignal(long to) throws IOException {
+ try {
+ get(to, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ throw new WrappedException(e1);
+ } catch (TimeoutException e1) {
+ throw new WrappedException(e1);
+ } catch (ExecutionException e) {
+ throw new WrappedException(e);
+ }
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ sync.acquireSharedInterruptibly(0);
+ return value;
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ if (!sync.tryAcquireSharedNanos(0, unit.toNanos(timeout))) {
+ throw new TimeoutException("Waiting " + timeout);
+ }
+ return value;
+ }
+
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return sync.isSignaled();
+ }
+
+ private class Sync extends AbstractQueuedSynchronizer {
+
+ static final int DONE = 1;
+ static final int BLOCKED = 0;
+ Object result;
+ Throwable t;
+
+ @Override
+ protected int tryAcquireShared(int ignore) {
+ return getState() == DONE ? 1 : -1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared(int ignore) {
+ setState(DONE);
+ return true;
+ }
+
+ public void reset() {
+ setState(BLOCKED);
+ }
+
+ boolean isSignaled() {
+ return getState() == DONE;
+ }
+ }
+}
import java.util.LinkedList;
import java.util.logging.Logger;
-import org.apache.tomcat.lite.http.FutureCallbacks;
// TODO: append() will trigger callbacks - do it explicitely !!!
// TODO: queue() shouldn't modify the buffer
if (timeMs == 0) {
timeMs = defaultTimeout;
}
- long exp = (timeMs == Long.MAX_VALUE) ?
- Long.MAX_VALUE : System.currentTimeMillis() + timeMs;
synchronized (hasDataLock) {
if (hasData()) {
return;
}
hasDataLock.reset();
}
- if (timeMs == 0) {
- timeMs = Long.MAX_VALUE;
- }
- long wait = (timeMs == Long.MAX_VALUE) ? Long.MAX_VALUE :
- exp - System.currentTimeMillis();
-
- hasDataLock.waitSignal(wait);
- if (exp < System.currentTimeMillis()) {
- throw new IOException("Timeout");
- }
+ hasDataLock.waitSignal(timeMs);
}
/**
- * Buffered ByteChannel, backed by a buffer brigade to allow
- * some zero-copy operations.
+ * Buffered, non-blocking ByteChannel.
+ *
+ * write() data will be added to the buffer. Call startSending() to
+ * flush.
+ *
+ *
*
* - you can use it as a normal non-blocking ByteChannel.
* - you can call getRead
public abstract class IOChannel implements ByteChannel, IOConnector.DataReceivedCallback,
IOConnector.DataFlushedCallback {
+ /**
+ * If this channel wraps another channel - for example a socket.
+ * Will be null if this is the 'root' channel - a socket, memory.
+ */
protected IOChannel net;
- protected IOChannel app;
+
+ /**
+ * Set with another channel layered on top of the current channel.
+ */
+ protected IOChannel head;
protected String id;
+
+ /**
+ * A string that can be parsed to extract the target.
+ * host:port for normal sockets
+ */
protected CharSequence target;
+ /**
+ * Connector that created the channel.
+ */
protected IOConnector connector;
+ /**
+ * Callbacks. Will be moved if a new head is inserted.
+ */
protected IOConnector.ConnectedCallback connectedCallback;
+
+ /**
+ * Will be called if any data is received.
+ * Will also be called on close. Close with lastException set indicates
+ * an error condition.
+ */
protected IOConnector.DataReceivedCallback dataReceivedCallback;
+
+ /**
+ * Out data is buffered, then sent with startSending.
+ * This callback indicates the data has been sent. Can be used
+ * to implement blocking flush.
+ */
protected IOConnector.DataFlushedCallback dataFlushedCallback;
// Last activity timestamp.
- // TODO: update, etc
+ // TODO: update and use it ( placeholder )
public long ts;
+ /**
+ * If an async exception happens.
+ */
protected Throwable lastException;
+ protected IOChannel() {
+ }
+
public void setConnectedCallback(IOConnector.ConnectedCallback connectedCallback) {
this.connectedCallback = connectedCallback;
}
this.dataFlushedCallback = dataFlushedCallback;
}
- protected IOChannel() {
- }
-
// Input
public abstract IOBuffer getIn();
sendHandleFlushedCallback();
}
- public void sendHandleFlushedCallback() throws IOException {
+ private void sendHandleFlushedCallback() throws IOException {
try {
if (dataFlushedCallback != null) {
dataFlushedCallback.handleFlushed(this);
}
- if (app != null) {
- app.handleFlushed(this);
+ if (head != null) {
+ head.handleFlushed(this);
}
} catch (Throwable t) {
close();
/**
- * Notify next channel that data has been received.
+ * Notify next channel or callback that data has been received.
+ * Called after a lower channel gets more data ( in the IOThread
+ * for example ).
+ *
+ * Also called when closed stream is detected. Can be called
+ * to just force upper layers to check for data.
*/
public void sendHandleReceivedCallback() throws IOException {
try {
if (dataReceivedCallback != null) {
dataReceivedCallback.handleReceived(this);
}
- if (app != null) {
- app.handleReceived(this);
+ if (head != null) {
+ head.handleReceived(this);
}
} catch (Throwable t) {
t.printStackTrace();
// Chaining/filtering
/**
- * Called to add an filter _after_ the current channel.
+ * Called to add an filter after the current channel, for
+ * example set SSL on top of a socket channel.
+ *
+ * The 'next' channel will have the received/flushed callbacks
+ * of the current channel. The current channel's callbacks will
+ * be reset.
+ *
+ * "Head" is from STREAMS.
+ *
* @throws IOException
*/
- public IOChannel addFilterAfter(IOChannel next) throws IOException {
- this.app = next;
- app.setSink(this);
+ public IOChannel setHead(IOChannel head) throws IOException {
+ this.head = head;
+ head.setSink(this);
// TODO: do we want to migrate them automatically ?
- app.setDataReceivedCallback(dataReceivedCallback);
- app.setDataFlushedCallback(dataFlushedCallback);
+ head.setDataReceivedCallback(dataReceivedCallback);
+ head.setDataFlushedCallback(dataFlushedCallback);
// app.setClosedCallback(closedCallback);
dataReceivedCallback = null;
dataFlushedCallback = null;
-
- // we may have data in our buffers
- next.handleReceived(this);
return this;
}
// Socket support
- public int getPort(boolean remote) {
- if (net != null) {
- return net.getPort(remote);
- }
- return 80;
- }
-
public void readInterest(boolean b) throws IOException {
if (net != null) {
net.readInterest(b);
return getIn().read(bb);
}
+ public void waitFlush(long timeMs) throws IOException {
+ return;
+ }
+
public int readBlocking(ByteBuffer bb, long timeMs) throws IOException {
getIn().waitData(timeMs);
return getIn().read(bb);
public void setTarget(CharSequence target) {
this.target = target;
}
+
+ public static final String ATT_REMOTE_HOSTNAME = "RemoteHostname";
+ public static final String ATT_LOCAL_HOSTNAME = "LocalHostname";
+ public static final String ATT_REMOTE_PORT = "RemotePort";
+ public static final String ATT_LOCAL_PORT = "LocalPort";
+ public static final String ATT_LOCAL_ADDRESS = "LocalAddress";
+ public static final String ATT_REMOTE_ADDRESS = "RemoteAddress";
+
+ public Object getAttribute(String name) {
+ if (net != null) {
+ return net.getAttribute(name);
+ }
+ return null;
+ }
}
return timer;
}
+ /**
+ * If the connector is layered on top of a different connector,
+ * return the lower layer ( for example the socket connector)
+ */
public IOConnector getNet() {
return null;
}
import java.nio.ByteBuffer;
import java.text.MessageFormat;
-import org.apache.tomcat.lite.http.HttpMessage;
-import org.apache.tomcat.lite.http.HttpWriter;
-
/**
* Same methods with ServletOutputStream.
*
public class IOOutputStream extends OutputStream {
IOBuffer bb;
- HttpMessage message;
- int bufferSize = HttpWriter.DEFAULT_BUFFER_SIZE;
+ IOChannel ch;
+ int bufferSize = 8 * 1024;
int wSinceFlush = 0;
- public IOOutputStream(IOBuffer out, HttpMessage httpMessage) {
+ public IOOutputStream(IOBuffer out, IOChannel httpMessage) {
this.bb = out;
- message = httpMessage;
+ ch = httpMessage;
}
public void recycle() {
wSinceFlush = 0;
- bufferSize = HttpWriter.DEFAULT_BUFFER_SIZE;
+ bufferSize = 8 * 1024;
}
public void reset() {
}
public void flush() throws IOException {
- if (message.getHttpChannel() != null) {
- message.getHttpChannel().startSending();
+ if (ch != null) {
+ ch.startSending();
- message.getHttpChannel().waitFlush(Long.MAX_VALUE);
+ ch.waitFlush(Long.MAX_VALUE);
}
wSinceFlush = 0;
}
String enc;
private boolean closed;
public static final String DEFAULT_ENCODING = "ISO-8859-1";
+ long timeout = 0;
public IOReader(IOBuffer iob) {
this.iob = iob;
}
+ public void setTimeout(long to) {
+ timeout = to;
+ }
+
public void setEncoding(String charset) {
enc = charset;
if (enc == null) {
}
bucket = null;
while (bucket == null) {
- iob.waitData(0);
+ iob.waitData(timeout);
bucket = iob.peekFirst();
if (bucket == null && iob.isClosedAndEmpty()) {
// EOF, we couldn't decode anything
* SelectorThread provides non-blocking methods for read/write and generates
* callbacks using SelectorCallback. It has no buffers of its own.
*
- * Additional requirements:
- * - support timers ( setTimer() in SelectorChannel )
+ * This is non-blocking, non-buffering and uses callbacks.
*
* @author Costin Manolache
*/
return ch.getAddress(remote);
}
- public int getPort(boolean remote) {
- return ch.getPort(remote);
- }
-
- public String getRemoteAddress() {
- return getAddress(true).toString();
- }
-
- public int getRemotePort() {
- return getPort(true);
+ @Override
+ public Object getAttribute(String name) {
+ if (ATT_REMOTE_HOSTNAME.equals(name)) {
+ return getAddress(true).getHostName();
+ } else if (ATT_LOCAL_HOSTNAME.equals(name)) {
+ return getAddress(false).getHostName();
+ } else if (ATT_REMOTE_ADDRESS.equals(name)) {
+ return getAddress(true).getHostAddress();
+ } else if (ATT_LOCAL_ADDRESS.equals(name)) {
+ return getAddress(false).getHostAddress();
+ } else if (ATT_REMOTE_PORT.equals(name)) {
+ return ch.getPort(true);
+ } else if (ATT_LOCAL_PORT.equals(name)) {
+ return ch.getPort(false);
+ }
+ return null;
}
-
public void startSending() throws IOException {
flush(ch);
}
headResBuffer.put((byte) 1); // ip
headResBuffer.put(hostB);
- int port2 = clientCh.getPort(true);
+ int port2 = (Integer) clientCh.getAttribute(IOChannel.ATT_REMOTE_PORT);
headResBuffer.putShort((short) port2);
}