diff options
author | Simone Bordet | 2012-10-26 09:46:01 +0000 |
---|---|---|
committer | Simone Bordet | 2012-10-26 12:57:06 +0000 |
commit | 58e8ff8fbfe96876031efea8b709e86d0c21069b (patch) | |
tree | 464e1c3bbd24cfbc3664118f42f79da01bb5169b /jetty-client | |
parent | 61ba84bda5f27ff2c4729a3c81f1534837ae3318 (diff) | |
download | org.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.tar.gz org.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.tar.xz org.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.zip |
#392733 - Implement a total timeout for asynchronous sends.
Reworked the implementation.
Instead of adding another method for asynchronous sends with
timeout parameters, we now use a TimedResponseListener utility
class, that holds the timeout information.
Diffstat (limited to 'jetty-client')
8 files changed, 298 insertions, 73 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index a56be6d49a..c375d22e4b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import org.eclipse.jetty.client.api.AuthenticationStore; @@ -308,7 +307,7 @@ public class HttpClient extends ContainerLifeCycle return new ArrayList<Destination>(destinations.values()); } - protected void send(final Request request, long timeout, TimeUnit unit, Response.Listener listener) + protected void send(final Request request, Response.Listener listener) { String scheme = request.scheme().toLowerCase(); if (!Arrays.asList("http", "https").contains(scheme)) @@ -318,17 +317,8 @@ public class HttpClient extends ContainerLifeCycle if (port < 0) port = "https".equals(scheme) ? 443 : 80; - if (timeout > 0) - { - scheduler.schedule(new Runnable() - { - @Override - public void run() - { - request.abort("Total timeout elapsed"); - } - }, timeout, unit); - } + if (listener instanceof ResponseListener.Timed) + ((ResponseListener.Timed)listener).schedule(scheduler); HttpDestination destination = provideDestination(scheme, request.host(), port); destination.send(request, listener); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 56e7f8befa..bb9c08af7d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -116,6 +116,9 @@ public class HttpConnection extends AbstractConnection implements Connection setExchange(exchange); conversation.exchanges().offer(exchange); + if (listener instanceof ResponseListener.Timed) + ((ResponseListener.Timed)listener).schedule(client.getScheduler()); + sender.send(exchange); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index d05ac84c0d..705063ff5b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -160,7 +160,13 @@ public class HttpExchange // Request and response completed LOG.debug("{} complete", this); if (conversation.last() == this) + { + HttpExchange first = conversation.exchanges().peekFirst(); + Response.Listener listener = first.listener(); + if (listener instanceof ResponseListener.Timed) + ((ResponseListener.Timed)listener).cancel(); conversation.complete(); + } } result = new Result(request(), requestFailure(), response(), responseFailure()); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index cc740e9bb8..7d839f01eb 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.client.api.ContentProvider; @@ -323,13 +322,7 @@ public class HttpRequest implements Request @Override public void send(final Response.Listener listener) { - send(0, TimeUnit.SECONDS, listener); - } - - @Override - public void send(long timeout, TimeUnit unit, Response.Listener listener) - { - client.send(this, timeout, unit, listener); + client.send(this, listener); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java new file mode 100644 index 0000000000..dd61fb0bff --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java @@ -0,0 +1,32 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.util.thread.Scheduler; + +public interface ResponseListener extends Response.Listener +{ + public interface Timed extends Response.Listener + { + public boolean schedule(Scheduler scheduler); + + public boolean cancel(); + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java index adb0cc4360..5600f90b8d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -23,7 +23,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.util.InputStreamResponseListener; @@ -242,26 +241,10 @@ public interface Request * as they happen, or when the application needs to efficiently manage the response content. * * @param listener the listener that receives response events - * @see #send(long, TimeUnit, Response.Listener) */ void send(Response.Listener listener); /** - * Sends this request and asynchronously notifies the given listener for response events. - * <p /> - * This method should be used when the application needs to be notified of the various response events - * as they happen, or when the application needs to efficiently manage the response content. - * <p /> - * This method waits for the given timeout before aborting the HTTP conversation. A {@code timeout} - * value of zero means to wait indefinitely to the conversation to complete. - * - * @param timeout the total timeout in the given {@code unit} - * @param unit the timeout unit - * @param listener the listener that receives response events - */ - void send(long timeout, TimeUnit unit, Response.Listener listener); - - /** * Attempts to abort the send of this request. * * @param reason the abort reason diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java new file mode 100644 index 0000000000..693feebedc --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java @@ -0,0 +1,126 @@ +// +// ======================================================================== +// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.client.ResponseListener; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +public class TimedResponseListener implements ResponseListener.Timed, Runnable +{ + private static final Logger LOG = Log.getLogger(TimedResponseListener.class); + + private final AtomicReference<Scheduler.Task> task = new AtomicReference<>(); + private final long timeout; + private final TimeUnit unit; + private final Request request; + private final Response.Listener delegate; + + public TimedResponseListener(long timeout, TimeUnit unit, Request request) + { + this(timeout, unit, request, new Empty()); + } + + public TimedResponseListener(long timeout, TimeUnit unit, Request request, Response.Listener delegate) + { + this.timeout = timeout; + this.unit = unit; + this.request = request; + this.delegate = delegate; + } + + @Override + public void onBegin(Response response) + { + delegate.onBegin(response); + } + + @Override + public void onHeaders(Response response) + { + delegate.onHeaders(response); + } + + @Override + public void onContent(Response response, ByteBuffer content) + { + delegate.onContent(response, content); + } + + @Override + public void onSuccess(Response response) + { + delegate.onSuccess(response); + } + + @Override + public void onFailure(Response response, Throwable failure) + { + delegate.onFailure(response, failure); + } + + @Override + public void onComplete(Result result) + { + delegate.onComplete(result); + } + + public boolean schedule(Scheduler scheduler) + { + Scheduler.Task task = this.task.get(); + if (task != null) + return false; + + task = scheduler.schedule(this, timeout, unit); + if (this.task.compareAndSet(null, task)) + { + LOG.debug("Scheduled timeout task {} in {} ms", task, unit.toMillis(timeout)); + return true; + } + else + { + task.cancel(); + return false; + } + } + + @Override + public void run() + { + request.abort("Total timeout elapsed"); + } + + public boolean cancel() + { + Scheduler.Task task = this.task.get(); + if (task == null) + return false; + boolean result = task.cancel(); + LOG.debug("Cancelled timeout task {}", task); + return result; + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java index 753a74f5a6..deff91a3cc 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java @@ -26,11 +26,17 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.TimedResponseListener; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; import org.junit.Test; @@ -62,17 +68,16 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest start(new TimeoutHandler(2 * timeout)); final CountDownLatch latch = new CountDownLatch(1); - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty() - { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - latch.countDown(); - } - }); + Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme); + request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request) + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + latch.countDown(); + } + }); Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS)); } @@ -88,31 +93,29 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest // The first request has a long timeout final CountDownLatch firstLatch = new CountDownLatch(1); - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .send(4 * timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty() - { - @Override - public void onComplete(Result result) - { - Assert.assertFalse(result.isFailed()); - firstLatch.countDown(); - } - }); + Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme); + request.send(new TimedResponseListener(4 * timeout, TimeUnit.MILLISECONDS, request) + { + @Override + public void onComplete(Result result) + { + Assert.assertFalse(result.isFailed()); + firstLatch.countDown(); + } + }); // Second request has a short timeout and should fail in the queue final CountDownLatch secondLatch = new CountDownLatch(1); - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty() - { - @Override - public void onComplete(Result result) - { - Assert.assertTrue(result.isFailed()); - secondLatch.countDown(); - } - }); + request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme); + request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request) + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + secondLatch.countDown(); + } + }); Assert.assertTrue(secondLatch.await(2 * timeout, TimeUnit.MILLISECONDS)); // The second request must fail before the first request has completed @@ -120,6 +123,94 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest Assert.assertTrue(firstLatch.await(5 * timeout, TimeUnit.MILLISECONDS)); } + @Slow + @Test + public void testTimeoutIsCancelledOnSuccess() throws Exception + { + long timeout = 1000; + start(new TimeoutHandler(timeout)); + + final CountDownLatch latch = new CountDownLatch(1); + final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + Request request = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(new BytesContentProvider(content)); + request.send(new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request, new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + Assert.assertFalse(result.isFailed()); + Assert.assertArrayEquals(content, getContent()); + latch.countDown(); + } + })); + + Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS)); + + TimeUnit.MILLISECONDS.sleep(2 * timeout); + + Assert.assertFalse(request.aborted()); + } + + @Slow + @Test + public void testTimeoutOnListenerWithExplicitConnection() throws Exception + { + long timeout = 1000; + start(new TimeoutHandler(2 * timeout)); + + final CountDownLatch latch = new CountDownLatch(1); + Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort()); + try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS)) + { + Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme); + connection.send(request, new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request) + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isFailed()); + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS)); + } + } + + @Slow + @Test + public void testTimeoutIsCancelledOnSuccessWithExplicitConnection() throws Exception + { + long timeout = 1000; + start(new TimeoutHandler(timeout)); + + final CountDownLatch latch = new CountDownLatch(1); + Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort()); + try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS)) + { + Request request = client.newRequest(destination.host(), destination.port()).scheme(scheme); + connection.send(request, new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request) + { + @Override + public void onComplete(Result result) + { + Response response = result.getResponse(); + Assert.assertEquals(200, response.status()); + Assert.assertFalse(result.isFailed()); + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS)); + + TimeUnit.MILLISECONDS.sleep(2 * timeout); + + Assert.assertFalse(request.aborted()); + } + } + private class TimeoutHandler extends AbstractHandler { private final long timeout; @@ -130,12 +221,13 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest } @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); try { TimeUnit.MILLISECONDS.sleep(timeout); + IO.copy(request.getInputStream(), response.getOutputStream()); } catch (InterruptedException x) { |