Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-11-04 12:36:36 -0500
committerSimone Bordet2015-11-04 12:36:36 -0500
commit973d95c8bc4c106776e70f7610a4deb5a2219e45 (patch)
tree162eb493a1d8f10670dd499c8951e88a4cc587bf
parentafc6b9e5c0644287cd0c5a510491e6477e0f6218 (diff)
parentaf0b6284e527d84f6fbb5838a7e2b47602670a1e (diff)
downloadorg.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.tar.gz
org.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.tar.xz
org.eclipse.jetty.project-973d95c8bc4c106776e70f7610a4deb5a2219e45.zip
Merged branch 'jetty-9.2.x' into 'jetty-9.3.x'.
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java2
-rw-r--r--jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java221
-rw-r--r--jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java26
3 files changed, 166 insertions, 83 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
index 0444f788ab..5b49c8e44c 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
@@ -182,7 +182,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
Connection old_connection = getConnection();
if (LOG.isDebugEnabled())
- LOG.debug("{} upgradeing from {} to {}", this, old_connection, newConnection);
+ LOG.debug("{} upgrading from {} to {}", this, old_connection, newConnection);
ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom)
?((Connection.UpgradeFrom)old_connection).onUpgradeFrom():null;
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
index 71a75bc899..9d0c49fb2e 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.proxy;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -51,6 +52,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -160,21 +162,17 @@ public class ConnectHandler extends HandlerWrapper
protected void doStart() throws Exception
{
if (executor == null)
- {
- setExecutor(getServer().getThreadPool());
- }
+ executor = getServer().getThreadPool();
+
if (scheduler == null)
- {
- setScheduler(new ScheduledExecutorScheduler());
- addBean(getScheduler());
- }
+ addBean(scheduler = new ScheduledExecutorScheduler());
+
if (bufferPool == null)
- {
- setByteBufferPool(new MappedByteBufferPool());
- addBean(getByteBufferPool());
- }
+ addBean(bufferPool = new MappedByteBufferPool());
+
addBean(selector = newSelectorManager());
selector.setConnectTimeout(getConnectTimeout());
+
super.doStart();
}
@@ -191,16 +189,8 @@ public class ConnectHandler extends HandlerWrapper
String serverAddress = request.getRequestURI();
if (LOG.isDebugEnabled())
LOG.debug("CONNECT request for {}", serverAddress);
- try
- {
- handleConnect(baseRequest, request, response, serverAddress);
- }
- catch (Exception x)
- {
- // TODO
- LOG.warn("ConnectHandler " + baseRequest.getHttpURI() + " " + x);
- LOG.debug(x);
- }
+
+ handleConnect(baseRequest, request, response, serverAddress);
}
else
{
@@ -249,32 +239,40 @@ public class ConnectHandler extends HandlerWrapper
return;
}
- SocketChannel channel = SocketChannel.open();
- channel.socket().setTcpNoDelay(true);
- channel.configureBlocking(false);
-
- AsyncContext asyncContext = request.startAsync();
- asyncContext.setTimeout(0);
-
HttpTransport transport = baseRequest.getHttpChannel().getHttpTransport();
-
// TODO Handle CONNECT over HTTP2!
if (!(transport instanceof HttpConnection))
{
if (LOG.isDebugEnabled())
- LOG.debug("CONNECT forbidden for {}", transport);
+ LOG.debug("CONNECT not supported for {}", transport);
sendConnectResponse(request, response, HttpServletResponse.SC_FORBIDDEN);
return;
}
- InetSocketAddress address = newConnectAddress(host, port);
+ AsyncContext asyncContext = request.startAsync();
+ asyncContext.setTimeout(0);
+
if (LOG.isDebugEnabled())
- LOG.debug("Connecting to {}", address);
- ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
- if (channel.connect(address))
- selector.accept(channel, connectContext);
- else
- selector.connect(channel, connectContext);
+ LOG.debug("Connecting to {}:{}", host, port);
+
+ connectToServer(request, host, port, new Promise<SocketChannel>()
+ {
+ @Override
+ public void succeeded(SocketChannel channel)
+ {
+ ConnectContext connectContext = new ConnectContext(request, response, asyncContext, (HttpConnection)transport);
+ if (channel.isConnected())
+ selector.accept(channel, connectContext);
+ else
+ selector.connect(channel, connectContext);
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ onConnectFailure(request, response, asyncContext, x);
+ }
+ });
}
catch (Exception x)
{
@@ -282,37 +280,59 @@ public class ConnectHandler extends HandlerWrapper
}
}
- /* ------------------------------------------------------------ */
- /** Create the address the connect channel will connect to.
- * @param host The host from the connect request
- * @param port The port from the connect request
+ protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
+ {
+ SocketChannel channel = null;
+ try
+ {
+ channel = SocketChannel.open();
+ channel.socket().setTcpNoDelay(true);
+ channel.configureBlocking(false);
+ InetSocketAddress address = newConnectAddress(host, port);
+ channel.connect(address);
+ promise.succeeded(channel);
+ }
+ catch (Throwable x)
+ {
+ close(channel);
+ promise.failed(x);
+ }
+ }
+
+ private void close(Closeable closeable)
+ {
+ try
+ {
+ if (closeable != null)
+ closeable.close();
+ }
+ catch (Throwable x)
+ {
+ LOG.ignore(x);
+ }
+ }
+
+ /**
+ * Creates the server address to connect to.
+ *
+ * @param host The host from the CONNECT request
+ * @param port The port from the CONNECT request
* @return The InetSocketAddress to connect to.
*/
protected InetSocketAddress newConnectAddress(String host, int port)
{
return new InetSocketAddress(host, port);
}
-
+
protected void onConnectSuccess(ConnectContext connectContext, UpstreamConnection upstreamConnection)
{
- HttpConnection httpConnection = connectContext.getHttpConnection();
- ByteBuffer requestBuffer = httpConnection.getRequestBuffer();
- ByteBuffer buffer = BufferUtil.EMPTY_BUFFER;
- int remaining = requestBuffer.remaining();
- if (remaining > 0)
- {
- buffer = bufferPool.acquire(remaining, requestBuffer.isDirect());
- BufferUtil.flipToFill(buffer);
- buffer.put(requestBuffer);
- buffer.flip();
- }
-
ConcurrentMap<String, Object> context = connectContext.getContext();
HttpServletRequest request = connectContext.getRequest();
prepareContext(request, context);
+ HttpConnection httpConnection = connectContext.getHttpConnection();
EndPoint downstreamEndPoint = httpConnection.getEndPoint();
- DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, buffer);
+ DownstreamConnection downstreamConnection = newDownstreamConnection(downstreamEndPoint, context, BufferUtil.EMPTY_BUFFER);
downstreamConnection.setInputBufferSize(getBufferSize());
upstreamConnection.setConnection(downstreamConnection);
@@ -324,6 +344,7 @@ public class ConnectHandler extends HandlerWrapper
sendConnectResponse(request, response, HttpServletResponse.SC_OK);
upgradeConnection(request, response, downstreamConnection);
+
connectContext.getAsyncContext().complete();
}
@@ -349,7 +370,8 @@ public class ConnectHandler extends HandlerWrapper
}
catch (IOException x)
{
- // TODO: nothing we can do, close the connection
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not send CONNECT response", x);
}
}
@@ -367,9 +389,18 @@ public class ConnectHandler extends HandlerWrapper
return true;
}
+ /**
+ * @deprecated use {@link #newDownstreamConnection(EndPoint, ConcurrentMap)} instead
+ */
+ @Deprecated
protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context, ByteBuffer buffer)
{
- return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context, buffer);
+ return newDownstreamConnection(endPoint, context);
+ }
+
+ protected DownstreamConnection newDownstreamConnection(EndPoint endPoint, ConcurrentMap<String, Object> context)
+ {
+ return new DownstreamConnection(endPoint, getExecutor(), getByteBufferPool(), context);
}
protected UpstreamConnection newUpstreamConnection(EndPoint endPoint, ConnectContext connectContext)
@@ -396,10 +427,23 @@ public class ConnectHandler extends HandlerWrapper
*
* @param endPoint the endPoint to read from
* @param buffer the buffer to read data into
+ * @param context the context information related to the connection
* @return the number of bytes read (possibly 0 since the read is non-blocking)
* or -1 if the channel has been closed remotely
* @throws IOException if the endPoint cannot be read
*/
+ protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
+ {
+ int read = read(endPoint, buffer);
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} read {} bytes", this, read);
+ return read;
+ }
+
+ /**
+ * @deprecated override {@link #read(EndPoint, ByteBuffer, ConcurrentMap)} instead
+ */
+ @Deprecated
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
{
return endPoint.fill(buffer);
@@ -411,11 +455,21 @@ public class ConnectHandler extends HandlerWrapper
* @param endPoint the endPoint to write to
* @param buffer the buffer to write
* @param callback the completion callback to invoke
+ * @param context the context information related to the connection
*/
- protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
+ protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
{
if (LOG.isDebugEnabled())
LOG.debug("{} writing {} bytes", this, buffer.remaining());
+ write(endPoint, buffer, callback);
+ }
+
+ /**
+ * @deprecated override {@link #write(EndPoint, ByteBuffer, Callback, ConcurrentMap)} instead
+ */
+ @Deprecated
+ protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
+ {
endPoint.write(callback, buffer);
}
@@ -494,14 +548,9 @@ public class ConnectHandler extends HandlerWrapper
@Override
protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment)
{
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- ConnectContext connectContext = (ConnectContext)attachment;
- onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
- }
- });
+ close(channel);
+ ConnectContext connectContext = (ConnectContext)attachment;
+ onConnectFailure(connectContext.request, connectContext.response, connectContext.asyncContext, ex);
}
}
@@ -561,37 +610,45 @@ public class ConnectHandler extends HandlerWrapper
public void onOpen()
{
super.onOpen();
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- onConnectSuccess(connectContext, UpstreamConnection.this);
- fillInterested();
- }
- });
+ onConnectSuccess(connectContext, UpstreamConnection.this);
+ fillInterested();
}
@Override
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
{
- return ConnectHandler.this.read(endPoint, buffer);
+ return ConnectHandler.this.read(endPoint, buffer, getContext());
}
@Override
protected void write(EndPoint endPoint, ByteBuffer buffer,Callback callback)
{
- ConnectHandler.this.write(endPoint, buffer, callback);
+ ConnectHandler.this.write(endPoint, buffer, callback, getContext());
}
}
- public class DownstreamConnection extends ProxyConnection
+ public class DownstreamConnection extends ProxyConnection implements Connection.UpgradeTo
{
- private final ByteBuffer buffer;
+ private ByteBuffer buffer;
- public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer)
+ public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context)
{
super(endPoint, executor, bufferPool, context);
- this.buffer = buffer;
+ }
+
+ /**
+ * @deprecated use {@link #DownstreamConnection(EndPoint, Executor, ByteBufferPool, ConcurrentMap)} instead
+ */
+ @Deprecated
+ public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ConcurrentMap<String, Object> context, ByteBuffer buffer)
+ {
+ this(endPoint, executor, bufferPool, context);
+ }
+
+ @Override
+ public void onUpgradeTo(ByteBuffer buffer)
+ {
+ this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer;
}
@Override
@@ -623,13 +680,13 @@ public class ConnectHandler extends HandlerWrapper
@Override
protected int read(EndPoint endPoint, ByteBuffer buffer) throws IOException
{
- return ConnectHandler.this.read(endPoint, buffer);
+ return ConnectHandler.this.read(endPoint, buffer, getContext());
}
@Override
protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback)
{
- ConnectHandler.this.write(endPoint, buffer, callback);
+ ConnectHandler.this.write(endPoint, buffer, callback, getContext());
}
}
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
index f601975408..780e70686a 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ConnectHandlerTest.java
@@ -27,6 +27,8 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
@@ -36,12 +38,15 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.eclipse.jetty.util.B64Code;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -631,12 +636,33 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
}
@Override
+ protected void connectToServer(HttpServletRequest request, String host, int port, Promise<SocketChannel> promise)
+ {
+ Assert.assertEquals(contextValue, request.getAttribute(contextKey));
+ super.connectToServer(request, host, port, promise);
+ }
+
+ @Override
protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
{
// Transfer data from the HTTP request to the connection context
Assert.assertEquals(contextValue, request.getAttribute(contextKey));
context.put(contextKey, request.getAttribute(contextKey));
}
+
+ @Override
+ protected int read(EndPoint endPoint, ByteBuffer buffer, ConcurrentMap<String, Object> context) throws IOException
+ {
+ Assert.assertEquals(contextValue, context.get(contextKey));
+ return super.read(endPoint, buffer, context);
+ }
+
+ @Override
+ protected void write(EndPoint endPoint, ByteBuffer buffer, Callback callback, ConcurrentMap<String, Object> context)
+ {
+ Assert.assertEquals(contextValue, context.get(contextKey));
+ super.write(endPoint, buffer, callback, context);
+ }
});
proxy.start();

Back to the top