diff options
author | Simone Bordet | 2015-09-01 13:44:30 +0000 |
---|---|---|
committer | Simone Bordet | 2015-09-01 13:44:30 +0000 |
commit | b23c2bd30920a1bab9b9c215e1af0a452c6e9d7b (patch) | |
tree | adcf84bc0a6a85b07e64413f45c202b6c7655e39 | |
parent | 60136c68253e5c587aa789cd8eb983467c8a3d15 (diff) | |
download | org.eclipse.jetty.project-b23c2bd30920a1bab9b9c215e1af0a452c6e9d7b.tar.gz org.eclipse.jetty.project-b23c2bd30920a1bab9b9c215e1af0a452c6e9d7b.tar.xz org.eclipse.jetty.project-b23c2bd30920a1bab9b9c215e1af0a452c6e9d7b.zip |
476170 - Support servers that close connections without sending Connection: close header.
Removed previous implementation in favor of a customized
ConnectionPool, that gives more flexibility on the actual logic to
validate connections.
10 files changed, 488 insertions, 256 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 83b1856222..ecf45697e1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.util.thread.Sweeper; @ManagedObject("The connection pool") public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { - protected static final Logger LOG = Log.getLogger(ConnectionPool.class); + private static final Logger LOG = Log.getLogger(ConnectionPool.class); private final AtomicInteger connectionCount = new AtomicInteger(); private final ReentrantLock lock = new ReentrantLock(); @@ -129,7 +129,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable idleCreated(connection); - requester.succeeded(); + proceed(); } @Override @@ -150,11 +150,15 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } } + protected void proceed() + { + requester.succeeded(); + } + protected void idleCreated(Connection connection) { boolean idle; - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { // Use "cold" new connections as last. @@ -162,7 +166,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } idle(connection, idle); @@ -172,8 +176,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { boolean acquired; Connection connection; - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { connection = idleConnections.pollFirst(); @@ -183,7 +186,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } if (acquired) @@ -209,24 +212,28 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean release(Connection connection) { boolean idle; - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { if (!activeConnections.remove(connection)) return false; // Make sure we use "hot" connections first. - idle = idleConnections.offerFirst(connection); + idle = offerIdle(connection); } finally { - lock.unlock(); + unlock(); } released(connection); return idle(connection, idle); } + protected boolean offerIdle(Connection connection) + { + return idleConnections.offerFirst(connection); + } + protected boolean idle(Connection connection, boolean idle) { if (idle) @@ -250,10 +257,14 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean remove(Connection connection) { + return remove(connection, false); + } + + protected boolean remove(Connection connection, boolean force) + { boolean activeRemoved; boolean idleRemoved; - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { activeRemoved = activeConnections.remove(connection); @@ -261,12 +272,12 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } if (activeRemoved) released(connection); - boolean removed = activeRemoved || idleRemoved; + boolean removed = activeRemoved || idleRemoved || force; if (removed) { int pooled = connectionCount.decrementAndGet(); @@ -278,29 +289,27 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean isActive(Connection connection) { - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { return activeConnections.contains(connection); } finally { - lock.unlock(); + unlock(); } } public boolean isIdle(Connection connection) { - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { return idleConnections.contains(connection); } finally { - lock.unlock(); + unlock(); } } @@ -313,8 +322,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { List<Connection> idles = new ArrayList<>(); List<Connection> actives = new ArrayList<>(); - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { idles.addAll(idleConnections); @@ -324,7 +332,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } connectionCount.set(0); @@ -348,8 +356,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable { List<Connection> actives = new ArrayList<>(); List<Connection> idles = new ArrayList<>(); - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { actives.addAll(activeConnections); @@ -357,7 +364,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } ContainerLifeCycle.dumpObject(out, this); @@ -368,8 +375,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable public boolean sweep() { List<Sweeper.Sweepable> toSweep = new ArrayList<>(); - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { for (Connection connection : getActiveConnections()) @@ -380,7 +386,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } for (Sweeper.Sweepable candidate : toSweep) @@ -400,13 +406,22 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable return false; } + protected void lock() + { + lock.lock(); + } + + protected void unlock() + { + lock.unlock(); + } + @Override public String toString() { int activeSize; int idleSize; - final ReentrantLock lock = this.lock; - lock.lock(); + lock(); try { activeSize = activeConnections.size(); @@ -414,7 +429,7 @@ public class ConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable } finally { - lock.unlock(); + unlock(); } return String.format("%s[c=%d/%d,a=%d,i=%d]", diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index d202476012..93337516af 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -144,7 +144,6 @@ public class HttpClient extends ContainerLifeCycle private volatile HttpField encodingField; private volatile boolean removeIdleDestinations = false; private volatile boolean connectBlocking = false; - private volatile boolean validateConnections = false; /** * Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only @@ -1027,32 +1026,6 @@ public class HttpClient extends ContainerLifeCycle } /** - * @return whether connections are validated before sending a request - * @see #setValidateConnections(boolean) - */ - public boolean isValidateConnections() - { - return validateConnections; - } - - /** - * <p>Whether connections are validated before sending a request.</p> - * <p>This is a best-effort attempt to validate that the connection - * onto which requests are sent is valid before sending the request.</p> - * <p>The validation is performed only if the underlying transport - * implementation supports validating connections before their use - * (some transport implementation may not be able to perform such - * check).</p> - * - * @param validateConnections whether connections are validated before sending a request. - * @see #isValidateConnections() - */ - public void setValidateConnections(boolean validateConnections) - { - this.validateConnections = validateConnections; - } - - /** * @return the forward proxy configuration */ public ProxyConfiguration getProxyConfiguration() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java index cf500c89a2..7762af09f9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -22,9 +22,13 @@ import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.LeakDetector; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; public class LeakTrackingConnectionPool extends ConnectionPool { + private static final Logger LOG = Log.getLogger(LeakTrackingConnectionPool.class); + private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>() { @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java index 7989dcb86d..3030a97658 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java @@ -71,32 +71,20 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD { if (getHttpExchanges().isEmpty()) return; - C connection = acquire(); - if (connection != null) - process(connection); + process(); } + @SuppressWarnings("unchecked") public C acquire() { - while (true) - { - @SuppressWarnings("unchecked") - C c = (C)connectionPool.acquire(); - if (isValid(c)) - return c; - else - c.close(); - } + return (C)connectionPool.acquire(); } - private boolean isValid(Connection c) + private void process() { - if (getHttpClient().isValidateConnections()) - { - if (c instanceof Validateable) - return ((Validateable)c).validate(); - } - return true; + C connection = acquire(); + if (connection != null) + process(connection); } /** @@ -154,21 +142,21 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD @SuppressWarnings("unchecked") C connection = (C)c; if (LOG.isDebugEnabled()) - LOG.debug("{} released", connection); + LOG.debug("Released {}", connection); HttpClient client = getHttpClient(); if (client.isRunning()) { if (connectionPool.isActive(connection)) { - if (isValid(connection)) - process(connection); + if (connectionPool.release(connection)) + send(); else connection.close(); } else { if (LOG.isDebugEnabled()) - LOG.debug("{} explicit", connection); + LOG.debug("Released explicit {}", connection); } } else @@ -204,9 +192,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD // We need to execute queued requests even if this connection failed. // We may create a connection that is not needed, but it will eventually // idle timeout, so no worries. - C newConnection = acquire(); - if (newConnection != null) - process(newConnection); + process(); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/Validateable.java b/jetty-client/src/main/java/org/eclipse/jetty/client/Validateable.java deleted file mode 100644 index a42b653145..0000000000 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/Validateable.java +++ /dev/null @@ -1,24 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -public interface Validateable -{ - boolean validate(); -} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java new file mode 100644 index 0000000000..9ddc97a4f0 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java @@ -0,0 +1,209 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + * <p>A {@link ConnectionPool} that validates connections before + * making them available for use.</p> + * <p>Connections that have just been opened are not validated. + * Connections that are {@link #release(Connection) released} will + * be validated.</p> + * <p>Validation by reading from the EndPoint is not reliable, + * since the TCP FIN may arrive just after the validation read.</p> + * <p>This class validates connections by putting them in a + * "quarantine" for a configurable timeout, where they cannot + * be used to send requests. When the timeout expires, the + * quarantined connection is made idle and therefore available + * to send requests.</p> + * <p>The existing HttpClient mechanism to detect server closes + * will trigger and close quarantined connections, before they + * are made idle (and reusable) again.</p> + * <p>There still is a small chance that the timeout expires, + * the connection is made idle and available again, it is used + * to send a request exactly when the server decides to close. + * This case is however unavoidable and may be mitigated by + * tuning the idle timeout of the servers to be larger than + * that of the client.</p> + */ +public class ValidatingConnectionPool extends ConnectionPool +{ + private static final Logger LOG = Log.getLogger(ValidatingConnectionPool.class); + + private final Scheduler scheduler; + private final long timeout; + private final Map<Connection, Holder> quarantine; + + public ValidatingConnectionPool(Destination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout) + { + super(destination, maxConnections, requester); + this.scheduler = scheduler; + this.timeout = timeout; + this.quarantine = new HashMap<>(maxConnections); + } + + @ManagedAttribute(value = "The number of validating connections", readonly = true) + public int getValidatingConnectionCount() + { + return quarantine.size(); + } + + @Override + public boolean release(Connection connection) + { + lock(); + try + { + if (!getActiveConnections().remove(connection)) + return false; + Holder holder = new Holder(connection); + holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS); + quarantine.put(connection, holder); + if (LOG.isDebugEnabled()) + LOG.debug("Validating for {}ms {}", timeout, connection); + } + finally + { + unlock(); + } + + released(connection); + return true; + } + + @Override + public boolean remove(Connection connection) + { + Holder holder; + lock(); + try + { + holder = quarantine.remove(connection); + } + finally + { + unlock(); + } + + if (holder == null) + return super.remove(connection); + + if (LOG.isDebugEnabled()) + LOG.debug("Removed while validating {}", connection); + + boolean cancelled = holder.cancel(); + if (cancelled) + return remove(connection, true); + + return super.remove(connection); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + super.dump(out, indent); + ContainerLifeCycle.dump(out, indent, quarantine.values()); + } + + @Override + public String toString() + { + int size; + lock(); + try + { + size = quarantine.size(); + } + finally + { + unlock(); + } + return String.format("%s[v=%d]", super.toString(), size); + } + + private class Holder implements Runnable + { + private final long timestamp = System.nanoTime(); + private final AtomicBoolean latch = new AtomicBoolean(); + private final Connection connection; + public Scheduler.Task task; + + public Holder(Connection connection) + { + this.connection = connection; + } + + @Override + public void run() + { + if (latch.compareAndSet(false, true)) + { + boolean idle; + lock(); + try + { + quarantine.remove(connection); + idle = offerIdle(connection); + if (LOG.isDebugEnabled()) + LOG.debug("Validated {}", connection); + } + finally + { + unlock(); + } + + if (idle(connection, idle)) + proceed(); + } + } + + public boolean cancel() + { + if (latch.compareAndSet(false, true)) + { + task.cancel(); + return true; + } + return false; + } + + @Override + public String toString() + { + return String.format("%s[validationLeft=%dms]", + connection, + timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timestamp) + ); + } + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 2a3779fae7..da9fcd6e08 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.client.http; -import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,19 +26,17 @@ import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.client.Validateable; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; -public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable, Validateable +public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); @@ -144,10 +141,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec getHttpDestination().close(this); getEndPoint().shutdownOutput(); if (LOG.isDebugEnabled()) - LOG.debug("{} oshut", this); + LOG.debug("Shutdown {}", this); getEndPoint().close(); if (LOG.isDebugEnabled()) - LOG.debug("{} closed", this); + LOG.debug("Closed {}", this); abort(failure); } @@ -175,32 +172,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec } @Override - public boolean validate() - { - ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool(); - ByteBuffer buffer = byteBufferPool.acquire(1, true); - try - { - EndPoint endPoint = getEndPoint(); - int filled = endPoint.fill(buffer); - if (LOG.isDebugEnabled()) - LOG.debug("Validated {} {}", filled, this); - // Invalid if we read -1 or garbage bytes. - return filled == 0; - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Could not validate connection " + this, x); - return false; - } - finally - { - byteBufferPool.release(buffer); - } - } - - @Override public String toString() { return String.format("%s@%h(l:%s <-> r:%s,closed=%b)[%s]", diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java index 1ca753c1d8..034dfdaee6 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client; import java.util.Arrays; import java.util.Collection; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.NetworkConnector; @@ -85,9 +86,14 @@ public abstract class AbstractHttpClientServerTest protected void startClient() throws Exception { + startClient(new HttpClientTransportOverHTTP(1)); + } + + protected void startClient(HttpClientTransport transport) throws Exception + { QueuedThreadPool clientThreads = new QueuedThreadPool(); clientThreads.setName("client"); - client = new HttpClient(sslContextFactory); + client = new HttpClient(transport, sslContextFactory); client.setExecutor(clientThreads); client.start(); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 5b8d0f758d..662b32a34e 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -74,9 +74,7 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.TestingDir; import org.eclipse.jetty.toolchain.test.annotation.Slow; @@ -1541,116 +1539,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest } } - @Test - public void testServerClosesConnectionAfterRedirect() throws Exception - { - start(new AbstractHandler() - { - @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - baseRequest.setHandled(true); - if (target.endsWith("/redirect")) - { - response.setStatus(HttpStatus.TEMPORARY_REDIRECT_307); - response.setContentLength(0); - response.setHeader(HttpHeader.LOCATION.asString(), scheme + "://localhost:" + connector.getLocalPort() + "/"); - response.flushBuffer(); - baseRequest.getHttpChannel().getEndPoint().shutdownOutput(); - } - else - { - response.setStatus(HttpStatus.OK_200); - response.setContentLength(0); - response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); - } - } - }); - - client.setValidateConnections(true); - - ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .path("/redirect") - .send(); - Assert.assertEquals(200, response.getStatus()); - } - - @Test - public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithConnectionCloseHeader() throws Exception - { - testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler() - { - @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - baseRequest.setHandled(true); - response.setStatus(HttpStatus.OK_200); - response.setContentLength(0); - response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); - } - }); - } - - @Test - public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithoutConnectionCloseHeader() throws Exception - { - testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler() - { - @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - baseRequest.setHandled(true); - response.setStatus(HttpStatus.OK_200); - response.setContentLength(0); - response.flushBuffer(); - baseRequest.getHttpChannel().getEndPoint().shutdownOutput(); - } - }); - } - - private void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(Handler handler) throws Exception - { - start(handler); - - client.setValidateConnections(true); - client.setMaxConnectionsPerDestination(1); - - final CountDownLatch latch = new CountDownLatch(1); - Request request1 = client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .path("/one") - .onRequestBegin(r -> - { - try - { - latch.await(); - } - catch (InterruptedException x) - { - r.abort(x); - } - }); - FutureResponseListener listener1 = new FutureResponseListener(request1); - request1.send(listener1); - - Request request2 = client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .path("/two"); - FutureResponseListener listener2 = new FutureResponseListener(request2); - request2.send(listener2); - - // Now we have one request about to be sent, and one queued. - - latch.countDown(); - - ContentResponse response1 = listener1.get(5, TimeUnit.SECONDS); - Assert.assertEquals(200, response1.getStatus()); - - ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS); - Assert.assertEquals(200, response2.getStatus()); - } - private void consume(InputStream input, boolean eof) throws IOException { int crlfs = 0; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ValidatingConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ValidatingConnectionPoolTest.java new file mode 100644 index 0000000000..d6b707f0d5 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ValidatingConnectionPoolTest.java @@ -0,0 +1,204 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.http.HttpDestinationOverHTTP; +import org.eclipse.jetty.client.util.FutureResponseListener; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.Assert; +import org.junit.Test; + +public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest +{ + public ValidatingConnectionPoolTest(SslContextFactory sslContextFactory) + { + super(sslContextFactory); + } + + @Override + protected void startClient() throws Exception + { + startClient(new ValidatingHttpClientTransportOverHTTP(1000)); + } + + @Test + public void testRequestAfterValidation() throws Exception + { + start(new EmptyServerHandler()); + + client.setMaxConnectionsPerDestination(1); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(); + Assert.assertEquals(200, response.getStatus()); + + // The second request should be sent after the validating timeout. + response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .send(); + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testServerClosesConnectionAfterRedirectWithoutConnectionCloseHeader() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + if (target.endsWith("/redirect")) + { + response.setStatus(HttpStatus.TEMPORARY_REDIRECT_307); + response.setContentLength(0); + response.setHeader(HttpHeader.LOCATION.asString(), scheme + "://localhost:" + connector.getLocalPort() + "/"); + response.flushBuffer(); + baseRequest.getHttpChannel().getEndPoint().shutdownOutput(); + } + else + { + response.setStatus(HttpStatus.OK_200); + response.setContentLength(0); + response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); + } + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .path("/redirect") + .send(); + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithConnectionCloseHeader() throws Exception + { + testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(HttpStatus.OK_200); + response.setContentLength(0); + response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); + } + }); + } + + @Test + public void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnectionsWithoutConnectionCloseHeader() throws Exception + { + testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(HttpStatus.OK_200); + response.setContentLength(0); + response.flushBuffer(); + baseRequest.getHttpChannel().getEndPoint().shutdownOutput(); + } + }); + } + + private void testServerClosesConnectionAfterResponseWithQueuedRequestWithMaxConnections(Handler handler) throws Exception + { + start(handler); + client.setMaxConnectionsPerDestination(1); + + final CountDownLatch latch = new CountDownLatch(1); + Request request1 = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .path("/one") + .onRequestBegin(r -> + { + try + { + latch.await(); + } + catch (InterruptedException x) + { + r.abort(x); + } + }); + FutureResponseListener listener1 = new FutureResponseListener(request1); + request1.send(listener1); + + Request request2 = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .path("/two"); + FutureResponseListener listener2 = new FutureResponseListener(request2); + request2.send(listener2); + + // Now we have one request about to be sent, and one queued. + + latch.countDown(); + + ContentResponse response1 = listener1.get(5, TimeUnit.SECONDS); + Assert.assertEquals(200, response1.getStatus()); + + ContentResponse response2 = listener2.get(5, TimeUnit.SECONDS); + Assert.assertEquals(200, response2.getStatus()); + } + + private static class ValidatingHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP + { + private final long timeout; + + public ValidatingHttpClientTransportOverHTTP(long timeout) + { + super(1); + this.timeout = timeout; + } + + @Override + public HttpDestination newHttpDestination(Origin origin) + { + return new HttpDestinationOverHTTP(getHttpClient(), origin) + { + @Override + protected ConnectionPool newConnectionPool(HttpClient client) + { + return new ValidatingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, client.getScheduler(), timeout); + } + }; + } + } +} |