diff options
author | Simone Bordet | 2015-11-04 17:36:36 +0000 |
---|---|---|
committer | Simone Bordet | 2015-11-04 17:36:36 +0000 |
commit | 973d95c8bc4c106776e70f7610a4deb5a2219e45 (patch) | |
tree | 162eb493a1d8f10670dd499c8951e88a4cc587bf | |
parent | afc6b9e5c0644287cd0c5a510491e6477e0f6218 (diff) | |
parent | af0b6284e527d84f6fbb5838a7e2b47602670a1e (diff) | |
download | org.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.tar.gz org.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.tar.xz org.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.zip |
Merged branch 'jetty-9.2.x' into 'jetty-9.3.x'.
3 files changed, 166 insertions, 83 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 0444f788ab..5b49c8e44c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -182,7 +182,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint Connection old_connection = getConnection(); if (LOG.isDebugEnabled()) - LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection); + LOG.debug("{} upgrading from {} to {}", this, old_connection, newConnection); ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom) ?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null; diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java index 71a75bc899..9d0c49fb2e 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.proxy; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -51,6 +52,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.HandlerWrapper; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -160,21 +162,17 @@ public class ConnectHandler extends HandlerWrapper protected void doStart() throws Exception { if (executor == null) - { - setExecutor(getServer().getThreadPool()); - } + executor = getServer().getThreadPool(); + if (scheduler == null) - { - setScheduler(new ScheduledExecutorScheduler()); - addBean(getScheduler()); - } + addBean(scheduler = new ScheduledExecutorScheduler()); + if (bufferPool == null) - { - setByteBufferPool(new MappedByteBufferPool()); - addBean(getByteBufferPool()); - } + addBean(bufferPool = new MappedByteBufferPool()); + addBean(selector = newSelectorManager()); selector.setConnectTimeout(getConnectTimeout()); + super.doStart(); } @@ -191,16 +189,8 @@ public class ConnectHandler extends HandlerWrapper String serverAddress = request.getRequestURI(); if (LOG.isDebugEnabled()) LOG.debug("CONNECT request for {}", serverAddress); - try - { - handleConnect(baseRequest, request, response, serverAddress); - } - catch (Exception x) - { - // TODO - LOG.warn("ConnectHandler " + baseRequest.getHttpURI() + " " + x); - LOG.debug(x); - } + + handleConnect(baseRequest, request, response, serverAddress); } else { @@ -249,32 +239,40 @@ public class ConnectHandler extends HandlerWrapper return; } - SocketChannel channel = SocketChannel.open(); - channel.socket().setTcpNoDelay(true); - channel.configureBlocking(false); - - AsyncContext asyncContext = request.startAsync(); - asyncContext.setTimeout(0); - HttpTransport transport = baseRequest.getHttpChannel().getHttpTransport(); - // TODO Handle CONNECT over HTTP2! if (!(transport instanceof HttpConnection)) { if (LOG.isDebugEnabled()) - LOG.debug("CONNECT forbidden for {}", transport); + LOG.debug("CONNECT not supported for {}", transport); sendConnectResponse(request, response, HttpServletResponse.SC_FORBIDDEN); return; } - InetSocketAddress address = newConnectAddress(host, port); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + if (LOG.isDebugEnabled()) - LOG.debug("Connecting to {}", address); - ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport); - if (channel.connect(address)) - selector.accept(channel, connectContext); - else - selector.connect(channel, connectContext); + LOG.debug("Connecting to {}:{}", host, port); + + connectToServer(request, host, port, new Promise<SocketChannel>() + { + @Override + public void succeeded(SocketChannel channel) + { + ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport); + if (channel.isConnected()) + selector.accept(channel, connectContext); + else + selector.connect(channel, connectContext); + } + + @Override + public void failed(Throwable x) + { + onConnectFailure(request, response, asyncContext, x); + } + }); } catch (Exception x) { @@ -282,37 +280,59 @@ public class ConnectHandler extends HandlerWrapper } } - /* ------------------------------------------------------------ */ - /** Create the address the connect channel will connect to. - * @param host The host from the connect request - * @param port The port from the connect request + protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise) + { + SocketChannel channel = null; + try + { + channel = SocketChannel.open(); + channel.socket().setTcpNoDelay(true); + channel.configureBlocking(false); + InetSocketAddress address = newConnectAddress(host, port); + channel.connect(address); + promise.succeeded(channel); + } + catch (Throwable x) + { + close(channel); + promise.failed(x); + } + } + + private void close(Closeable closeable) + { + try + { + if (closeable != null) + closeable.close(); + } + catch (Throwable x) + { + LOG.ignore(x); + } + } + + /** + * Creates the server address to connect to. + * + * @param host The host from the CONNECT request + * @param port The port from the CONNECT request * @return The InetSocketAddress to connect to. */ protected InetSocketAddress newConnectAddress(String host, int port) { return new InetSocketAddress(host, port); } - + protected void onConnectSuccess(ConnectContext connectContext, UpstreamConnection upstreamConnection) { - HttpConnection httpConnection = connectContext.getHttpConnection(); - ByteBuffer requestBuffer = httpConnection.getRequestBuffer(); - ByteBuffer buffer = BufferUtil.EMPTY_BUFFER; - int remaining = requestBuffer.remaining(); - if (remaining > 0) - { - buffer = bufferPool.acquire(remaining, requestBuffer.isDirect()); - BufferUtil.flipToFill(buffer); - buffer.put(requestBuffer); - buffer.flip(); - } - ConcurrentMap<String, Object> context = connectContext.getContext(); HttpServletRequest request = connectContext.getRequest(); prepareContext(request, context); + HttpConnection httpConnection = connectContext.getHttpConnection(); EndPoint downstreamEndPoint = httpConnection.getEndPoint(); - DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, buffer); + DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, BufferUtil.EMPTY_BUFFER); downstreamConnection.setInputBufferSize(getBufferSize()); upstreamConnection.setConnection(downstreamConnection); @@ -324,6 +344,7 @@ public class ConnectHandler extends HandlerWrapper sendConnectResponse(request, response, HttpServletResponse.SC_OK); upgradeConnection(request, response, downstreamConnection); + connectContext.getAsyncContext().complete(); } @@ -349,7 +370,8 @@ public class ConnectHandler extends HandlerWrapper } catch (IOException x) { - // TODO: nothing we can do, close the connection + if (LOG.isDebugEnabled()) + LOG.debug("Could not send CONNECT response", x); } } @@ -367,9 +389,18 @@ public class ConnectHandler extends HandlerWrapper return true; } + /** + * @deprecated use {@link #newDownstreamConnection(EndPoint, ConcurrentMap)} instead + */ + @Deprecated protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context, ByteBuffer buffer) { - return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context, buffer); + return newDownstreamConnection(endPoint, context); + } + + protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context) + { + return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context); } protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext) @@ -396,10 +427,23 @@ public class ConnectHandler extends HandlerWrapper * * @param endPoint the endPoint to read from * @param buffer the buffer to read data into + * @param context the context information related to the connection * @return the number of bytes read (possibly 0 since the read is non-blocking) * or -1 if the channel has been closed remotely * @throws IOException if the endPoint cannot be read */ + protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException + { + int read = read(endPoint, buffer); + if (LOG.isDebugEnabled()) + LOG.debug("{} read {} bytes", this, read); + return read; + } + + /** + * @deprecated override {@link #read(EndPoint, ByteBuffer, ConcurrentMap)} instead + */ + @Deprecated protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException { return endPoint.fill(buffer); @@ -411,11 +455,21 @@ public class ConnectHandler extends HandlerWrapper * @param endPoint the endPoint to write to * @param buffer the buffer to write * @param callback the completion callback to invoke + * @param context the context information related to the connection */ - protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback) + protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context) { if (LOG.isDebugEnabled()) LOG.debug("{} writing {} bytes", this, buffer.remaining()); + write(endPoint, buffer, callback); + } + + /** + * @deprecated override {@link #write(EndPoint, ByteBuffer, Callback, ConcurrentMap)} instead + */ + @Deprecated + protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback) + { endPoint.write(callback, buffer); } @@ -494,14 +548,9 @@ public class ConnectHandler extends HandlerWrapper @Override protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment) { - getExecutor().execute(new Runnable() - { - public void run() - { - ConnectContext connectContext = (ConnectContext)attachment; - onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex); - } - }); + close(channel); + ConnectContext connectContext = (ConnectContext)attachment; + onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex); } } @@ -561,37 +610,45 @@ public class ConnectHandler extends HandlerWrapper public void onOpen() { super.onOpen(); - getExecutor().execute(new Runnable() - { - public void run() - { - onConnectSuccess(connectContext, UpstreamConnection.this); - fillInterested(); - } - }); + onConnectSuccess(connectContext, UpstreamConnection.this); + fillInterested(); } @Override protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException { - return ConnectHandler.this.read(endPoint, buffer); + return ConnectHandler.this.read(endPoint, buffer, getContext()); } @Override protected void write(EndPoint endPoint, ByteBuffer buffer,Callback callback) { - ConnectHandler.this.write(endPoint, buffer, callback); + ConnectHandler.this.write(endPoint, buffer, callback, getContext()); } } - public class DownstreamConnection extends ProxyConnection + public class DownstreamConnection extends ProxyConnection implements Connection.UpgradeTo { - private final ByteBuffer buffer; + private ByteBuffer buffer; - public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer) + public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context) { super(endPoint, executor, bufferPool, context); - this.buffer = buffer; + } + + /** + * @deprecated use {@link #DownstreamConnection(EndPoint, Executor, ByteBufferPool, ConcurrentMap)} instead + */ + @Deprecated + public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer) + { + this(endPoint, executor, bufferPool, context); + } + + @Override + public void onUpgradeTo(ByteBuffer buffer) + { + this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer; } @Override @@ -623,13 +680,13 @@ public class ConnectHandler extends HandlerWrapper @Override protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException { - return ConnectHandler.this.read(endPoint, buffer); + return ConnectHandler.this.read(endPoint, buffer, getContext()); } @Override protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback) { - ConnectHandler.this.write(endPoint, buffer, callback); + ConnectHandler.this.write(endPoint, buffer, callback, getContext()); } } } diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java index f601975408..780e70686a 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java @@ -27,6 +27,8 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.concurrent.ConcurrentMap; @@ -36,12 +38,15 @@ import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse; import org.eclipse.jetty.util.B64Code; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -631,12 +636,33 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest } @Override + protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise) + { + Assert.assertEquals(contextValue, request.getAttribute(contextKey)); + super.connectToServer(request, host, port, promise); + } + + @Override protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context) { // Transfer data from the HTTP request to the connection context Assert.assertEquals(contextValue, request.getAttribute(contextKey)); context.put(contextKey, request.getAttribute(contextKey)); } + + @Override + protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException + { + Assert.assertEquals(contextValue, context.get(contextKey)); + return super.read(endPoint, buffer, context); + } + + @Override + protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context) + { + Assert.assertEquals(contextValue, context.get(contextKey)); + super.write(endPoint, buffer, callback, context); + } }); proxy.start(); |