diff options
author | Simone Bordet | 2013-09-06 22:36:31 +0000 |
---|---|---|
committer | Simone Bordet | 2013-09-06 22:36:31 +0000 |
commit | 3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7 (patch) | |
tree | fa2e1c0c44e57c15fe518c41155c38c040c0046e | |
parent | e09415145f6c1e2926db7f2d9bf4efbf1272b16d (diff) | |
download | org.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.tar.gz org.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.tar.xz org.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.zip |
Refactored some behaviour to base classes to ease the FCGI
implementation, and taken the chance to remove redundant code.
11 files changed, 453 insertions, 322 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java new file mode 100644 index 0000000000..46cf76aa77 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java @@ -0,0 +1,233 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import javax.net.ssl.SSLEngine; + +import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.ssl.SslConnection; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport +{ + protected static final Logger LOG = Log.getLogger(HttpClientTransport.class); + + private final int selectors; + private volatile HttpClient client; + private volatile SelectorManager selectorManager; + + protected AbstractHttpClientTransport(int selectors) + { + this.selectors = selectors; + } + + protected HttpClient getHttpClient() + { + return client; + } + + @Override + public void setHttpClient(HttpClient client) + { + this.client = client; + } + + @Override + protected void doStart() throws Exception + { + selectorManager = newSelectorManager(client); + selectorManager.setConnectTimeout(client.getConnectTimeout()); + addBean(selectorManager); + super.doStart(); + } + + @Override + public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise) + { + SocketChannel channel = null; + try + { + channel = SocketChannel.open(); + HttpClient client = destination.getHttpClient(); + SocketAddress bindAddress = client.getBindAddress(); + if (bindAddress != null) + channel.bind(bindAddress); + configure(client, channel); + channel.configureBlocking(false); + channel.connect(address); + + ConnectionCallback callback = new ConnectionCallback(destination, promise); + selectorManager.connect(channel, callback); + } + // Must catch all exceptions, since some like + // UnresolvedAddressException are not IOExceptions. + catch (Throwable x) + { + try + { + if (channel != null) + channel.close(); + } + catch (IOException xx) + { + LOG.ignore(xx); + } + finally + { + promise.failed(x); + } + } + } + + protected void configure(HttpClient client, SocketChannel channel) throws SocketException + { + channel.socket().setTcpNoDelay(client.isTCPNoDelay()); + } + + protected SelectorManager newSelectorManager(HttpClient client) + { + return new ClientSelectorManager(client, selectors); + } + + protected SslConnection createSslConnection(EndPoint endPoint, HttpDestination destination) + { + HttpClient httpClient = destination.getHttpClient(); + SslContextFactory sslContextFactory = httpClient.getSslContextFactory(); + SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort()); + engine.setUseClientMode(true); + + SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine); + sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed()); + endPoint.setConnection(sslConnection); + EndPoint appEndPoint = sslConnection.getDecryptedEndPoint(); + Connection connection = newConnection(appEndPoint, destination); + appEndPoint.setConnection(connection); + + return sslConnection; + } + + protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine) + { + return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine); + } + + protected abstract Connection newConnection(EndPoint endPoint, HttpDestination destination); + + protected org.eclipse.jetty.client.api.Connection tunnel(EndPoint endPoint, HttpDestination destination, org.eclipse.jetty.client.api.Connection connection) + { + SslConnection sslConnection = createSslConnection(endPoint, destination); + Connection result = sslConnection.getDecryptedEndPoint().getConnection(); + selectorManager.connectionClosed((Connection)connection); + selectorManager.connectionOpened(sslConnection); + LOG.debug("Tunnelled {} over {}", connection, result); + return (org.eclipse.jetty.client.api.Connection)result; + } + + protected class ClientSelectorManager extends SelectorManager + { + private final HttpClient client; + + protected ClientSelectorManager(HttpClient client, int selectors) + { + super(client.getExecutor(), client.getScheduler(), selectors); + this.client = client; + } + + @Override + protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key) + { + return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout()); + } + + @Override + public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException + { + ConnectionCallback callback = (ConnectionCallback)attachment; + HttpDestination destination = callback.destination; + + SslContextFactory sslContextFactory = client.getSslContextFactory(); + if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme())) + { + if (sslContextFactory == null) + { + IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests"); + callback.failed(failure); + throw failure; + } + else + { + SslConnection sslConnection = createSslConnection(endPoint, destination); + callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection()); + return sslConnection; + } + } + else + { + Connection connection = AbstractHttpClientTransport.this.newConnection(endPoint, destination); + callback.succeeded((org.eclipse.jetty.client.api.Connection)connection); + return connection; + } + } + + @Override + protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + { + ConnectionCallback callback = (ConnectionCallback)attachment; + callback.failed(ex); + } + } + + private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection> + { + private final HttpDestination destination; + private final Promise<org.eclipse.jetty.client.api.Connection> promise; + + private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise) + { + this.destination = destination; + this.promise = promise; + } + + @Override + public void succeeded(org.eclipse.jetty.client.api.Connection result) + { + promise.succeeded(result); + } + + @Override + public void failed(Throwable x) + { + promise.failed(x); + } + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 48ab7c31cd..ef1fc0f6b4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -450,7 +450,7 @@ public class HttpClient extends ContainerLifeCycle HttpDestination destination = destinations.get(address); if (destination == null) { - destination = transport.newHttpDestination(this, scheme, host, port); + destination = transport.newHttpDestination(scheme, host, port); if (isRunning()) { HttpDestination existing = destinations.putIfAbsent(address, destination); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java index 80d284c0d7..de943a61db 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java @@ -23,13 +23,57 @@ import java.net.SocketAddress; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Promise; +/** + * {@link HttpClientTransport} represents what transport implementations should provide + * in order to plug-in a different transport for {@link HttpClient}. + * <p/> + * While the {@link HttpClient} APIs define the HTTP semantic (request, response, headers, etc.) + * <em>how</em> a HTTP exchange is carried over the network depends on implementations of this class. + * <p/> + * The default implementation uses the HTTP protocol to carry over the network the HTTP exchange, + * but the HTTP exchange may also be carried using the SPDY protocol or the FCGI protocol or, in future, + * other protocols. + */ public interface HttpClientTransport { - void setHttpClient(HttpClient client); + /** + * Sets the {@link HttpClient} instance on this transport. + * <p /> + * This is needed because of a chicken-egg problem: in order to create the {@link HttpClient} + * a {@link HttpClientTransport} is needed, that therefore cannot have a reference yet to the + * {@link HttpClient}. + * + * @param client the {@link HttpClient} that uses this transport. + */ + public void setHttpClient(HttpClient client); - HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port); + /** + * Creates a new, transport-specific, {@link HttpDestination} object. + * <p /> + * {@link HttpDestination} controls the destination-connection cardinality: protocols like + * HTTP have 1-N cardinality, while multiplexed protocols like SPDY have a 1-1 cardinality. + * + * @param scheme the destination scheme + * @param host the destination host + * @param port the destination port + * @return a new, transport-specific, {@link HttpDestination} object + */ + public HttpDestination newHttpDestination(String scheme, String host, int port); - void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise); + /** + * Establishes a physical connection to the given {@code address}. + * + * @param destination the destination + * @param address the address to connect to + * @param promise the promise to notify when the connection succeeds or fails + */ + public void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise); - Connection tunnel(Connection connection); + /** + * Establishes an encrypted tunnel over the given {@code connection} + * + * @param connection the connection to tunnel + * @return the tunnelled connection + */ + public Connection tunnel(Connection connection); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 1559fc97c9..e09079b1ca 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -39,18 +39,16 @@ public abstract class HttpConnection implements Connection { private static final HttpField CHUNKED_FIELD = new HttpField(HttpHeader.TRANSFER_ENCODING, HttpHeaderValue.CHUNKED); - private final HttpClient client; private final HttpDestination destination; - protected HttpConnection(HttpClient client, HttpDestination destination) + protected HttpConnection(HttpDestination destination) { - this.client = client; this.destination = destination; } public HttpClient getHttpClient() { - return client; + return destination.getHttpClient(); } public HttpDestination getHttpDestination() @@ -65,13 +63,13 @@ public abstract class HttpConnection implements Connection if (request.getTimeout() > 0) { TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request); - timeoutListener.schedule(client.getScheduler()); + timeoutListener.schedule(getHttpClient().getScheduler()); listeners.add(timeoutListener); } if (listener != null) listeners.add(listener); - HttpConversation conversation = client.getConversation(request.getConversationID(), true); + HttpConversation conversation = getHttpClient().getConversation(request.getConversationID(), true); HttpExchange exchange = new HttpExchange(conversation, getHttpDestination(), request, listeners); send(exchange); @@ -123,7 +121,7 @@ public abstract class HttpConnection implements Connection } // Cookies - List<HttpCookie> cookies = client.getCookieStore().get(request.getURI()); + List<HttpCookie> cookies = getHttpClient().getCookieStore().get(request.getURI()); StringBuilder cookieString = null; for (int i = 0; i < cookies.size(); ++i) { @@ -139,7 +137,7 @@ public abstract class HttpConnection implements Connection // Authorization URI authenticationURI = destination.isProxied() ? destination.getProxyURI() : request.getURI(); - Authentication.Result authnResult = client.getAuthenticationStore().findAuthenticationResult(authenticationURI); + Authentication.Result authnResult = getHttpClient().getAuthenticationStore().findAuthenticationResult(authenticationURI); if (authnResult != null) authnResult.apply(request); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 6f310918ed..53e7584e64 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -359,8 +359,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener * content needs to be sent, this method is invoked again; subclasses need only to send the content * at the {@link HttpContent} cursor position. * <p /> - * This method is invoked one last time when {@link HttpContent#isConsumed()} is true; subclasses - * needs to skip sending content in this case, and just complete their content generation. + * This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore + * there is no actual content to send. + * This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the + * transfer encoding is chunked) if their protocol needs to. * * @param exchange the exchange to send * @param content the content to send @@ -615,7 +617,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener } else { - if (content.isLast()) + if (content.isConsumed()) { sendContent(exchange, content, lastCallback); } @@ -685,7 +687,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener } else { - if (content.isLast()) + if (content.isConsumed()) { sendContent(exchange, content, lastCallback); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java new file mode 100644 index 0000000000..aa2106e892 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -0,0 +1,135 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.util.Promise; + +public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection> +{ + private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED); + private C connection; + + protected MultiplexHttpDestination(HttpClient client, String scheme, String host, int port) + { + super(client, scheme, host, port); + } + + @Override + protected void send() + { + while (true) + { + ConnectState current = connect.get(); + switch (current) + { + case DISCONNECTED: + { + if (!connect.compareAndSet(current, ConnectState.CONNECTING)) + break; + newConnection(this); + return; + } + case CONNECTING: + { + // Waiting to connect, just return + return; + } + case CONNECTED: + { + if (process(connection, false)) + break; + return; + } + default: + { + throw new IllegalStateException(); + } + } + } + } + + @Override + @SuppressWarnings("unchecked") + public void succeeded(Connection result) + { + C connection = this.connection = (C)result; + if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED)) + { + process(connection, true); + } + else + { + connection.close(); + failed(new IllegalStateException()); + } + } + + @Override + public void failed(Throwable x) + { + connect.set(ConnectState.DISCONNECTED); + } + + protected boolean process(final C connection, boolean dispatch) + { + HttpClient client = getHttpClient(); + final HttpExchange exchange = getHttpExchanges().poll(); + LOG.debug("Processing exchange {} on connection {}", exchange, connection); + if (exchange == null) + return false; + + final Request request = exchange.getRequest(); + Throwable cause = request.getAbortCause(); + if (cause != null) + { + LOG.debug("Abort before processing {}: {}", exchange, cause); + abort(exchange, cause); + } + else + { + if (dispatch) + { + client.getExecutor().execute(new Runnable() + { + @Override + public void run() + { + send(connection, exchange); + } + }); + } + else + { + send(connection, exchange); + } + } + return true; + } + + protected abstract void send(C connection, HttpExchange exchange); + + private enum ConnectState + { + DISCONNECTED, CONNECTING, CONNECTED + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java index 391e2c83c1..2e491cfe4d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java @@ -18,220 +18,39 @@ package org.eclipse.jetty.client.http; -import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketAddress; -import java.net.SocketException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import javax.net.ssl.SSLEngine; - -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.AbstractHttpClientTransport; import org.eclipse.jetty.client.HttpDestination; -import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.SelectChannelEndPoint; -import org.eclipse.jetty.io.SelectorManager; -import org.eclipse.jetty.io.ssl.SslConnection; -import org.eclipse.jetty.util.Promise; -import org.eclipse.jetty.util.component.ContainerLifeCycle; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.ssl.SslContextFactory; -public class HttpClientTransportOverHTTP extends ContainerLifeCycle implements HttpClientTransport +public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport { - private static final Logger LOG = Log.getLogger(HttpClientTransportOverHTTP.class); - - private volatile HttpClient client; - private volatile SelectorManager selectorManager; - - @Override - public void setHttpClient(HttpClient client) + public HttpClientTransportOverHTTP() { - this.client = client; + this(1); } - @Override - protected void doStart() throws Exception + public HttpClientTransportOverHTTP(int selectors) { - selectorManager = newSelectorManager(client); - selectorManager.setConnectTimeout(client.getConnectTimeout()); - addBean(selectorManager); - super.doStart(); + super(selectors); } @Override - public HttpDestination newHttpDestination(HttpClient client, String scheme, String host, int port) + public HttpDestination newHttpDestination(String scheme, String host, int port) { - return new HttpDestinationOverHTTP(client, scheme, host, port); + return new HttpDestinationOverHTTP(getHttpClient(), scheme, host, port); } @Override - public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise) + protected Connection newConnection(EndPoint endPoint, HttpDestination destination) { - SocketChannel channel = null; - try - { - channel = SocketChannel.open(); - HttpClient client = destination.getHttpClient(); - SocketAddress bindAddress = client.getBindAddress(); - if (bindAddress != null) - channel.bind(bindAddress); - configure(client, channel); - channel.configureBlocking(false); - channel.connect(address); - - ConnectionCallback callback = new ConnectionCallback(destination, promise); - selectorManager.connect(channel, callback); - } - // Must catch all exceptions, since some like - // UnresolvedAddressException are not IOExceptions. - catch (Throwable x) - { - try - { - if (channel != null) - channel.close(); - } - catch (IOException xx) - { - LOG.ignore(xx); - } - finally - { - promise.failed(x); - } - } + return new HttpConnectionOverHTTP(endPoint, destination); } @Override public org.eclipse.jetty.client.api.Connection tunnel(org.eclipse.jetty.client.api.Connection connection) { HttpConnectionOverHTTP httpConnection = (HttpConnectionOverHTTP)connection; - HttpDestination destination = httpConnection.getHttpDestination(); - SslConnection sslConnection = createSslConnection(destination, httpConnection.getEndPoint()); - HttpConnectionOverHTTP result = (HttpConnectionOverHTTP)sslConnection.getDecryptedEndPoint().getConnection(); - selectorManager.connectionClosed(httpConnection); - selectorManager.connectionOpened(sslConnection); - LOG.debug("Tunnelled {} over {}", connection, result); - return result; - } - - protected void configure(HttpClient client, SocketChannel channel) throws SocketException - { - channel.socket().setTcpNoDelay(client.isTCPNoDelay()); - } - - protected SelectorManager newSelectorManager(HttpClient client) - { - return new ClientSelectorManager(client, 1); - } - - protected Connection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination) - { - return new HttpConnectionOverHTTP(httpClient, endPoint, destination); - } - - protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine) - { - return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine); - } - - private SslConnection createSslConnection(HttpDestination destination, EndPoint endPoint) - { - HttpClient httpClient = destination.getHttpClient(); - SslContextFactory sslContextFactory = httpClient.getSslContextFactory(); - SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort()); - engine.setUseClientMode(true); - - SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine); - sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed()); - endPoint.setConnection(sslConnection); - EndPoint appEndPoint = sslConnection.getDecryptedEndPoint(); - Connection connection = newHttpConnection(httpClient, appEndPoint, destination); - appEndPoint.setConnection(connection); - - return sslConnection; - } - - protected class ClientSelectorManager extends SelectorManager - { - private final HttpClient client; - - protected ClientSelectorManager(HttpClient client, int selectors) - { - super(client.getExecutor(), client.getScheduler(), selectors); - this.client = client; - } - - @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key) - { - return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout()); - } - - @Override - public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException - { - ConnectionCallback callback = (ConnectionCallback)attachment; - HttpDestination destination = callback.destination; - - SslContextFactory sslContextFactory = client.getSslContextFactory(); - if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme())) - { - if (sslContextFactory == null) - { - IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests"); - callback.failed(failure); - throw failure; - } - else - { - SslConnection sslConnection = createSslConnection(destination, endPoint); - callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection()); - return sslConnection; - } - } - else - { - Connection connection = newHttpConnection(client, endPoint, destination); - callback.succeeded((org.eclipse.jetty.client.api.Connection)connection); - return connection; - } - } - - @Override - protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) - { - ConnectionCallback callback = (ConnectionCallback)attachment; - callback.failed(ex); - } - } - - private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection> - { - private final HttpDestination destination; - private final Promise<org.eclipse.jetty.client.api.Connection> promise; - - private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise) - { - this.destination = destination; - this.promise = promise; - } - - @Override - public void succeeded(org.eclipse.jetty.client.api.Connection result) - { - promise.succeeded(result); - } - - @Override - public void failed(Throwable x) - { - promise.failed(x); - } + return tunnel(httpConnection.getEndPoint(), httpConnection.getHttpDestination(), connection); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 9c8e245866..fbbb0bed04 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.client.http; import java.util.concurrent.TimeoutException; -import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; @@ -41,10 +40,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec private boolean closed; private long idleTimeout; - public HttpConnectionOverHTTP(HttpClient client, EndPoint endPoint, HttpDestination destination) + public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination) { - super(endPoint, client.getExecutor(), client.isDispatchIO()); - this.delegate = new Delegate(client, destination); + super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO()); + this.delegate = new Delegate(destination); this.channel = new HttpChannelOverHTTP(this); } @@ -146,9 +145,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec private class Delegate extends HttpConnection { - private Delegate(HttpClient client, HttpDestination destination) + private Delegate(HttpDestination destination) { - super(client, destination); + super(destination); } @Override diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java index a1f0297238..b2fb3a6e4f 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java @@ -46,7 +46,7 @@ public class HttpClientTransportOverSPDY implements HttpClientTransport } @Override - public HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port) + public HttpDestination newHttpDestination(String scheme, String host, int port) { return new HttpDestinationOverSPDY(httpClient, scheme, host, port); } @@ -75,7 +75,7 @@ public class HttpClientTransportOverSPDY implements HttpClientTransport @Override public void succeeded(Session session) { - Connection result = new HttpConnectionOverSPDY(httpClient, destination, session); + Connection result = new HttpConnectionOverSPDY(destination, session); promise.succeeded(result); } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java index ffe025146b..850088cc08 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.spdy.client.http; import org.eclipse.jetty.client.HttpChannel; -import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; @@ -31,9 +30,9 @@ public class HttpConnectionOverSPDY extends HttpConnection { private final Session session; - public HttpConnectionOverSPDY(HttpClient client, HttpDestination destination, Session session) + public HttpConnectionOverSPDY(HttpDestination destination, Session session) { - super(client, destination); + super(destination); this.session = session; } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java index 24b2acf4b7..321fb9991c 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java @@ -18,118 +18,20 @@ package org.eclipse.jetty.spdy.client.http; -import java.util.concurrent.atomic.AtomicReference; - import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.client.MultiplexHttpDestination; -public class HttpDestinationOverSPDY extends HttpDestination implements Promise<Connection> +public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnectionOverSPDY> { - private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED); - private HttpConnectionOverSPDY connection; - public HttpDestinationOverSPDY(HttpClient client, String scheme, String host, int port) { super(client, scheme, host, port); } @Override - protected void send() - { - while (true) - { - ConnectState current = connect.get(); - switch (current) - { - case DISCONNECTED: - { - if (!connect.compareAndSet(current, ConnectState.CONNECTING)) - break; - newConnection(this); - return; - } - case CONNECTING: - { - // Waiting to connect, just return - return; - } - case CONNECTED: - { - if (process(connection, false)) - break; - return; - } - default: - { - throw new IllegalStateException(); - } - } - } - } - - @Override - public void succeeded(Connection result) - { - HttpConnectionOverSPDY connection = this.connection = (HttpConnectionOverSPDY)result; - if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED)) - { - process(connection, true); - } - else - { - connection.close(); - failed(new IllegalStateException()); - } - } - - @Override - public void failed(Throwable x) - { - connect.set(ConnectState.DISCONNECTED); - } - - private boolean process(final HttpConnectionOverSPDY connection, boolean dispatch) - { - HttpClient client = getHttpClient(); - final HttpExchange exchange = getHttpExchanges().poll(); - LOG.debug("Processing exchange {} on connection {}", exchange, connection); - if (exchange == null) - return false; - - final Request request = exchange.getRequest(); - Throwable cause = request.getAbortCause(); - if (cause != null) - { - LOG.debug("Abort before processing {}: {}", exchange, cause); - abort(exchange, cause); - } - else - { - if (dispatch) - { - client.getExecutor().execute(new Runnable() - { - @Override - public void run() - { - connection.send(exchange); - } - }); - } - else - { - connection.send(exchange); - } - } - return true; - } - - private enum ConnectState + protected void send(HttpConnectionOverSPDY connection, HttpExchange exchange) { - DISCONNECTED, CONNECTING, CONNECTED + connection.send(exchange); } } |