diff options
Diffstat (limited to 'jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java')
-rw-r--r-- | jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java | 268 |
1 files changed, 36 insertions, 232 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 9b2caf889d..cf4c2dfe90 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -22,13 +22,10 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.nio.channels.AsynchronousCloseException; -import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; @@ -48,18 +45,15 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; -public class HttpDestination implements Destination, Closeable, Dumpable +public abstract class HttpDestination implements Destination, Closeable, Dumpable { - private static final Logger LOG = Log.getLogger(HttpDestination.class); + protected static final Logger LOG = Log.getLogger(HttpDestination.class); - private final AtomicInteger connectionCount = new AtomicInteger(); private final HttpClient client; private final String scheme; private final String host; private final Address address; private final Queue<HttpExchange> exchanges; - private final BlockingQueue<Connection> idleConnections; - private final BlockingQueue<Connection> activeConnections; private final RequestNotifier requestNotifier; private final ResponseNotifier responseNotifier; private final Address proxyAddress; @@ -72,14 +66,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable this.host = host; this.address = new Address(host, port); - int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination(); - int capacity = Math.min(32, maxRequestsQueued); - this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued); - - int maxConnections = client.getMaxConnectionsPerDestination(); - capacity = Math.min(8, maxConnections); - this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections); - this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections); + this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination()); this.requestNotifier = new RequestNotifier(client); this.responseNotifier = new ResponseNotifier(client); @@ -93,14 +80,14 @@ public class HttpDestination implements Destination, Closeable, Dumpable hostField = new HttpField(HttpHeader.HOST, host); } - protected BlockingQueue<Connection> getIdleConnections() + public HttpClient getHttpClient() { - return idleConnections; + return client; } - protected BlockingQueue<Connection> getActiveConnections() + public Queue<HttpExchange> getHttpExchanges() { - return activeConnections; + return exchanges; } public RequestNotifier getRequestNotifier() @@ -157,7 +144,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable return hostField; } - public void send(Request request, List<Response.ResponseListener> listeners) + protected void send(Request request, List<Response.ResponseListener> listeners) { if (!scheme.equals(request.getScheme())) throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this); @@ -182,9 +169,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable { LOG.debug("Queued {}", request); requestNotifier.notifyQueued(request); - Connection connection = acquire(); - if (connection != null) - process(connection, false); + send(); } } else @@ -199,6 +184,8 @@ public class HttpDestination implements Destination, Closeable, Dumpable } } + protected abstract void send(); + public void newConnection(Promise<Connection> promise) { createConnection(new ProxyPromise(promise)); @@ -209,80 +196,24 @@ public class HttpDestination implements Destination, Closeable, Dumpable client.newConnection(this, promise); } - protected Connection acquire() + public boolean remove(HttpExchange exchange) { - Connection result = idleConnections.poll(); - if (result != null) - return result; - - final int maxConnections = client.getMaxConnectionsPerDestination(); - while (true) - { - int current = connectionCount.get(); - final int next = current + 1; - - if (next > maxConnections) - { - LOG.debug("Max connections per destination {} exceeded for {}", current, this); - // Try again the idle connections - return idleConnections.poll(); - } - - if (connectionCount.compareAndSet(current, next)) - { - LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this); - - // This is the promise that is being called when a connection (eventually proxied) succeeds or fails. - Promise<Connection> promise = new Promise<Connection>() - { - @Override - public void succeeded(Connection connection) - { - process(connection, true); - } - - @Override - public void failed(final Throwable x) - { - client.getExecutor().execute(new Runnable() - { - @Override - public void run() - { - abort(x); - } - }); - } - }; - - // Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed. - // Differently from the case where the connection is created explicitly by applications, here - // we need to do a bit more logging and keep track of the connection count in case of failures. - createConnection(new ProxyPromise(promise) - { - @Override - public void succeeded(Connection connection) - { - LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this); - super.succeeded(connection); - } - - @Override - public void failed(Throwable x) - { - LOG.debug("Connection failed {} for {}", x, HttpDestination.this); - connectionCount.decrementAndGet(); - super.failed(x); - } - }); + return exchanges.remove(exchange); + } - // Try again the idle connections - return idleConnections.poll(); - } - } + public void close() + { + abort(new AsynchronousCloseException()); + LOG.debug("Closed {}", this); } - private void abort(Throwable cause) + /** + * Aborts all the {@link HttpExchange}s queued in this destination. + * + * @param cause the abort cause + * @see #abort(HttpExchange, Throwable) + */ + public void abort(Throwable cause) { HttpExchange exchange; while ((exchange = exchanges.poll()) != null) @@ -290,134 +221,11 @@ public class HttpDestination implements Destination, Closeable, Dumpable } /** - * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p> - * <p>A new connection is created when a request needs to be executed; it is possible that the request that - * triggered the request creation is executed by another connection that was just released, so the new connection - * may become idle.</p> - * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p> + * Aborts the given {@code exchange}, notifies listeners of the failure, and completes the exchange. * - * @param connection the new connection - * @param dispatch whether to dispatch the processing to another thread + * @param exchange the {@link HttpExchange} to abort + * @param cause the abort cause */ - protected void process(Connection connection, boolean dispatch) - { - // Ugly cast, but lack of generic reification forces it - final HttpConnection httpConnection = (HttpConnection)connection; - - final HttpExchange exchange = exchanges.poll(); - if (exchange == null) - { - LOG.debug("{} idle", httpConnection); - if (!idleConnections.offer(httpConnection)) - { - LOG.debug("{} idle overflow"); - httpConnection.close(); - } - if (!client.isRunning()) - { - LOG.debug("{} is stopping", client); - remove(httpConnection); - httpConnection.close(); - } - } - else - { - final Request request = exchange.getRequest(); - Throwable cause = request.getAbortCause(); - if (cause != null) - { - abort(exchange, cause); - LOG.debug("Aborted before processing {}: {}", exchange, cause); - } - else - { - LOG.debug("{} active", httpConnection); - if (!activeConnections.offer(httpConnection)) - { - LOG.warn("{} active overflow"); - } - if (dispatch) - { - client.getExecutor().execute(new Runnable() - { - @Override - public void run() - { - httpConnection.send(exchange); - } - }); - } - else - { - httpConnection.send(exchange); - } - } - } - } - - public void release(Connection connection) - { - LOG.debug("{} released", connection); - if (client.isRunning()) - { - boolean removed = activeConnections.remove(connection); - if (removed) - process(connection, false); - else - LOG.debug("{} explicit", connection); - } - else - { - LOG.debug("{} is stopped", client); - remove(connection); - connection.close(); - } - } - - public void remove(Connection connection) - { - boolean removed = activeConnections.remove(connection); - removed |= idleConnections.remove(connection); - if (removed) - { - int open = connectionCount.decrementAndGet(); - LOG.debug("Removed connection {} for {} - open: {}", connection, this, open); - } - - // We need to execute queued requests even if this connection failed. - // We may create a connection that is not needed, but it will eventually - // idle timeout, so no worries - if (!exchanges.isEmpty()) - { - connection = acquire(); - if (connection != null) - process(connection, false); - } - } - - public void close() - { - for (Connection connection : idleConnections) - connection.close(); - idleConnections.clear(); - - // A bit drastic, but we cannot wait for all requests to complete - for (Connection connection : activeConnections) - connection.close(); - activeConnections.clear(); - - abort(new AsynchronousCloseException()); - - connectionCount.set(0); - - LOG.debug("Closed {}", this); - } - - public boolean remove(HttpExchange exchange) - { - return exchanges.remove(exchange); - } - protected void abort(HttpExchange exchange, Throwable cause) { Request request = exchange.getRequest(); @@ -431,8 +239,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable protected void tunnelSucceeded(Connection connection, Promise<Connection> promise) { // Wrap the connection with TLS - Connection tunnel = client.tunnel(connection); - promise.succeeded(tunnel); + promise.succeeded(client.getTransport().tunnel(connection)); } protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure) @@ -451,22 +258,19 @@ public class HttpDestination implements Destination, Closeable, Dumpable public void dump(Appendable out, String indent) throws IOException { ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size()); - List<String> connections = new ArrayList<>(); - for (Connection connection : idleConnections) - connections.add(connection + " - IDLE"); - for (Connection connection : activeConnections) - connections.add(connection + " - ACTIVE"); - ContainerLifeCycle.dump(out, indent, connections); + } + + public String asString() + { + return client.address(getScheme(), getHost(), getPort()); } @Override public String toString() { - return String.format("%s(%s://%s:%d)%s", + return String.format("%s(%s)%s", HttpDestination.class.getSimpleName(), - getScheme(), - getHost(), - getPort(), + asString(), proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort()); } |