Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2013-09-06 22:36:31 +0000
committerSimone Bordet2013-09-06 22:36:31 +0000
commit3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7 (patch)
treefa2e1c0c44e57c15fe518c41155c38c040c0046e
parente09415145f6c1e2926db7f2d9bf4efbf1272b16d (diff)
downloadorg.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.tar.gz
org.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.tar.xz
org.eclipse.jetty.project-3cdf4dece1626dcb7df8c8aaf5221deb9bf3b0e7.zip
Refactored some behaviour to base classes to ease the FCGI
implementation, and taken the chance to remove redundant code.
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java233
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java2
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java52
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java14
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java10
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java135
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java203
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java11
-rw-r--r--jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java4
-rw-r--r--jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java5
-rw-r--r--jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java106
11 files changed, 453 insertions, 322 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java
new file mode 100644
index 0000000000..46cf76aa77
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java
@@ -0,0 +1,233 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import javax.net.ssl.SSLEngine;
+
+import org.eclipse.jetty.http.HttpScheme;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.SelectChannelEndPoint;
+import org.eclipse.jetty.io.SelectorManager;
+import org.eclipse.jetty.io.ssl.SslConnection;
+import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+public abstract class AbstractHttpClientTransport extends ContainerLifeCycle implements HttpClientTransport
+{
+ protected static final Logger LOG = Log.getLogger(HttpClientTransport.class);
+
+ private final int selectors;
+ private volatile HttpClient client;
+ private volatile SelectorManager selectorManager;
+
+ protected AbstractHttpClientTransport(int selectors)
+ {
+ this.selectors = selectors;
+ }
+
+ protected HttpClient getHttpClient()
+ {
+ return client;
+ }
+
+ @Override
+ public void setHttpClient(HttpClient client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ selectorManager = newSelectorManager(client);
+ selectorManager.setConnectTimeout(client.getConnectTimeout());
+ addBean(selectorManager);
+ super.doStart();
+ }
+
+ @Override
+ public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise)
+ {
+ SocketChannel channel = null;
+ try
+ {
+ channel = SocketChannel.open();
+ HttpClient client = destination.getHttpClient();
+ SocketAddress bindAddress = client.getBindAddress();
+ if (bindAddress != null)
+ channel.bind(bindAddress);
+ configure(client, channel);
+ channel.configureBlocking(false);
+ channel.connect(address);
+
+ ConnectionCallback callback = new ConnectionCallback(destination, promise);
+ selectorManager.connect(channel, callback);
+ }
+ // Must catch all exceptions, since some like
+ // UnresolvedAddressException are not IOExceptions.
+ catch (Throwable x)
+ {
+ try
+ {
+ if (channel != null)
+ channel.close();
+ }
+ catch (IOException xx)
+ {
+ LOG.ignore(xx);
+ }
+ finally
+ {
+ promise.failed(x);
+ }
+ }
+ }
+
+ protected void configure(HttpClient client, SocketChannel channel) throws SocketException
+ {
+ channel.socket().setTcpNoDelay(client.isTCPNoDelay());
+ }
+
+ protected SelectorManager newSelectorManager(HttpClient client)
+ {
+ return new ClientSelectorManager(client, selectors);
+ }
+
+ protected SslConnection createSslConnection(EndPoint endPoint, HttpDestination destination)
+ {
+ HttpClient httpClient = destination.getHttpClient();
+ SslContextFactory sslContextFactory = httpClient.getSslContextFactory();
+ SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort());
+ engine.setUseClientMode(true);
+
+ SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine);
+ sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
+ endPoint.setConnection(sslConnection);
+ EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
+ Connection connection = newConnection(appEndPoint, destination);
+ appEndPoint.setConnection(connection);
+
+ return sslConnection;
+ }
+
+ protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
+ {
+ return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
+ }
+
+ protected abstract Connection newConnection(EndPoint endPoint, HttpDestination destination);
+
+ protected org.eclipse.jetty.client.api.Connection tunnel(EndPoint endPoint, HttpDestination destination, org.eclipse.jetty.client.api.Connection connection)
+ {
+ SslConnection sslConnection = createSslConnection(endPoint, destination);
+ Connection result = sslConnection.getDecryptedEndPoint().getConnection();
+ selectorManager.connectionClosed((Connection)connection);
+ selectorManager.connectionOpened(sslConnection);
+ LOG.debug("Tunnelled {} over {}", connection, result);
+ return (org.eclipse.jetty.client.api.Connection)result;
+ }
+
+ protected class ClientSelectorManager extends SelectorManager
+ {
+ private final HttpClient client;
+
+ protected ClientSelectorManager(HttpClient client, int selectors)
+ {
+ super(client.getExecutor(), client.getScheduler(), selectors);
+ this.client = client;
+ }
+
+ @Override
+ protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
+ {
+ return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
+ }
+
+ @Override
+ public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
+ {
+ ConnectionCallback callback = (ConnectionCallback)attachment;
+ HttpDestination destination = callback.destination;
+
+ SslContextFactory sslContextFactory = client.getSslContextFactory();
+ if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme()))
+ {
+ if (sslContextFactory == null)
+ {
+ IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests");
+ callback.failed(failure);
+ throw failure;
+ }
+ else
+ {
+ SslConnection sslConnection = createSslConnection(endPoint, destination);
+ callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection());
+ return sslConnection;
+ }
+ }
+ else
+ {
+ Connection connection = AbstractHttpClientTransport.this.newConnection(endPoint, destination);
+ callback.succeeded((org.eclipse.jetty.client.api.Connection)connection);
+ return connection;
+ }
+ }
+
+ @Override
+ protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
+ {
+ ConnectionCallback callback = (ConnectionCallback)attachment;
+ callback.failed(ex);
+ }
+ }
+
+ private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection>
+ {
+ private final HttpDestination destination;
+ private final Promise<org.eclipse.jetty.client.api.Connection> promise;
+
+ private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
+ {
+ this.destination = destination;
+ this.promise = promise;
+ }
+
+ @Override
+ public void succeeded(org.eclipse.jetty.client.api.Connection result)
+ {
+ promise.succeeded(result);
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ promise.failed(x);
+ }
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
index 48ab7c31cd..ef1fc0f6b4 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
@@ -450,7 +450,7 @@ public class HttpClient extends ContainerLifeCycle
HttpDestination destination = destinations.get(address);
if (destination == null)
{
- destination = transport.newHttpDestination(this, scheme, host, port);
+ destination = transport.newHttpDestination(scheme, host, port);
if (isRunning())
{
HttpDestination existing = destinations.putIfAbsent(address, destination);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
index 80d284c0d7..de943a61db 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java
@@ -23,13 +23,57 @@ import java.net.SocketAddress;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Promise;
+/**
+ * {@link HttpClientTransport} represents what transport implementations should provide
+ * in order to plug-in a different transport for {@link HttpClient}.
+ * <p/>
+ * While the {@link HttpClient} APIs define the HTTP semantic (request, response, headers, etc.)
+ * <em>how</em> a HTTP exchange is carried over the network depends on implementations of this class.
+ * <p/>
+ * The default implementation uses the HTTP protocol to carry over the network the HTTP exchange,
+ * but the HTTP exchange may also be carried using the SPDY protocol or the FCGI protocol or, in future,
+ * other protocols.
+ */
public interface HttpClientTransport
{
- void setHttpClient(HttpClient client);
+ /**
+ * Sets the {@link HttpClient} instance on this transport.
+ * <p />
+ * This is needed because of a chicken-egg problem: in order to create the {@link HttpClient}
+ * a {@link HttpClientTransport} is needed, that therefore cannot have a reference yet to the
+ * {@link HttpClient}.
+ *
+ * @param client the {@link HttpClient} that uses this transport.
+ */
+ public void setHttpClient(HttpClient client);
- HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port);
+ /**
+ * Creates a new, transport-specific, {@link HttpDestination} object.
+ * <p />
+ * {@link HttpDestination} controls the destination-connection cardinality: protocols like
+ * HTTP have 1-N cardinality, while multiplexed protocols like SPDY have a 1-1 cardinality.
+ *
+ * @param scheme the destination scheme
+ * @param host the destination host
+ * @param port the destination port
+ * @return a new, transport-specific, {@link HttpDestination} object
+ */
+ public HttpDestination newHttpDestination(String scheme, String host, int port);
- void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise);
+ /**
+ * Establishes a physical connection to the given {@code address}.
+ *
+ * @param destination the destination
+ * @param address the address to connect to
+ * @param promise the promise to notify when the connection succeeds or fails
+ */
+ public void connect(HttpDestination destination, SocketAddress address, Promise<Connection> promise);
- Connection tunnel(Connection connection);
+ /**
+ * Establishes an encrypted tunnel over the given {@code connection}
+ *
+ * @param connection the connection to tunnel
+ * @return the tunnelled connection
+ */
+ public Connection tunnel(Connection connection);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
index 1559fc97c9..e09079b1ca 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
@@ -39,18 +39,16 @@ public abstract class HttpConnection implements Connection
{
private static final HttpField CHUNKED_FIELD = new HttpField(HttpHeader.TRANSFER_ENCODING, HttpHeaderValue.CHUNKED);
- private final HttpClient client;
private final HttpDestination destination;
- protected HttpConnection(HttpClient client, HttpDestination destination)
+ protected HttpConnection(HttpDestination destination)
{
- this.client = client;
this.destination = destination;
}
public HttpClient getHttpClient()
{
- return client;
+ return destination.getHttpClient();
}
public HttpDestination getHttpDestination()
@@ -65,13 +63,13 @@ public abstract class HttpConnection implements Connection
if (request.getTimeout() > 0)
{
TimeoutCompleteListener timeoutListener = new TimeoutCompleteListener(request);
- timeoutListener.schedule(client.getScheduler());
+ timeoutListener.schedule(getHttpClient().getScheduler());
listeners.add(timeoutListener);
}
if (listener != null)
listeners.add(listener);
- HttpConversation conversation = client.getConversation(request.getConversationID(), true);
+ HttpConversation conversation = getHttpClient().getConversation(request.getConversationID(), true);
HttpExchange exchange = new HttpExchange(conversation, getHttpDestination(), request, listeners);
send(exchange);
@@ -123,7 +121,7 @@ public abstract class HttpConnection implements Connection
}
// Cookies
- List<HttpCookie> cookies = client.getCookieStore().get(request.getURI());
+ List<HttpCookie> cookies = getHttpClient().getCookieStore().get(request.getURI());
StringBuilder cookieString = null;
for (int i = 0; i < cookies.size(); ++i)
{
@@ -139,7 +137,7 @@ public abstract class HttpConnection implements Connection
// Authorization
URI authenticationURI = destination.isProxied() ? destination.getProxyURI() : request.getURI();
- Authentication.Result authnResult = client.getAuthenticationStore().findAuthenticationResult(authenticationURI);
+ Authentication.Result authnResult = getHttpClient().getAuthenticationStore().findAuthenticationResult(authenticationURI);
if (authnResult != null)
authnResult.apply(request);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 6f310918ed..53e7584e64 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -359,8 +359,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
* content needs to be sent, this method is invoked again; subclasses need only to send the content
* at the {@link HttpContent} cursor position.
* <p />
- * This method is invoked one last time when {@link HttpContent#isConsumed()} is true; subclasses
- * needs to skip sending content in this case, and just complete their content generation.
+ * This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore
+ * there is no actual content to send.
+ * This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the
+ * transfer encoding is chunked) if their protocol needs to.
*
* @param exchange the exchange to send
* @param content the content to send
@@ -615,7 +617,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
else
{
- if (content.isLast())
+ if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
}
@@ -685,7 +687,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
else
{
- if (content.isLast())
+ if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
new file mode 100644
index 0000000000..aa2106e892
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
@@ -0,0 +1,135 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.util.Promise;
+
+public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
+{
+ private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
+ private C connection;
+
+ protected MultiplexHttpDestination(HttpClient client, String scheme, String host, int port)
+ {
+ super(client, scheme, host, port);
+ }
+
+ @Override
+ protected void send()
+ {
+ while (true)
+ {
+ ConnectState current = connect.get();
+ switch (current)
+ {
+ case DISCONNECTED:
+ {
+ if (!connect.compareAndSet(current, ConnectState.CONNECTING))
+ break;
+ newConnection(this);
+ return;
+ }
+ case CONNECTING:
+ {
+ // Waiting to connect, just return
+ return;
+ }
+ case CONNECTED:
+ {
+ if (process(connection, false))
+ break;
+ return;
+ }
+ default:
+ {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void succeeded(Connection result)
+ {
+ C connection = this.connection = (C)result;
+ if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
+ {
+ process(connection, true);
+ }
+ else
+ {
+ connection.close();
+ failed(new IllegalStateException());
+ }
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ connect.set(ConnectState.DISCONNECTED);
+ }
+
+ protected boolean process(final C connection, boolean dispatch)
+ {
+ HttpClient client = getHttpClient();
+ final HttpExchange exchange = getHttpExchanges().poll();
+ LOG.debug("Processing exchange {} on connection {}", exchange, connection);
+ if (exchange == null)
+ return false;
+
+ final Request request = exchange.getRequest();
+ Throwable cause = request.getAbortCause();
+ if (cause != null)
+ {
+ LOG.debug("Abort before processing {}: {}", exchange, cause);
+ abort(exchange, cause);
+ }
+ else
+ {
+ if (dispatch)
+ {
+ client.getExecutor().execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ send(connection, exchange);
+ }
+ });
+ }
+ else
+ {
+ send(connection, exchange);
+ }
+ }
+ return true;
+ }
+
+ protected abstract void send(C connection, HttpExchange exchange);
+
+ private enum ConnectState
+ {
+ DISCONNECTED, CONNECTING, CONNECTED
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
index 391e2c83c1..2e491cfe4d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpClientTransportOverHTTP.java
@@ -18,220 +18,39 @@
package org.eclipse.jetty.client.http;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import javax.net.ssl.SSLEngine;
-
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
-import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.SelectChannelEndPoint;
-import org.eclipse.jetty.io.SelectorManager;
-import org.eclipse.jetty.io.ssl.SslConnection;
-import org.eclipse.jetty.util.Promise;
-import org.eclipse.jetty.util.component.ContainerLifeCycle;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-public class HttpClientTransportOverHTTP extends ContainerLifeCycle implements HttpClientTransport
+public class HttpClientTransportOverHTTP extends AbstractHttpClientTransport
{
- private static final Logger LOG = Log.getLogger(HttpClientTransportOverHTTP.class);
-
- private volatile HttpClient client;
- private volatile SelectorManager selectorManager;
-
- @Override
- public void setHttpClient(HttpClient client)
+ public HttpClientTransportOverHTTP()
{
- this.client = client;
+ this(1);
}
- @Override
- protected void doStart() throws Exception
+ public HttpClientTransportOverHTTP(int selectors)
{
- selectorManager = newSelectorManager(client);
- selectorManager.setConnectTimeout(client.getConnectTimeout());
- addBean(selectorManager);
- super.doStart();
+ super(selectors);
}
@Override
- public HttpDestination newHttpDestination(HttpClient client, String scheme, String host, int port)
+ public HttpDestination newHttpDestination(String scheme, String host, int port)
{
- return new HttpDestinationOverHTTP(client, scheme, host, port);
+ return new HttpDestinationOverHTTP(getHttpClient(), scheme, host, port);
}
@Override
- public void connect(HttpDestination destination, SocketAddress address, Promise<org.eclipse.jetty.client.api.Connection> promise)
+ protected Connection newConnection(EndPoint endPoint, HttpDestination destination)
{
- SocketChannel channel = null;
- try
- {
- channel = SocketChannel.open();
- HttpClient client = destination.getHttpClient();
- SocketAddress bindAddress = client.getBindAddress();
- if (bindAddress != null)
- channel.bind(bindAddress);
- configure(client, channel);
- channel.configureBlocking(false);
- channel.connect(address);
-
- ConnectionCallback callback = new ConnectionCallback(destination, promise);
- selectorManager.connect(channel, callback);
- }
- // Must catch all exceptions, since some like
- // UnresolvedAddressException are not IOExceptions.
- catch (Throwable x)
- {
- try
- {
- if (channel != null)
- channel.close();
- }
- catch (IOException xx)
- {
- LOG.ignore(xx);
- }
- finally
- {
- promise.failed(x);
- }
- }
+ return new HttpConnectionOverHTTP(endPoint, destination);
}
@Override
public org.eclipse.jetty.client.api.Connection tunnel(org.eclipse.jetty.client.api.Connection connection)
{
HttpConnectionOverHTTP httpConnection = (HttpConnectionOverHTTP)connection;
- HttpDestination destination = httpConnection.getHttpDestination();
- SslConnection sslConnection = createSslConnection(destination, httpConnection.getEndPoint());
- HttpConnectionOverHTTP result = (HttpConnectionOverHTTP)sslConnection.getDecryptedEndPoint().getConnection();
- selectorManager.connectionClosed(httpConnection);
- selectorManager.connectionOpened(sslConnection);
- LOG.debug("Tunnelled {} over {}", connection, result);
- return result;
- }
-
- protected void configure(HttpClient client, SocketChannel channel) throws SocketException
- {
- channel.socket().setTcpNoDelay(client.isTCPNoDelay());
- }
-
- protected SelectorManager newSelectorManager(HttpClient client)
- {
- return new ClientSelectorManager(client, 1);
- }
-
- protected Connection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination)
- {
- return new HttpConnectionOverHTTP(httpClient, endPoint, destination);
- }
-
- protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
- {
- return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
- }
-
- private SslConnection createSslConnection(HttpDestination destination, EndPoint endPoint)
- {
- HttpClient httpClient = destination.getHttpClient();
- SslContextFactory sslContextFactory = httpClient.getSslContextFactory();
- SSLEngine engine = sslContextFactory.newSSLEngine(destination.getHost(), destination.getPort());
- engine.setUseClientMode(true);
-
- SslConnection sslConnection = newSslConnection(httpClient, endPoint, engine);
- sslConnection.setRenegotiationAllowed(sslContextFactory.isRenegotiationAllowed());
- endPoint.setConnection(sslConnection);
- EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
- Connection connection = newHttpConnection(httpClient, appEndPoint, destination);
- appEndPoint.setConnection(connection);
-
- return sslConnection;
- }
-
- protected class ClientSelectorManager extends SelectorManager
- {
- private final HttpClient client;
-
- protected ClientSelectorManager(HttpClient client, int selectors)
- {
- super(client.getExecutor(), client.getScheduler(), selectors);
- this.client = client;
- }
-
- @Override
- protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
- {
- return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
- }
-
- @Override
- public Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
- {
- ConnectionCallback callback = (ConnectionCallback)attachment;
- HttpDestination destination = callback.destination;
-
- SslContextFactory sslContextFactory = client.getSslContextFactory();
- if (!destination.isProxied() && HttpScheme.HTTPS.is(destination.getScheme()))
- {
- if (sslContextFactory == null)
- {
- IOException failure = new ConnectException("Missing " + SslContextFactory.class.getSimpleName() + " for " + destination.getScheme() + " requests");
- callback.failed(failure);
- throw failure;
- }
- else
- {
- SslConnection sslConnection = createSslConnection(destination, endPoint);
- callback.succeeded((org.eclipse.jetty.client.api.Connection)sslConnection.getDecryptedEndPoint().getConnection());
- return sslConnection;
- }
- }
- else
- {
- Connection connection = newHttpConnection(client, endPoint, destination);
- callback.succeeded((org.eclipse.jetty.client.api.Connection)connection);
- return connection;
- }
- }
-
- @Override
- protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
- {
- ConnectionCallback callback = (ConnectionCallback)attachment;
- callback.failed(ex);
- }
- }
-
- private class ConnectionCallback implements Promise<org.eclipse.jetty.client.api.Connection>
- {
- private final HttpDestination destination;
- private final Promise<org.eclipse.jetty.client.api.Connection> promise;
-
- private ConnectionCallback(HttpDestination destination, Promise<org.eclipse.jetty.client.api.Connection> promise)
- {
- this.destination = destination;
- this.promise = promise;
- }
-
- @Override
- public void succeeded(org.eclipse.jetty.client.api.Connection result)
- {
- promise.succeeded(result);
- }
-
- @Override
- public void failed(Throwable x)
- {
- promise.failed(x);
- }
+ return tunnel(httpConnection.getEndPoint(), httpConnection.getHttpDestination(), connection);
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
index 9c8e245866..fbbb0bed04 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
@@ -20,7 +20,6 @@ package org.eclipse.jetty.client.http;
import java.util.concurrent.TimeoutException;
-import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
@@ -41,10 +40,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private boolean closed;
private long idleTimeout;
- public HttpConnectionOverHTTP(HttpClient client, EndPoint endPoint, HttpDestination destination)
+ public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination)
{
- super(endPoint, client.getExecutor(), client.isDispatchIO());
- this.delegate = new Delegate(client, destination);
+ super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
+ this.delegate = new Delegate(destination);
this.channel = new HttpChannelOverHTTP(this);
}
@@ -146,9 +145,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private class Delegate extends HttpConnection
{
- private Delegate(HttpClient client, HttpDestination destination)
+ private Delegate(HttpDestination destination)
{
- super(client, destination);
+ super(destination);
}
@Override
diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java
index a1f0297238..b2fb3a6e4f 100644
--- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java
+++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpClientTransportOverSPDY.java
@@ -46,7 +46,7 @@ public class HttpClientTransportOverSPDY implements HttpClientTransport
}
@Override
- public HttpDestination newHttpDestination(HttpClient httpClient, String scheme, String host, int port)
+ public HttpDestination newHttpDestination(String scheme, String host, int port)
{
return new HttpDestinationOverSPDY(httpClient, scheme, host, port);
}
@@ -75,7 +75,7 @@ public class HttpClientTransportOverSPDY implements HttpClientTransport
@Override
public void succeeded(Session session)
{
- Connection result = new HttpConnectionOverSPDY(httpClient, destination, session);
+ Connection result = new HttpConnectionOverSPDY(destination, session);
promise.succeeded(result);
}
diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java
index ffe025146b..850088cc08 100644
--- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java
+++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java
@@ -19,7 +19,6 @@
package org.eclipse.jetty.spdy.client.http;
import org.eclipse.jetty.client.HttpChannel;
-import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
@@ -31,9 +30,9 @@ public class HttpConnectionOverSPDY extends HttpConnection
{
private final Session session;
- public HttpConnectionOverSPDY(HttpClient client, HttpDestination destination, Session session)
+ public HttpConnectionOverSPDY(HttpDestination destination, Session session)
{
- super(client, destination);
+ super(destination);
this.session = session;
}
diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java
index 24b2acf4b7..321fb9991c 100644
--- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java
+++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpDestinationOverSPDY.java
@@ -18,118 +18,20 @@
package org.eclipse.jetty.spdy.client.http;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
-import org.eclipse.jetty.client.api.Connection;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.client.MultiplexHttpDestination;
-public class HttpDestinationOverSPDY extends HttpDestination implements Promise<Connection>
+public class HttpDestinationOverSPDY extends MultiplexHttpDestination<HttpConnectionOverSPDY>
{
- private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
- private HttpConnectionOverSPDY connection;
-
public HttpDestinationOverSPDY(HttpClient client, String scheme, String host, int port)
{
super(client, scheme, host, port);
}
@Override
- protected void send()
- {
- while (true)
- {
- ConnectState current = connect.get();
- switch (current)
- {
- case DISCONNECTED:
- {
- if (!connect.compareAndSet(current, ConnectState.CONNECTING))
- break;
- newConnection(this);
- return;
- }
- case CONNECTING:
- {
- // Waiting to connect, just return
- return;
- }
- case CONNECTED:
- {
- if (process(connection, false))
- break;
- return;
- }
- default:
- {
- throw new IllegalStateException();
- }
- }
- }
- }
-
- @Override
- public void succeeded(Connection result)
- {
- HttpConnectionOverSPDY connection = this.connection = (HttpConnectionOverSPDY)result;
- if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
- {
- process(connection, true);
- }
- else
- {
- connection.close();
- failed(new IllegalStateException());
- }
- }
-
- @Override
- public void failed(Throwable x)
- {
- connect.set(ConnectState.DISCONNECTED);
- }
-
- private boolean process(final HttpConnectionOverSPDY connection, boolean dispatch)
- {
- HttpClient client = getHttpClient();
- final HttpExchange exchange = getHttpExchanges().poll();
- LOG.debug("Processing exchange {} on connection {}", exchange, connection);
- if (exchange == null)
- return false;
-
- final Request request = exchange.getRequest();
- Throwable cause = request.getAbortCause();
- if (cause != null)
- {
- LOG.debug("Abort before processing {}: {}", exchange, cause);
- abort(exchange, cause);
- }
- else
- {
- if (dispatch)
- {
- client.getExecutor().execute(new Runnable()
- {
- @Override
- public void run()
- {
- connection.send(exchange);
- }
- });
- }
- else
- {
- connection.send(exchange);
- }
- }
- return true;
- }
-
- private enum ConnectState
+ protected void send(HttpConnectionOverSPDY connection, HttpExchange exchange)
{
- DISCONNECTED, CONNECTING, CONNECTED
+ connection.send(exchange);
}
}

Back to the top