Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java')
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java268
1 files changed, 36 insertions, 232 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
index 9b2caf889d..cf4c2dfe90 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
@@ -22,13 +22,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.AsynchronousCloseException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
@@ -48,18 +45,15 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-public class HttpDestination implements Destination, Closeable, Dumpable
+public abstract class HttpDestination implements Destination, Closeable, Dumpable
{
- private static final Logger LOG = Log.getLogger(HttpDestination.class);
+ protected static final Logger LOG = Log.getLogger(HttpDestination.class);
- private final AtomicInteger connectionCount = new AtomicInteger();
private final HttpClient client;
private final String scheme;
private final String host;
private final Address address;
private final Queue<HttpExchange> exchanges;
- private final BlockingQueue<Connection> idleConnections;
- private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
private final ResponseNotifier responseNotifier;
private final Address proxyAddress;
@@ -72,14 +66,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
this.host = host;
this.address = new Address(host, port);
- int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
- int capacity = Math.min(32, maxRequestsQueued);
- this.exchanges = new BlockingArrayQueue<>(capacity, capacity, maxRequestsQueued);
-
- int maxConnections = client.getMaxConnectionsPerDestination();
- capacity = Math.min(8, maxConnections);
- this.idleConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
- this.activeConnections = new BlockingArrayQueue<>(capacity, capacity, maxConnections);
+ this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier(client);
@@ -93,14 +80,14 @@ public class HttpDestination implements Destination, Closeable, Dumpable
hostField = new HttpField(HttpHeader.HOST, host);
}
- protected BlockingQueue<Connection> getIdleConnections()
+ public HttpClient getHttpClient()
{
- return idleConnections;
+ return client;
}
- protected BlockingQueue<Connection> getActiveConnections()
+ public Queue<HttpExchange> getHttpExchanges()
{
- return activeConnections;
+ return exchanges;
}
public RequestNotifier getRequestNotifier()
@@ -157,7 +144,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
return hostField;
}
- public void send(Request request, List<Response.ResponseListener> listeners)
+ protected void send(Request request, List<Response.ResponseListener> listeners)
{
if (!scheme.equals(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
@@ -182,9 +169,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
{
LOG.debug("Queued {}", request);
requestNotifier.notifyQueued(request);
- Connection connection = acquire();
- if (connection != null)
- process(connection, false);
+ send();
}
}
else
@@ -199,6 +184,8 @@ public class HttpDestination implements Destination, Closeable, Dumpable
}
}
+ protected abstract void send();
+
public void newConnection(Promise<Connection> promise)
{
createConnection(new ProxyPromise(promise));
@@ -209,80 +196,24 @@ public class HttpDestination implements Destination, Closeable, Dumpable
client.newConnection(this, promise);
}
- protected Connection acquire()
+ public boolean remove(HttpExchange exchange)
{
- Connection result = idleConnections.poll();
- if (result != null)
- return result;
-
- final int maxConnections = client.getMaxConnectionsPerDestination();
- while (true)
- {
- int current = connectionCount.get();
- final int next = current + 1;
-
- if (next > maxConnections)
- {
- LOG.debug("Max connections per destination {} exceeded for {}", current, this);
- // Try again the idle connections
- return idleConnections.poll();
- }
-
- if (connectionCount.compareAndSet(current, next))
- {
- LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this);
-
- // This is the promise that is being called when a connection (eventually proxied) succeeds or fails.
- Promise<Connection> promise = new Promise<Connection>()
- {
- @Override
- public void succeeded(Connection connection)
- {
- process(connection, true);
- }
-
- @Override
- public void failed(final Throwable x)
- {
- client.getExecutor().execute(new Runnable()
- {
- @Override
- public void run()
- {
- abort(x);
- }
- });
- }
- };
-
- // Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
- // Differently from the case where the connection is created explicitly by applications, here
- // we need to do a bit more logging and keep track of the connection count in case of failures.
- createConnection(new ProxyPromise(promise)
- {
- @Override
- public void succeeded(Connection connection)
- {
- LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
- super.succeeded(connection);
- }
-
- @Override
- public void failed(Throwable x)
- {
- LOG.debug("Connection failed {} for {}", x, HttpDestination.this);
- connectionCount.decrementAndGet();
- super.failed(x);
- }
- });
+ return exchanges.remove(exchange);
+ }
- // Try again the idle connections
- return idleConnections.poll();
- }
- }
+ public void close()
+ {
+ abort(new AsynchronousCloseException());
+ LOG.debug("Closed {}", this);
}
- private void abort(Throwable cause)
+ /**
+ * Aborts all the {@link HttpExchange}s queued in this destination.
+ *
+ * @param cause the abort cause
+ * @see #abort(HttpExchange, Throwable)
+ */
+ public void abort(Throwable cause)
{
HttpExchange exchange;
while ((exchange = exchanges.poll()) != null)
@@ -290,134 +221,11 @@ public class HttpDestination implements Destination, Closeable, Dumpable
}
/**
- * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
- * <p>A new connection is created when a request needs to be executed; it is possible that the request that
- * triggered the request creation is executed by another connection that was just released, so the new connection
- * may become idle.</p>
- * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
+ * Aborts the given {@code exchange}, notifies listeners of the failure, and completes the exchange.
*
- * @param connection the new connection
- * @param dispatch whether to dispatch the processing to another thread
+ * @param exchange the {@link HttpExchange} to abort
+ * @param cause the abort cause
*/
- protected void process(Connection connection, boolean dispatch)
- {
- // Ugly cast, but lack of generic reification forces it
- final HttpConnection httpConnection = (HttpConnection)connection;
-
- final HttpExchange exchange = exchanges.poll();
- if (exchange == null)
- {
- LOG.debug("{} idle", httpConnection);
- if (!idleConnections.offer(httpConnection))
- {
- LOG.debug("{} idle overflow");
- httpConnection.close();
- }
- if (!client.isRunning())
- {
- LOG.debug("{} is stopping", client);
- remove(httpConnection);
- httpConnection.close();
- }
- }
- else
- {
- final Request request = exchange.getRequest();
- Throwable cause = request.getAbortCause();
- if (cause != null)
- {
- abort(exchange, cause);
- LOG.debug("Aborted before processing {}: {}", exchange, cause);
- }
- else
- {
- LOG.debug("{} active", httpConnection);
- if (!activeConnections.offer(httpConnection))
- {
- LOG.warn("{} active overflow");
- }
- if (dispatch)
- {
- client.getExecutor().execute(new Runnable()
- {
- @Override
- public void run()
- {
- httpConnection.send(exchange);
- }
- });
- }
- else
- {
- httpConnection.send(exchange);
- }
- }
- }
- }
-
- public void release(Connection connection)
- {
- LOG.debug("{} released", connection);
- if (client.isRunning())
- {
- boolean removed = activeConnections.remove(connection);
- if (removed)
- process(connection, false);
- else
- LOG.debug("{} explicit", connection);
- }
- else
- {
- LOG.debug("{} is stopped", client);
- remove(connection);
- connection.close();
- }
- }
-
- public void remove(Connection connection)
- {
- boolean removed = activeConnections.remove(connection);
- removed |= idleConnections.remove(connection);
- if (removed)
- {
- int open = connectionCount.decrementAndGet();
- LOG.debug("Removed connection {} for {} - open: {}", connection, this, open);
- }
-
- // We need to execute queued requests even if this connection failed.
- // We may create a connection that is not needed, but it will eventually
- // idle timeout, so no worries
- if (!exchanges.isEmpty())
- {
- connection = acquire();
- if (connection != null)
- process(connection, false);
- }
- }
-
- public void close()
- {
- for (Connection connection : idleConnections)
- connection.close();
- idleConnections.clear();
-
- // A bit drastic, but we cannot wait for all requests to complete
- for (Connection connection : activeConnections)
- connection.close();
- activeConnections.clear();
-
- abort(new AsynchronousCloseException());
-
- connectionCount.set(0);
-
- LOG.debug("Closed {}", this);
- }
-
- public boolean remove(HttpExchange exchange)
- {
- return exchanges.remove(exchange);
- }
-
protected void abort(HttpExchange exchange, Throwable cause)
{
Request request = exchange.getRequest();
@@ -431,8 +239,7 @@ public class HttpDestination implements Destination, Closeable, Dumpable
protected void tunnelSucceeded(Connection connection, Promise<Connection> promise)
{
// Wrap the connection with TLS
- Connection tunnel = client.tunnel(connection);
- promise.succeeded(tunnel);
+ promise.succeeded(client.getTransport().tunnel(connection));
}
protected void tunnelFailed(Connection connection, Promise<Connection> promise, Throwable failure)
@@ -451,22 +258,19 @@ public class HttpDestination implements Destination, Closeable, Dumpable
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, this + " - requests queued: " + exchanges.size());
- List<String> connections = new ArrayList<>();
- for (Connection connection : idleConnections)
- connections.add(connection + " - IDLE");
- for (Connection connection : activeConnections)
- connections.add(connection + " - ACTIVE");
- ContainerLifeCycle.dump(out, indent, connections);
+ }
+
+ public String asString()
+ {
+ return client.address(getScheme(), getHost(), getPort());
}
@Override
public String toString()
{
- return String.format("%s(%s://%s:%d)%s",
+ return String.format("%s(%s)%s",
HttpDestination.class.getSimpleName(),
- getScheme(),
- getHost(),
- getPort(),
+ asString(),
proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
}

Back to the top