diff options
author | Simone Bordet | 2015-10-30 14:33:12 +0000 |
---|---|---|
committer | Simone Bordet | 2015-10-30 14:33:12 +0000 |
commit | 0bd1e0ad7d2563b7f756ea56340903a02d5bca5c (patch) | |
tree | 7a978a161a095a116962b4f1cc4f272fc61dc5be /jetty-client | |
parent | 0b95a9e23e25af3d1606785613d8035e29cbf9e8 (diff) | |
download | org.eclipse.jetty.project-0bd1e0ad7d2563b7f756ea56340903a02d5bca5c.tar.gz org.eclipse.jetty.project-0bd1e0ad7d2563b7f756ea56340903a02d5bca5c.tar.xz org.eclipse.jetty.project-0bd1e0ad7d2563b7f756ea56340903a02d5bca5c.zip |
481116 - Introduce connection pooling also for HTTP/2 transport.
Implemented connection pooling for multiplexed transports.
Reworked the ConnectionPool code and its relationship with
HttpDestination.
Diffstat (limited to 'jetty-client')
22 files changed, 931 insertions, 1193 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java new file mode 100644 index 0000000000..d3b13b668d --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -0,0 +1,199 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable +{ + private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); + + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicInteger connectionCount = new AtomicInteger(); + private final Destination destination; + private final int maxConnections; + private final Callback requester; + + protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester) + { + this.destination = destination; + this.maxConnections = maxConnections; + this.requester = requester; + } + + @ManagedAttribute(value = "The max number of connections", readonly = true) + public int getMaxConnectionCount() + { + return maxConnections; + } + + @ManagedAttribute(value = "The number of connections", readonly = true) + public int getConnectionCount() + { + return connectionCount.get(); + } + + @Override + public boolean isEmpty() + { + return connectionCount.get() == 0; + } + + @Override + public boolean isClosed() + { + return closed.get(); + } + + @Override + public Connection acquire() + { + Connection connection = activate(); + if (connection == null) + connection = tryCreate(); + return connection; + } + + private Connection tryCreate() + { + while (true) + { + int current = getConnectionCount(); + final int next = current + 1; + + if (next > maxConnections) + { + if (LOG.isDebugEnabled()) + LOG.debug("Max connections {}/{} reached", current, maxConnections); + // Try again the idle connections + return activate(); + } + + if (connectionCount.compareAndSet(current, next)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection {}/{} creation", next, maxConnections); + + destination.newConnection(new Promise<Connection>() + { + @Override + public void succeeded(Connection connection) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection); + onCreated(connection); + proceed(); + } + + @Override + public void failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x); + connectionCount.decrementAndGet(); + requester.failed(x); + } + }); + + // Try again the idle connections + return activate(); + } + } + } + + protected abstract void onCreated(Connection connection); + + protected void proceed() + { + requester.succeeded(); + } + + protected abstract Connection activate(); + + protected Connection active(Connection connection) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection active {}", connection); + acquired(connection); + return connection; + } + + protected void acquired(Connection connection) + { + } + + protected boolean idle(Connection connection, boolean close) + { + if (close) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection idle close {}", connection); + return false; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection idle {}", connection); + return true; + } + } + + protected void released(Connection connection) + { + } + + protected void removed(Connection connection) + { + int pooled = connectionCount.decrementAndGet(); + if (LOG.isDebugEnabled()) + LOG.debug("Connection removed {} - pooled: {}", connection, pooled); + } + + @Override + public void close() + { + if (closed.compareAndSet(false, true)) + { + connectionCount.set(0); + } + } + + protected void close(Collection<Connection> connections) + { + connections.forEach(Connection::close); + } + + @Override + public String dump() + { + return ContainerLifeCycle.dump(this); + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index fc95421a0b..029a388d33 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -18,17 +18,24 @@ package org.eclipse.jetty.client; -import org.eclipse.jetty.client.api.Destination; -import org.eclipse.jetty.util.Callback; - -/** - * @deprecated use {@link DuplexConnectionPool} instead - */ -@Deprecated -public class ConnectionPool extends DuplexConnectionPool +import java.io.Closeable; + +import org.eclipse.jetty.client.api.Connection; + +public interface ConnectionPool extends Closeable { - public ConnectionPool(Destination destination, int maxConnections, Callback requester) - { - super(destination, maxConnections, requester); - } + boolean isActive(Connection connection); + + boolean isEmpty(); + + boolean isClosed(); + + Connection acquire(); + + boolean release(Connection connection); + + boolean remove(Connection connection); + + @Override + void close(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index dcf74709ce..c22966c372 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -18,21 +18,20 @@ package org.eclipse.jetty.client; -import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.Deque; +import java.util.HashSet; import java.util.List; import java.util.Queue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; -import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -42,31 +41,29 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; @ManagedObject("The connection pool") -public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable +public class DuplexConnectionPool extends AbstractConnectionPool implements Dumpable, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class); - private final AtomicInteger connectionCount = new AtomicInteger(); private final ReentrantLock lock = new ReentrantLock(); - private final Destination destination; - private final int maxConnections; - private final Callback requester; private final Deque<Connection> idleConnections; - private final Queue<Connection> activeConnections; + private final Set<Connection> activeConnections; public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester) { - this.destination = destination; - this.maxConnections = maxConnections; - this.requester = requester; - this.idleConnections = new LinkedBlockingDeque<>(maxConnections); - this.activeConnections = new BlockingArrayQueue<>(maxConnections); + super(destination, maxConnections, requester); + this.idleConnections = new ArrayDeque<>(maxConnections); + this.activeConnections = new HashSet<>(maxConnections); } - @ManagedAttribute(value = "The number of connections", readonly = true) - public int getConnectionCount() + protected void lock() { - return connectionCount.get(); + lock.lock(); + } + + protected void unlock() + { + lock.unlock(); } @ManagedAttribute(value = "The number of idle connections", readonly = true) @@ -102,139 +99,76 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa return idleConnections; } - public Queue<Connection> getActiveConnections() + public Collection<Connection> getActiveConnections() { return activeConnections; } - public Connection acquire() - { - Connection connection = activateIdle(); - if (connection == null) - connection = tryCreate(); - return connection; - } - - private Connection tryCreate() + @Override + public boolean isActive(Connection connection) { - while (true) + lock(); + try { - int current = getConnectionCount(); - final int next = current + 1; - - if (next > maxConnections) - { - if (LOG.isDebugEnabled()) - LOG.debug("Max connections {}/{} reached", current, maxConnections); - // Try again the idle connections - return activateIdle(); - } - - if (connectionCount.compareAndSet(current, next)) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation", next, maxConnections); - - destination.newConnection(new Promise<Connection>() - { - @Override - public void succeeded(Connection connection) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection); - - idleCreated(connection); - - proceed(); - } - - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x); - - connectionCount.decrementAndGet(); - - requester.failed(x); - } - }); - - // Try again the idle connections - return activateIdle(); - } + return activeConnections.contains(connection); + } + finally + { + unlock(); } } - protected void proceed() - { - requester.succeeded(); - } - - protected void idleCreated(Connection connection) + @Override + protected void onCreated(Connection connection) { - boolean idle; lock(); try { // Use "cold" new connections as last. - idle = idleConnections.offerLast(connection); + idleConnections.offer(connection); } finally { unlock(); } - idle(connection, idle); + idle(connection, false); } - private Connection activateIdle() + @Override + protected Connection activate() { - boolean acquired; Connection connection; lock(); try { - connection = idleConnections.pollFirst(); + connection = idleConnections.poll(); if (connection == null) return null; - acquired = activeConnections.offer(connection); + activeConnections.add(connection); } finally { unlock(); } - if (acquired) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection active {}", connection); - acquired(connection); - return connection; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection active overflow {}", connection); - connection.close(); - return null; - } - } - - protected void acquired(Connection connection) - { + return active(connection); } public boolean release(Connection connection) { - boolean idle; + boolean closed = isClosed(); lock(); try { if (!activeConnections.remove(connection)) return false; - // Make sure we use "hot" connections first. - idle = offerIdle(connection); + + if (!closed) + { + // Make sure we use "hot" connections first. + deactivate(connection); + } } finally { @@ -242,35 +176,14 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa } released(connection); - return idle(connection, idle); + return idle(connection, closed); } - protected boolean offerIdle(Connection connection) + protected boolean deactivate(Connection connection) { return idleConnections.offerFirst(connection); } - protected boolean idle(Connection connection, boolean idle) - { - if (idle) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle {}", connection); - return true; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle overflow {}", connection); - connection.close(); - return false; - } - } - - protected void released(Connection connection) - { - } - public boolean remove(Connection connection) { return remove(connection, false); @@ -295,55 +208,21 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa released(connection); boolean removed = activeRemoved || idleRemoved || force; if (removed) - { - int pooled = connectionCount.decrementAndGet(); - if (LOG.isDebugEnabled()) - LOG.debug("Connection removed {} - pooled: {}", connection, pooled); - } + removed(connection); return removed; } - public boolean isActive(Connection connection) - { - lock(); - try - { - return activeConnections.contains(connection); - } - finally - { - unlock(); - } - } - - public boolean isIdle(Connection connection) - { - lock(); - try - { - return idleConnections.contains(connection); - } - finally - { - unlock(); - } - } - - public boolean isEmpty() - { - return connectionCount.get() == 0; - } - public void close() { - List<Connection> idles = new ArrayList<>(); - List<Connection> actives = new ArrayList<>(); + super.close(); + + List<Connection> connections = new ArrayList<>(); lock(); try { - idles.addAll(idleConnections); + connections.addAll(idleConnections); idleConnections.clear(); - actives.addAll(activeConnections); + connections.addAll(activeConnections); activeConnections.clear(); } finally @@ -351,32 +230,18 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa unlock(); } - connectionCount.set(0); - - for (Connection connection : idles) - connection.close(); - - // A bit drastic, but we cannot wait for all requests to complete - for (Connection connection : actives) - connection.close(); - } - - @Override - public String dump() - { - return ContainerLifeCycle.dump(this); + close(connections); } @Override public void dump(Appendable out, String indent) throws IOException { - List<Connection> actives = new ArrayList<>(); - List<Connection> idles = new ArrayList<>(); + List<Connection> connections = new ArrayList<>(); lock(); try { - actives.addAll(activeConnections); - idles.addAll(idleConnections); + connections.addAll(activeConnections); + connections.addAll(idleConnections); } finally { @@ -384,7 +249,7 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa } ContainerLifeCycle.dumpObject(out, this); - ContainerLifeCycle.dump(out, indent, actives, idles); + ContainerLifeCycle.dump(out, indent, connections); } @Override @@ -422,16 +287,6 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa return false; } - protected void lock() - { - lock.lock(); - } - - protected void unlock() - { - lock.unlock(); - } - @Override public String toString() { @@ -450,8 +305,8 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa return String.format("%s[c=%d/%d,a=%d,i=%d]", getClass().getSimpleName(), - connectionCount.get(), - maxConnections, + getConnectionCount(), + getMaxConnectionCount(), activeSize, idleSize); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index e59be2f026..58eaee5ef5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -22,18 +22,21 @@ import java.io.Closeable; import java.io.IOException; import java.nio.channels.AsynchronousCloseException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -41,9 +44,10 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Sweeper; @ManagedObject -public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Dumpable +public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable { protected static final Logger LOG = Log.getLogger(HttpDestination.class); @@ -55,6 +59,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest private final ProxyConfiguration.Proxy proxy; private final ClientConnectionFactory connectionFactory; private final HttpField hostField; + private ConnectionPool connectionPool; public HttpDestination(HttpClient client, Origin origin) { @@ -86,6 +91,29 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest hostField = new HttpField(HttpHeader.HOST, host); } + @Override + protected void doStart() throws Exception + { + this.connectionPool = newConnectionPool(client); + addBean(connectionPool); + super.doStart(); + Sweeper sweeper = client.getBean(Sweeper.class); + if (sweeper != null && connectionPool instanceof Sweeper.Sweepable) + sweeper.offer((Sweeper.Sweepable)connectionPool); + } + + @Override + protected void doStop() throws Exception + { + Sweeper sweeper = client.getBean(Sweeper.class); + if (sweeper != null && connectionPool instanceof Sweeper.Sweepable) + sweeper.remove((Sweeper.Sweepable)connectionPool); + super.doStop(); + removeBean(connectionPool); + } + + protected abstract ConnectionPool newConnectionPool(HttpClient client); + protected Queue<HttpExchange> newExchangeQueue(HttpClient client) { return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination()); @@ -175,6 +203,24 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest return hostField; } + @ManagedAttribute(value = "The connection pool", readonly = true) + public ConnectionPool getConnectionPool() + { + return connectionPool; + } + + @Override + public void succeeded() + { + send(); + } + + @Override + public void failed(Throwable x) + { + abort(x); + } + protected void send(HttpRequest request, List<Response.ResponseListener> listeners) { if (!getScheme().equalsIgnoreCase(request.getScheme())) @@ -221,7 +267,59 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest return queue.offer(exchange); } - public abstract void send(); + public void send() + { + if (getHttpExchanges().isEmpty()) + return; + process(); + } + + private void process() + { + Connection connection = connectionPool.acquire(); + if (connection != null) + process(connection); + } + + public void process(final Connection connection) + { + HttpClient client = getHttpClient(); + final HttpExchange exchange = getHttpExchanges().poll(); + if (LOG.isDebugEnabled()) + LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this); + if (exchange == null) + { + if (!connectionPool.release(connection)) + connection.close(); + + if (!client.isRunning()) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} is stopping", client); + connection.close(); + } + } + else + { + final Request request = exchange.getRequest(); + Throwable cause = request.getAbortCause(); + if (cause != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("Aborted before processing {}: {}", exchange, cause); + // It may happen that the request is aborted before the exchange + // is created. Aborting the exchange a second time will result in + // a no-operation, so we just abort here to cover that edge case. + exchange.abort(cause); + } + else + { + send(connection, exchange); + } + } + } + + protected abstract void send(Connection connection, HttpExchange exchange); public void newConnection(Promise<Connection> promise) { @@ -243,14 +341,67 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest abort(new AsynchronousCloseException()); if (LOG.isDebugEnabled()) LOG.debug("Closed {}", this); + connectionPool.close(); } public void release(Connection connection) { + if (LOG.isDebugEnabled()) + LOG.debug("Released {}", connection); + HttpClient client = getHttpClient(); + if (client.isRunning()) + { + if (connectionPool.isActive(connection)) + { + if (connectionPool.release(connection)) + send(); + else + connection.close(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Released explicit {}", connection); + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("{} is stopped", client); + connection.close(); + } + } + + public boolean remove(Connection connection) + { + return connectionPool.remove(connection); } public void close(Connection connection) { + boolean removed = remove(connection); + + if (getHttpExchanges().isEmpty()) + { + if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty()) + { + // There is a race condition between this thread removing the destination + // and another thread queueing a request to this same destination. + // If this destination is removed, but the request queued, a new connection + // will be opened, the exchange will be executed and eventually the connection + // will idle timeout and be closed. Meanwhile a new destination will be created + // in HttpClient and will be used for other requests. + getHttpClient().removeDestination(this); + } + } + else + { + // We need to execute queued requests even if this connection failed. + // We may create a connection that is not needed, but it will eventually + // idle timeout, so no worries. + if (removed) + process(); + } } /** @@ -278,6 +429,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest public void dump(Appendable out, String indent) throws IOException { ContainerLifeCycle.dumpObject(out, toString()); + ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool)); } public String asString() @@ -288,11 +440,12 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest @Override public String toString() { - return String.format("%s[%s]%x%s,queue=%d", + return String.format("%s[%s]%x%s,queue=%d,pool=%s", HttpDestination.class.getSimpleName(), asString(), hashCode(), proxy == null ? "" : "(via " + proxy + ")", - exchanges.size()); + exchanges.size(), + connectionPool); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java index 7762af09f9..f5d3b98580 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -25,7 +25,7 @@ import org.eclipse.jetty.util.LeakDetector; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class LeakTrackingConnectionPool extends ConnectionPool +public class LeakTrackingConnectionPool extends DuplexConnectionPool { private static final Logger LOG = Log.getLogger(LeakTrackingConnectionPool.class); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java new file mode 100644 index 0000000000..88561bb75b --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -0,0 +1,302 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +public class MultiplexConnectionPool extends AbstractConnectionPool +{ + private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class); + + private final ReentrantLock lock = new ReentrantLock(); + private final int maxMultiplexed; + private final Deque<Holder> idleConnections; + private final Map<Connection, Holder> muxedConnections; + private final Map<Connection, Holder> busyConnections; + + public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplexed) + { + super(destination, maxConnections, requester); + this.maxMultiplexed = maxMultiplexed; + this.idleConnections = new ArrayDeque<>(maxConnections); + this.muxedConnections = new HashMap<>(maxConnections); + this.busyConnections = new HashMap<>(maxConnections); + } + + protected void lock() + { + lock.lock(); + } + + protected void unlock() + { + lock.unlock(); + } + + @Override + public boolean isActive(Connection connection) + { + lock(); + try + { + if (muxedConnections.containsKey(connection)) + return true; + if (busyConnections.containsKey(connection)) + return true; + return false; + } + finally + { + unlock(); + } + } + + @Override + protected void onCreated(Connection connection) + { + lock(); + try + { + // Use "cold" connections as last. + idleConnections.offer(new Holder(connection)); + } + finally + { + unlock(); + } + + idle(connection, false); + } + + @Override + protected Connection activate() + { + Holder holder; + lock(); + try + { + while (true) + { + if (muxedConnections.isEmpty()) + { + holder = idleConnections.poll(); + if (holder == null) + return null; + muxedConnections.put(holder.connection, holder); + } + else + { + holder = muxedConnections.values().iterator().next(); + } + + if (holder.count < maxMultiplexed) + { + ++holder.count; + break; + } + else + { + muxedConnections.remove(holder.connection); + busyConnections.put(holder.connection, holder); + } + } + } + finally + { + unlock(); + } + + return active(holder.connection); + } + + @Override + public boolean release(Connection connection) + { + boolean closed = isClosed(); + boolean idle = false; + Holder holder; + lock(); + try + { + holder = muxedConnections.get(connection); + if (holder != null) + { + int count = --holder.count; + if (count == 0) + { + muxedConnections.remove(connection); + if (!closed) + { + idleConnections.offerFirst(holder); + idle = true; + } + } + } + else + { + holder = busyConnections.remove(connection); + if (holder != null) + { + int count = --holder.count; + if (!closed) + { + if (count == 0) + { + idleConnections.offerFirst(holder); + idle = true; + } + else + { + muxedConnections.put(connection, holder); + } + } + } + } + } + finally + { + unlock(); + } + + if (holder == null) + return false; + + released(connection); + if (idle || closed) + return idle(connection, closed); + return true; + } + + @Override + public boolean remove(Connection connection) + { + return remove(connection, false); + } + + protected boolean remove(Connection connection, boolean force) + { + boolean activeRemoved = true; + boolean idleRemoved = false; + lock(); + try + { + Holder holder = muxedConnections.remove(connection); + if (holder == null) + holder = busyConnections.remove(connection); + if (holder == null) + { + activeRemoved = false; + for (Iterator<Holder> iterator = idleConnections.iterator(); iterator.hasNext();) + { + holder = iterator.next(); + if (holder.connection == connection) + { + idleRemoved = true; + iterator.remove(); + break; + } + } + } + } + finally + { + unlock(); + } + + if (activeRemoved || force) + released(connection); + boolean removed = activeRemoved || idleRemoved || force; + if (removed) + removed(connection); + return removed; + } + + @Override + public void close() + { + super.close(); + + List<Connection> connections; + lock(); + try + { + connections = idleConnections.stream().map(holder -> holder.connection).collect(Collectors.toList()); + connections.addAll(muxedConnections.keySet()); + connections.addAll(busyConnections.keySet()); + } + finally + { + unlock(); + } + + close(connections); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + List<Holder> connections = new ArrayList<>(); + lock(); + try + { + connections.addAll(busyConnections.values()); + connections.addAll(muxedConnections.values()); + connections.addAll(idleConnections); + } + finally + { + unlock(); + } + + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, connections); + } + + private static class Holder + { + private final Connection connection; + private int count; + + private Holder(Connection connection) + { + this.connection = connection; + } + + @Override + public String toString() + { + return String.format("%s[%d]", connection, count); + } + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index a50131f1ed..a23fb34a82 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -18,136 +18,16 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.util.Promise; - -public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection> +public abstract class MultiplexHttpDestination extends HttpDestination { - private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED); - private C connection; - protected MultiplexHttpDestination(HttpClient client, Origin origin) { super(client, origin); } - @Override - public void send() - { - while (true) - { - ConnectState current = connect.get(); - switch (current) - { - case DISCONNECTED: - { - if (!connect.compareAndSet(current, ConnectState.CONNECTING)) - break; - newConnection(this); - return; - } - case CONNECTING: - { - // Waiting to connect, just return - return; - } - case CONNECTED: - { - if (process(connection)) - break; - return; - } - default: - { - abort(new IllegalStateException("Invalid connection state " + current)); - return; - } - } - } - } - - @Override - @SuppressWarnings("unchecked") - public void succeeded(Connection result) - { - C connection = this.connection = (C)result; - if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED)) - { - process(connection); - } - else - { - connection.close(); - failed(new IllegalStateException()); - } - } - - @Override - public void failed(Throwable x) - { - connect.set(ConnectState.DISCONNECTED); - abort(x); - } - - protected boolean process(final C connection) - { - HttpClient client = getHttpClient(); - final HttpExchange exchange = getHttpExchanges().poll(); - if (LOG.isDebugEnabled()) - LOG.debug("Processing {} on {}", exchange, connection); - if (exchange == null) - return false; - - final Request request = exchange.getRequest(); - Throwable cause = request.getAbortCause(); - if (cause != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Aborted before processing {}: {}", exchange, cause); - // It may happen that the request is aborted before the exchange - // is created. Aborting the exchange a second time will result in - // a no-operation, so we just abort here to cover that edge case. - exchange.abort(cause); - } - else - { - send(connection, exchange); - } - return true; - } - - @Override - public void close() - { - super.close(); - C connection = this.connection; - if (connection != null) - connection.close(); - } - - @Override - public void close(Connection connection) - { - super.close(connection); - while (true) - { - ConnectState current = connect.get(); - if (connect.compareAndSet(current, ConnectState.DISCONNECTED)) - { - if (getHttpClient().isRemoveIdleDestinations()) - getHttpClient().removeDestination(this); - break; - } - } - } - - protected abstract void send(C connection, HttpExchange exchange); - - private enum ConnectState + protected ConnectionPool newConnectionPool(HttpClient client) { - DISCONNECTED, CONNECTING, CONNECTED + return new MultiplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this, + client.getMaxRequestsQueuedPerDestination()); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java index 7005e16fd6..b77805aeae 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java @@ -18,229 +18,15 @@ package org.eclipse.jetty.client; -import java.io.IOException; -import java.util.Collections; - -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.annotation.ManagedAttribute; -import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.component.ContainerLifeCycle; -import org.eclipse.jetty.util.thread.Sweeper; - -@ManagedObject -public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback +public abstract class PoolingHttpDestination extends HttpDestination { - private DuplexConnectionPool connectionPool; - public PoolingHttpDestination(HttpClient client, Origin origin) { super(client, origin); - this.connectionPool = newConnectionPool(client); - addBean(connectionPool); - Sweeper sweeper = client.getBean(Sweeper.class); - if (sweeper != null) - sweeper.offer(connectionPool); } - @Override - protected void doStart() throws Exception - { - HttpClient client = getHttpClient(); - this.connectionPool = newConnectionPool(client); - addBean(connectionPool); - super.doStart(); - Sweeper sweeper = client.getBean(Sweeper.class); - if (sweeper != null) - sweeper.offer(connectionPool); - } - - @Override - protected void doStop() throws Exception - { - HttpClient client = getHttpClient(); - Sweeper sweeper = client.getBean(Sweeper.class); - if (sweeper != null) - sweeper.remove(connectionPool); - super.doStop(); - removeBean(connectionPool); - } - - protected DuplexConnectionPool newConnectionPool(HttpClient client) + protected ConnectionPool newConnectionPool(HttpClient client) { return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this); } - - @ManagedAttribute(value = "The connection pool", readonly = true) - public DuplexConnectionPool getConnectionPool() - { - return connectionPool; - } - - @Override - public void succeeded() - { - send(); - } - - @Override - public void failed(final Throwable x) - { - abort(x); - } - - public void send() - { - if (getHttpExchanges().isEmpty()) - return; - process(); - } - - @SuppressWarnings("unchecked") - public C acquire() - { - return (C)connectionPool.acquire(); - } - - private void process() - { - C connection = acquire(); - if (connection != null) - process(connection); - } - - /** - * <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p> - * <p>A new connection is created when a request needs to be executed; it is possible that the request that - * triggered the request creation is executed by another connection that was just released, so the new connection - * may become idle.</p> - * <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p> - * - * @param connection the new connection - */ - public void process(final C connection) - { - HttpClient client = getHttpClient(); - final HttpExchange exchange = getHttpExchanges().poll(); - if (LOG.isDebugEnabled()) - LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this); - if (exchange == null) - { - if (!connectionPool.release(connection)) - connection.close(); - - if (!client.isRunning()) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} is stopping", client); - connection.close(); - } - } - else - { - final Request request = exchange.getRequest(); - Throwable cause = request.getAbortCause(); - if (cause != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Aborted before processing {}: {}", exchange, cause); - // It may happen that the request is aborted before the exchange - // is created. Aborting the exchange a second time will result in - // a no-operation, so we just abort here to cover that edge case. - exchange.abort(cause); - } - else - { - send(connection, exchange); - } - } - } - - protected abstract void send(C connection, HttpExchange exchange); - - @Override - public void release(Connection c) - { - @SuppressWarnings("unchecked") - C connection = (C)c; - if (LOG.isDebugEnabled()) - LOG.debug("Released {}", connection); - HttpClient client = getHttpClient(); - if (client.isRunning()) - { - if (connectionPool.isActive(connection)) - { - if (connectionPool.release(connection)) - send(); - else - connection.close(); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Released explicit {}", connection); - } - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{} is stopped", client); - connection.close(); - } - } - - public boolean remove(Connection connection) - { - return connectionPool.remove(connection); - } - - @Override - public void close(Connection connection) - { - super.close(connection); - - boolean removed = remove(connection); - - if (getHttpExchanges().isEmpty()) - { - if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty()) - { - // There is a race condition between this thread removing the destination - // and another thread queueing a request to this same destination. - // If this destination is removed, but the request queued, a new connection - // will be opened, the exchange will be executed and eventually the connection - // will idle timeout and be closed. Meanwhile a new destination will be created - // in HttpClient and will be used for other requests. - getHttpClient().removeDestination(this); - } - } - else - { - // We need to execute queued requests even if this connection failed. - // We may create a connection that is not needed, but it will eventually - // idle timeout, so no worries. - if (removed) - process(); - } - } - - public void close() - { - super.close(); - connectionPool.close(); - } - - @Override - public void dump(Appendable out, String indent) throws IOException - { - super.dump(out, indent); - ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool)); - } - - @Override - public String toString() - { - return String.format("%s,pool=%s", super.toString(), connectionPool); - } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java index 2235f75dab..a218d14e87 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java @@ -56,7 +56,7 @@ import org.eclipse.jetty.util.thread.Scheduler; * tuning the idle timeout of the servers to be larger than * that of the client.</p> */ -public class ValidatingConnectionPool extends ConnectionPool +public class ValidatingConnectionPool extends DuplexConnectionPool { private static final Logger LOG = Log.getLogger(ValidatingConnectionPool.class); @@ -154,7 +154,7 @@ public class ValidatingConnectionPool extends ConnectionPool private class Holder implements Runnable { private final long timestamp = System.nanoTime(); - private final AtomicBoolean latch = new AtomicBoolean(); + private final AtomicBoolean done = new AtomicBoolean(); private final Connection connection; public Scheduler.Task task; @@ -166,30 +166,31 @@ public class ValidatingConnectionPool extends ConnectionPool @Override public void run() { - if (latch.compareAndSet(false, true)) + if (done.compareAndSet(false, true)) { - boolean idle; + boolean closed = isClosed(); lock(); try { - quarantine.remove(connection); - idle = offerIdle(connection); if (LOG.isDebugEnabled()) LOG.debug("Validated {}", connection); + quarantine.remove(connection); + if (!closed) + deactivate(connection); } finally { unlock(); } - if (idle(connection, idle)) - proceed(); + idle(connection, closed); + proceed(); } } public boolean cancel() { - if (latch.compareAndSet(false, true)) + if (done.compareAndSet(false, true)) { task.cancel(); return true; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java index 304ba96d35..284ce08fe5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java @@ -22,8 +22,9 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.PoolingHttpDestination; +import org.eclipse.jetty.client.api.Connection; -public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnectionOverHTTP> +public class HttpDestinationOverHTTP extends PoolingHttpDestination { public HttpDestinationOverHTTP(HttpClient client, Origin origin) { @@ -31,8 +32,8 @@ public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnecti } @Override - protected void send(HttpConnectionOverHTTP connection, HttpExchange exchange) + protected void send(Connection connection, HttpExchange exchange) { - connection.send(exchange); + ((HttpConnectionOverHTTP)connection).send(exchange); } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java index c54f88981e..dd35154613 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientExplicitConnectionTest.java @@ -59,7 +59,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe Assert.assertEquals(200, response.getStatus()); HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination; - DuplexConnectionPool connectionPool = httpDestination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)httpDestination.getConnectionPool(); Assert.assertTrue(connectionPool.getActiveConnections().isEmpty()); Assert.assertTrue(connectionPool.getIdleConnections().isEmpty()); } @@ -94,7 +94,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe Assert.assertFalse(httpConnection.getEndPoint().isOpen()); HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination; - DuplexConnectionPool connectionPool = httpDestination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)httpDestination.getConnectionPool(); Assert.assertTrue(connectionPool.getActiveConnections().isEmpty()); Assert.assertTrue(connectionPool.getIdleConnections().isEmpty()); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java index 0db34443c2..e7ab277f20 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java @@ -25,9 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.client.util.DeferredContentProvider; @@ -89,14 +86,7 @@ public class HttpClientFailureTest try { client.newRequest("localhost", connector.getLocalPort()) - .onRequestHeaders(new Request.HeadersListener() - { - @Override - public void onHeaders(Request request) - { - connectionRef.get().getEndPoint().close(); - } - }) + .onRequestHeaders(request -> connectionRef.get().getEndPoint().close()) .timeout(5, TimeUnit.SECONDS) .send(); Assert.fail(); @@ -106,7 +96,7 @@ public class HttpClientFailureTest // Expected. } - DuplexConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -134,25 +124,17 @@ public class HttpClientFailureTest final CountDownLatch completeLatch = new CountDownLatch(1); DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", connector.getLocalPort()) - .onRequestCommit(new Request.CommitListener() + .onRequestCommit(request -> { - @Override - public void onCommit(Request request) - { - connectionRef.get().getEndPoint().close(); - commitLatch.countDown(); - } + connectionRef.get().getEndPoint().close(); + commitLatch.countDown(); }) .content(content) .idleTimeout(2, TimeUnit.SECONDS) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isFailed()) - completeLatch.countDown(); - } + if (result.isFailed()) + completeLatch.countDown(); }); Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS)); @@ -170,7 +152,7 @@ public class HttpClientFailureTest Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); - DuplexConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java deleted file mode 100644 index 7ddeea8b90..0000000000 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java +++ /dev/null @@ -1,320 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; -import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; -import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.http.HttpScheme; -import org.eclipse.jetty.io.LeakTrackingByteBufferPool; -import org.eclipse.jetty.io.MappedByteBufferPool; -import org.eclipse.jetty.server.AbstractConnectionFactory; -import org.eclipse.jetty.server.HttpConnectionFactory; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.LeakDetector; -import org.eclipse.jetty.util.SocketAddressResolver; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.Scheduler; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.assertThat; - -public class HttpClientLoadTest extends AbstractHttpClientServerTest -{ - private final Logger logger = Log.getLogger(HttpClientLoadTest.class); - - public HttpClientLoadTest(SslContextFactory sslContextFactory) - { - super(sslContextFactory); - } - - @Test - public void testIterative() throws Exception - { - int cores = Runtime.getRuntime().availableProcessors(); - - final AtomicLong connectionLeaks = new AtomicLong(); - - start(new LoadHandler()); - server.stop(); - server.removeConnector(connector); - LeakTrackingByteBufferPool serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); - connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(), - serverBufferPool , 1, Math.min(1, cores / 2), - AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory())); - server.addConnector(connector); - server.start(); - - client.stop(); - - HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP() - { - @Override - public HttpDestination newHttpDestination(Origin origin) - { - return new HttpDestinationOverHTTP(getHttpClient(), origin) - { - @Override - protected DuplexConnectionPool newConnectionPool(HttpClient client) - { - return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this) - { - @Override - protected void leaked(LeakDetector.LeakInfo resource) - { - connectionLeaks.incrementAndGet(); - } - }; - } - }; - } - }, sslContextFactory); - newClient.setExecutor(client.getExecutor()); - newClient.setSocketAddressResolver(new SocketAddressResolver.Sync()); - client = newClient; - LeakTrackingByteBufferPool clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); - client.setByteBufferPool(clientBufferPool); - client.setMaxConnectionsPerDestination(32768); - client.setMaxRequestsQueuedPerDestination(1024 * 1024); - client.setDispatchIO(false); - client.setStrictEventOrdering(false); - client.start(); - - Random random = new Random(); - // At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity) - int runs = 1; - int iterations = 500; - for (int i = 0; i < runs; ++i) - { - run(random, iterations); - } - - // Re-run after warmup - iterations = 5_000; - for (int i = 0; i < runs; ++i) - { - run(random, iterations); - } - - System.gc(); - - assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L)); - assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L)); - assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L)); - - assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L)); - assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L)); - assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L)); - - assertThat("Connection Leaks", connectionLeaks.get(), Matchers.is(0L)); - } - - private void run(Random random, int iterations) throws InterruptedException - { - CountDownLatch latch = new CountDownLatch(iterations); - List<String> failures = new ArrayList<>(); - - int factor = logger.isDebugEnabled() ? 25 : 1; - factor *= "http".equalsIgnoreCase(scheme) ? 10 : 1000; - - // Dumps the state of the client if the test takes too long - final Thread testThread = Thread.currentThread(); - Scheduler.Task task = client.getScheduler().schedule(new Runnable() - { - @Override - public void run() - { - logger.warn("Interrupting test, it is taking too long"); - for (String host : Arrays.asList("localhost", "127.0.0.1")) - { - HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); - for (Connection connection : new ArrayList<>(connectionPool.getActiveConnections())) - { - HttpConnectionOverHTTP active = (HttpConnectionOverHTTP)connection; - logger.warn(active.getEndPoint() + " exchange " + active.getHttpChannel().getHttpExchange()); - } - } - testThread.interrupt(); - } - }, iterations * factor, TimeUnit.MILLISECONDS); - - long begin = System.nanoTime(); - for (int i = 0; i < iterations; ++i) - { - test(random, latch, failures); -// test("http", "localhost", "GET", false, false, 64 * 1024, false, latch, failures); - } - Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS)); - long end = System.nanoTime(); - task.cancel(); - long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin); - logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1); - - for (String failure : failures) - System.err.println("FAILED: "+failure); - - Assert.assertTrue(failures.toString(), failures.isEmpty()); - } - - private void test(Random random, final CountDownLatch latch, final List<String> failures) throws InterruptedException - { - // Choose a random destination - String host = random.nextBoolean() ? "localhost" : "127.0.0.1"; - // Choose a random method - HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST; - - boolean ssl = HttpScheme.HTTPS.is(scheme); - - // Choose randomly whether to close the connection on the client or on the server - boolean clientClose = false; - if (!ssl && random.nextBoolean()) - clientClose = true; - boolean serverClose = false; - if (!ssl && random.nextBoolean()) - serverClose = true; - - int maxContentLength = 64 * 1024; - int contentLength = random.nextInt(maxContentLength) + 1; - - test(scheme, host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures); - } - - private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures) throws InterruptedException - { - Request request = client.newRequest(host, connector.getLocalPort()) - .scheme(scheme) - .method(method); - - if (clientClose) - request.header(HttpHeader.CONNECTION, "close"); - else if (serverClose) - request.header("X-Close", "true"); - - switch (method) - { - case "GET": - request.header("X-Download", String.valueOf(contentLength)); - break; - case "POST": - request.header("X-Upload", String.valueOf(contentLength)); - request.content(new BytesContentProvider(new byte[contentLength])); - break; - } - - final CountDownLatch requestLatch = new CountDownLatch(1); - request.send(new Response.Listener.Adapter() - { - private final AtomicInteger contentLength = new AtomicInteger(); - - @Override - public void onHeaders(Response response) - { - if (checkContentLength) - { - String content = response.getHeaders().get("X-Content"); - if (content != null) - contentLength.set(Integer.parseInt(content)); - } - } - - @Override - public void onContent(Response response, ByteBuffer content) - { - if (checkContentLength) - contentLength.addAndGet(-content.remaining()); - } - - @Override - public void onComplete(Result result) - { - if (result.isFailed()) - { - result.getFailure().printStackTrace(); - failures.add("Result failed " + result); - } - - if (checkContentLength && contentLength.get() != 0) - failures.add("Content length mismatch " + contentLength); - - requestLatch.countDown(); - latch.countDown(); - } - }); - requestLatch.await(5, TimeUnit.SECONDS); - } - - private class LoadHandler extends AbstractHandler - { - @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - String method = request.getMethod().toUpperCase(Locale.ENGLISH); - switch (method) - { - case "GET": - int contentLength = request.getIntHeader("X-Download"); - if (contentLength > 0) - { - response.setHeader("X-Content", String.valueOf(contentLength)); - response.getOutputStream().write(new byte[contentLength]); - } - break; - case "POST": - response.setHeader("X-Content", request.getHeader("X-Upload")); - IO.copy(request.getInputStream(), response.getOutputStream()); - break; - } - - if (Boolean.parseBoolean(request.getHeader("X-Close"))) - response.setHeader("Connection", "close"); - - baseRequest.setHandled(true); - } - } -} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index bff369f79d..1d803b9548 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -111,7 +111,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); long start = System.nanoTime(); HttpConnectionOverHTTP connection = null; @@ -367,16 +367,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest final byte[] content = {0, 1, 2, 3}; ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort()) - .onRequestContent(new Request.ContentListener() + .onRequestContent((request, buffer) -> { - @Override - public void onContent(Request request, ByteBuffer buffer) - { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - if (!Arrays.equals(content, bytes)) - request.abort(new Exception()); - } + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + if (!Arrays.equals(content, bytes)) + request.abort(new Exception()); }) .content(new BytesContentProvider(content)) .timeout(5, TimeUnit.SECONDS) @@ -401,16 +397,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest final AtomicInteger progress = new AtomicInteger(); ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort()) - .onRequestContent(new Request.ContentListener() + .onRequestContent((request, buffer) -> { - @Override - public void onContent(Request request, ByteBuffer buffer) - { - byte[] bytes = new byte[buffer.remaining()]; - Assert.assertEquals(1, bytes.length); - buffer.get(bytes); - Assert.assertEquals(bytes[0], progress.getAndIncrement()); - } + byte[] bytes = new byte[buffer.remaining()]; + Assert.assertEquals(1, bytes.length); + buffer.get(bytes); + Assert.assertEquals(bytes[0], progress.getAndIncrement()); }) .content(new BytesContentProvider(new byte[]{0}, new byte[]{1}, new byte[]{2}, new byte[]{3}, new byte[]{4})) .timeout(5, TimeUnit.SECONDS) @@ -432,19 +424,15 @@ public class HttpClientTest extends AbstractHttpClientServerTest final CountDownLatch successLatch = new CountDownLatch(2); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestBegin(new Request.BeginListener() + .onRequestBegin(request -> { - @Override - public void onBegin(Request request) + try { - try - { - latch.await(); - } - catch (InterruptedException x) - { - x.printStackTrace(); - } + latch.await(); + } + catch (InterruptedException x) + { + x.printStackTrace(); } }) .send(new Response.Listener.Adapter() @@ -459,14 +447,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestQueued(new Request.QueuedListener() - { - @Override - public void onQueued(Request request) - { - latch.countDown(); - } - }) + .onRequestQueued(request -> latch.countDown()) .send(new Response.Listener.Adapter() { @Override @@ -514,27 +495,16 @@ public class HttpClientTest extends AbstractHttpClientServerTest latch.countDown(); } }) - .onResponseFailure(new Response.FailureListener() - { - @Override - public void onFailure(Response response, Throwable failure) - { - latch.countDown(); - } - }) + .onResponseFailure((response, failure) -> latch.countDown()) .send(null); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .path("/two") - .onResponseSuccess(new Response.SuccessListener() + .onResponseSuccess(response -> { - @Override - public void onSuccess(Response response) - { - Assert.assertEquals(200, response.getStatus()); - latch.countDown(); - } + Assert.assertEquals(200, response.getStatus()); + latch.countDown(); }) .send(null); @@ -564,14 +534,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .file(file) - .onRequestSuccess(new Request.SuccessListener() + .onRequestSuccess(request -> { - @Override - public void onSuccess(Request request) - { - requestTime.set(System.nanoTime()); - latch.countDown(); - } + requestTime.set(System.nanoTime()); + latch.countDown(); }) .send(new Response.Listener.Adapter() { @@ -674,14 +640,11 @@ public class HttpClientTest extends AbstractHttpClientServerTest final int port = connector.getLocalPort(); client.newRequest(host, port) .scheme(scheme) - .onRequestBegin(new Request.BeginListener() + .onRequestBegin(request -> { - @Override - public void onBegin(Request request) - { - HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - destination.getConnectionPool().getActiveConnections().peek().close(); - } + HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + connectionPool.getActiveConnections().iterator().next().close(); }) .send(new Response.Listener.Adapter() { @@ -773,14 +736,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onResponseHeader(new Response.HeaderListener() - { - @Override - public boolean onHeader(Response response, HttpField field) - { - return !field.getName().equals(headerName); - } - }) + .onResponseHeader((response1, field) -> !field.getName().equals(headerName)) .timeout(5, TimeUnit.SECONDS) .send(); @@ -864,16 +820,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest("idontexist", 80) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - Throwable failure = result.getFailure(); - Assert.assertTrue(failure instanceof UnknownHostException); - latch.countDown(); - } + Assert.assertTrue(result.isFailed()); + Throwable failure = result.getFailure(); + Assert.assertTrue(failure instanceof UnknownHostException); + latch.countDown(); }); Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); } @@ -1323,14 +1275,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest final CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isFailed()) - completeLatch.countDown(); - } + if (result.isFailed()) + completeLatch.countDown(); }); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientUploadDuringServerShutdown.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientUploadDuringServerShutdown.java index b899a1f211..bc80ff7aae 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientUploadDuringServerShutdown.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientUploadDuringServerShutdown.java @@ -31,8 +31,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpChannelOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; @@ -121,14 +119,7 @@ public class HttpClientUploadDuringServerShutdown int length = 16 * 1024 * 1024 + random.nextInt(16 * 1024 * 1024); client.newRequest("localhost", 8888) .content(new BytesContentProvider(new byte[length])) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - latch.countDown(); - } - }); + .send(result -> latch.countDown()); long sleep = 1 + random.nextInt(10); TimeUnit.MILLISECONDS.sleep(sleep); } @@ -244,35 +235,24 @@ public class HttpClientUploadDuringServerShutdown final CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .timeout(10, TimeUnit.SECONDS) - .onRequestBegin(new org.eclipse.jetty.client.api.Request.BeginListener() + .onRequestBegin(request -> { - @Override - public void onBegin(org.eclipse.jetty.client.api.Request request) + try { - try - { - beginLatch.countDown(); - completeLatch.await(5, TimeUnit.SECONDS); - } - catch (InterruptedException x) - { - x.printStackTrace(); - } + beginLatch.countDown(); + completeLatch.await(5, TimeUnit.SECONDS); } - }) - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) + catch (InterruptedException x) { - completeLatch.countDown(); + x.printStackTrace(); } - }); + }) + .send(result -> completeLatch.countDown()); Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", connector.getLocalPort()); - DuplexConnectionPool pool = destination.getConnectionPool(); + DuplexConnectionPool pool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, pool.getConnectionCount()); Assert.assertEquals(0, pool.getIdleConnections().size()); Assert.assertEquals(0, pool.getActiveConnections().size()); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java index f8cbb04c5a..684ff02dce 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collection; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -69,35 +70,24 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(3); client.newRequest(host, port) .scheme(scheme) - .onRequestSuccess(new Request.SuccessListener() - { - @Override - public void onSuccess(Request request) - { - successLatch.countDown(); - } - }) - .onResponseHeaders(new Response.HeadersListener() + .onRequestSuccess(request -> successLatch.countDown()) + .onResponseHeaders(response -> { - @Override - public void onHeaders(Response response) - { - Assert.assertEquals(0, idleConnections.size()); - Assert.assertEquals(1, activeConnections.size()); - headersLatch.countDown(); - } + Assert.assertEquals(0, idleConnections.size()); + Assert.assertEquals(1, activeConnections.size()); + headersLatch.countDown(); }) .send(new Response.Listener.Adapter() { @@ -130,12 +120,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); final CountDownLatch beginLatch = new CountDownLatch(1); @@ -145,7 +135,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest @Override public void onBegin(Request request) { - activeConnections.peek().close(); + activeConnections.iterator().next().close(); beginLatch.countDown(); } @@ -181,12 +171,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); final CountDownLatch successLatch = new CountDownLatch(3); @@ -241,12 +231,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); final long delay = 1000; @@ -314,12 +304,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); server.stop(); @@ -327,22 +317,11 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest final CountDownLatch failureLatch = new CountDownLatch(2); client.newRequest(host, port) .scheme(scheme) - .onRequestFailure(new Request.FailureListener() + .onRequestFailure((request, failure) -> failureLatch.countDown()) + .send(result -> { - @Override - public void onFailure(Request request, Throwable failure) - { - failureLatch.countDown(); - } - }) - .send(new Response.Listener.Adapter() - { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - failureLatch.countDown(); - } + Assert.assertTrue(result.isFailed()); + failureLatch.countDown(); }); Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); @@ -367,12 +346,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); final CountDownLatch latch = new CountDownLatch(1); @@ -417,12 +396,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); Log.getLogger(HttpConnection.class).info("Expecting java.lang.IllegalStateException: HttpParser{s=CLOSED,..."); @@ -467,12 +446,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); ContentResponse response = client.newRequest(host, port) @@ -499,25 +478,21 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Queue<Connection> idleConnections = connectionPool.getIdleConnections(); + final Collection<Connection> idleConnections = connectionPool.getIdleConnections(); Assert.assertEquals(0, idleConnections.size()); - final Queue<Connection> activeConnections = connectionPool.getActiveConnections(); + final Collection<Connection> activeConnections = connectionPool.getActiveConnections(); Assert.assertEquals(0, activeConnections.size()); client.setStrictEventOrdering(false); ContentResponse response = client.newRequest(host, port) .scheme(scheme) - .onResponseBegin(new Response.BeginListener() + .onResponseBegin(response1 -> { - @Override - public void onBegin(Response response) - { - // Simulate a HTTP 1.0 response has been received. - ((HttpResponse)response).version(HttpVersion.HTTP_1_0); - } + // Simulate a HTTP 1.0 response has been received. + ((HttpResponse)response1).version(HttpVersion.HTTP_1_0); }) .send(); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java index 45b78e7d18..4b33e574e1 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -25,12 +25,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; import org.eclipse.jetty.client.util.ByteBufferContentProvider; @@ -88,7 +88,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -135,7 +135,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -182,7 +182,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -204,14 +204,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestCommit(new Request.CommitListener() + .onRequestCommit(request -> { - @Override - public void onCommit(Request request) - { - aborted.set(request.abort(cause)); - latch.countDown(); - } + aborted.set(request.abort(cause)); + latch.countDown(); }) .timeout(5, TimeUnit.SECONDS) .send(); @@ -225,7 +221,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -260,14 +256,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestCommit(new Request.CommitListener() + .onRequestCommit(request -> { - @Override - public void onCommit(Request request) - { - aborted.set(request.abort(cause)); - latch.countDown(); - } + aborted.set(request.abort(cause)); + latch.countDown(); }) .content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1})) { @@ -289,7 +281,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -315,14 +307,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest { client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .onRequestContent(new Request.ContentListener() + .onRequestContent((request, content) -> { - @Override - public void onContent(Request request, ByteBuffer content) - { - aborted.set(request.abort(cause)); - latch.countDown(); - } + aborted.set(request.abort(cause)); + latch.countDown(); }) .content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1})) { @@ -344,7 +332,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -454,7 +442,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest } HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); Assert.assertEquals(0, connectionPool.getIdleConnections().size()); @@ -486,15 +474,11 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest Request request = client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .timeout(3 * delay, TimeUnit.MILLISECONDS); - request.send(new Response.CompleteListener() + request.send(result -> { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - Assert.assertSame(cause, result.getFailure()); - latch.countDown(); - } + Assert.assertTrue(result.isFailed()); + Assert.assertSame(cause, result.getFailure()); + latch.countDown(); }); TimeUnit.MILLISECONDS.sleep(delay); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java index 23d72de601..47a7760e1a 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java @@ -151,7 +151,7 @@ public class ServerConnectionCloseTest // Connection should have been removed from pool. HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java index 79a9cd1879..d48ed22314 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java @@ -183,7 +183,7 @@ public class TLSServerConnectionCloseTest // Connection should have been removed from pool. HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); - DuplexConnectionPool connectionPool = destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index aa3b4f5e74..bf9af834d9 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.AbstractHttpClientServerTest; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.DuplexConnectionPool; import org.eclipse.jetty.client.EmptyServerHandler; import org.eclipse.jetty.client.HttpClient; @@ -31,9 +32,6 @@ import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Destination; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -59,11 +57,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest public void test_FirstAcquire_WithEmptyQueue() throws Exception { HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())); - Connection connection = destination.acquire(); + destination.start(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Connection connection = connectionPool.acquire(); if (connection == null) { // There are no queued requests, so the newly created connection will be idle - connection = timedPoll(destination.getConnectionPool().getIdleConnections(), 5, TimeUnit.SECONDS); + connection = timedPoll(connectionPool.getIdleConnections(), 5, TimeUnit.SECONDS); } Assert.assertNotNull(connection); } @@ -72,7 +72,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception { HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())); - Connection connection1 = destination.acquire(); + destination.start(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Connection connection1 = connectionPool.acquire(); if (connection1 == null) { // There are no queued requests, so the newly created connection will be idle @@ -80,11 +82,11 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { TimeUnit.MILLISECONDS.sleep(50); - connection1 = destination.getConnectionPool().getIdleConnections().peek(); + connection1 = connectionPool.getIdleConnections().peek(); } Assert.assertNotNull(connection1); - Connection connection2 = destination.acquire(); + Connection connection2 = connectionPool.acquire(); Assert.assertSame(connection1, connection2); } } @@ -97,18 +99,18 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())) { @Override - protected DuplexConnectionPool newConnectionPool(HttpClient client) + protected ConnectionPool newConnectionPool(HttpClient client) { return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this) { @Override - protected void idleCreated(Connection connection) + protected void onCreated(Connection connection) { try { idleLatch.countDown(); latch.await(5, TimeUnit.SECONDS); - super.idleCreated(connection); + super.onCreated(connection); } catch (InterruptedException x) { @@ -118,7 +120,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest }; } }; - Connection connection1 = destination.acquire(); + destination.start(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Connection connection1 = connectionPool.acquire(); // Make sure we entered idleCreated(). Assert.assertTrue(idleLatch.await(5, TimeUnit.SECONDS)); @@ -128,13 +132,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest Assert.assertNull(connection1); // Second attempt also returns null because we delayed idleCreated() above. - Connection connection2 = destination.acquire(); + Connection connection2 = connectionPool.acquire(); Assert.assertNull(connection2); latch.countDown(); // There must be 2 idle connections. - Queue<Connection> idleConnections = destination.getConnectionPool().getIdleConnections(); + Queue<Connection> idleConnections = connectionPool.getIdleConnections(); Connection connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS); Assert.assertNotNull(connection); connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS); @@ -145,23 +149,25 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception { HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())); - HttpConnectionOverHTTP connection1 = destination.acquire(); + destination.start(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + HttpConnectionOverHTTP connection1 = (HttpConnectionOverHTTP)connectionPool.acquire(); long start = System.nanoTime(); while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { TimeUnit.MILLISECONDS.sleep(50); - connection1 = (HttpConnectionOverHTTP)destination.getConnectionPool().getIdleConnections().peek(); + connection1 = (HttpConnectionOverHTTP)connectionPool.getIdleConnections().peek(); } Assert.assertNotNull(connection1); // Acquire the connection to make it active - Assert.assertSame(connection1, destination.acquire()); + Assert.assertSame(connection1, connectionPool.acquire()); destination.process(connection1); destination.release(connection1); - Connection connection2 = destination.acquire(); + Connection connection2 = connectionPool.acquire(); Assert.assertSame(connection1, connection2); } @@ -172,7 +178,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest client.setIdleTimeout(idleTimeout); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())); - Connection connection1 = destination.acquire(); + destination.start(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Connection connection1 = connectionPool.acquire(); if (connection1 == null) { // There are no queued requests, so the newly created connection will be idle @@ -180,13 +188,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { TimeUnit.MILLISECONDS.sleep(50); - connection1 = destination.getConnectionPool().getIdleConnections().peek(); + connection1 = connectionPool.getIdleConnections().peek(); } Assert.assertNotNull(connection1); TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); - connection1 = destination.getConnectionPool().getIdleConnections().poll(); + connection1 = connectionPool.getIdleConnections().poll(); Assert.assertNull(connection1); } } @@ -210,35 +218,23 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .path("/one") - .onRequestQueued(new Request.QueuedListener() + .onRequestQueued(request -> { - @Override - public void onQueued(Request request) - { - // This request exceeds the maximum queued, should fail - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .path("/two") - .send(new Response.CompleteListener() - { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class)); - failureLatch.countDown(); - } - }); - } + // This request exceeds the maximum queued, should fail + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .path("/two") + .send(result -> + { + Assert.assertTrue(result.isFailed()); + Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class)); + failureLatch.countDown(); + }); }) - .send(new Response.CompleteListener() + .send(result -> { - @Override - public void onComplete(Result result) - { - if (result.isSucceeded()) - successLatch.countDown(); - } + if (result.isSucceeded()) + successLatch.countDown(); }); Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java index e057cf34f9..feb5cf0c58 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.HttpResponseException; import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.http.HttpFields; @@ -60,6 +61,7 @@ public class HttpReceiverOverHTTPTest client = new HttpClient(); client.start(); destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); endPoint = new ByteArrayEndPoint(); connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<>()); endPoint.setConnection(connection); @@ -235,7 +237,7 @@ public class HttpReceiverOverHTTPTest } }; endPoint.setConnection(connection); - + // Partial response to trigger the call to fillInterested(). endPoint.addInput("" + "HTTP/1.1 200 OK\r\n" + diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java index b98aea13ab..e592c42ca8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java @@ -67,6 +67,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch headersLatch = new CountDownLatch(1); @@ -100,6 +101,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); connection.send(request, null); @@ -129,6 +131,7 @@ public class HttpSenderOverHTTPTest // Shutdown output to trigger the exception on write endPoint.shutdownOutput(); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); @@ -158,6 +161,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); @@ -193,6 +197,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); String content = "abcdef"; @@ -227,6 +232,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); String content1 = "0123456789"; @@ -262,6 +268,7 @@ public class HttpSenderOverHTTPTest { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080)); + destination.start(); HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>()); Request request = client.newRequest(URI.create("http://localhost/")); String content1 = "0123456789"; |