Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2012-10-26 09:46:01 +0000
committerSimone Bordet2012-10-26 12:57:06 +0000
commit58e8ff8fbfe96876031efea8b709e86d0c21069b (patch)
tree464e1c3bbd24cfbc3664118f42f79da01bb5169b /jetty-client
parent61ba84bda5f27ff2c4729a3c81f1534837ae3318 (diff)
downloadorg.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.tar.gz
org.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.tar.xz
org.eclipse.jetty.project-58e8ff8fbfe96876031efea8b709e86d0c21069b.zip
#392733 - Implement a total timeout for asynchronous sends.
Reworked the implementation. Instead of adding another method for asynchronous sends with timeout parameters, we now use a TimedResponseListener utility class, that holds the timeout information.
Diffstat (limited to 'jetty-client')
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java16
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java3
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java6
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java9
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java32
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java17
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java126
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java162
8 files changed, 298 insertions, 73 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
index a56be6d49a..c375d22e4b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.AuthenticationStore;
@@ -308,7 +307,7 @@ public class HttpClient extends ContainerLifeCycle
return new ArrayList<Destination>(destinations.values());
}
- protected void send(final Request request, long timeout, TimeUnit unit, Response.Listener listener)
+ protected void send(final Request request, Response.Listener listener)
{
String scheme = request.scheme().toLowerCase();
if (!Arrays.asList("http", "https").contains(scheme))
@@ -318,17 +317,8 @@ public class HttpClient extends ContainerLifeCycle
if (port < 0)
port = "https".equals(scheme) ? 443 : 80;
- if (timeout > 0)
- {
- scheduler.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- request.abort("Total timeout elapsed");
- }
- }, timeout, unit);
- }
+ if (listener instanceof ResponseListener.Timed)
+ ((ResponseListener.Timed)listener).schedule(scheduler);
HttpDestination destination = provideDestination(scheme, request.host(), port);
destination.send(request, listener);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
index 56e7f8befa..bb9c08af7d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
@@ -116,6 +116,9 @@ public class HttpConnection extends AbstractConnection implements Connection
setExchange(exchange);
conversation.exchanges().offer(exchange);
+ if (listener instanceof ResponseListener.Timed)
+ ((ResponseListener.Timed)listener).schedule(client.getScheduler());
+
sender.send(exchange);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
index d05ac84c0d..705063ff5b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
@@ -160,7 +160,13 @@ public class HttpExchange
// Request and response completed
LOG.debug("{} complete", this);
if (conversation.last() == this)
+ {
+ HttpExchange first = conversation.exchanges().peekFirst();
+ Response.Listener listener = first.listener();
+ if (listener instanceof ResponseListener.Timed)
+ ((ResponseListener.Timed)listener).cancel();
conversation.complete();
+ }
}
result = new Result(request(), requestFailure(), response(), responseFailure());
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
index cc740e9bb8..7d839f01eb 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.ContentProvider;
@@ -323,13 +322,7 @@ public class HttpRequest implements Request
@Override
public void send(final Response.Listener listener)
{
- send(0, TimeUnit.SECONDS, listener);
- }
-
- @Override
- public void send(long timeout, TimeUnit unit, Response.Listener listener)
- {
- client.send(this, timeout, unit, listener);
+ client.send(this, listener);
}
@Override
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java
new file mode 100644
index 0000000000..dd61fb0bff
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseListener.java
@@ -0,0 +1,32 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+public interface ResponseListener extends Response.Listener
+{
+ public interface Timed extends Response.Listener
+ {
+ public boolean schedule(Scheduler scheduler);
+
+ public boolean cancel();
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
index adb0cc4360..5600f90b8d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java
@@ -23,7 +23,6 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
@@ -242,26 +241,10 @@ public interface Request
* as they happen, or when the application needs to efficiently manage the response content.
*
* @param listener the listener that receives response events
- * @see #send(long, TimeUnit, Response.Listener)
*/
void send(Response.Listener listener);
/**
- * Sends this request and asynchronously notifies the given listener for response events.
- * <p />
- * This method should be used when the application needs to be notified of the various response events
- * as they happen, or when the application needs to efficiently manage the response content.
- * <p />
- * This method waits for the given timeout before aborting the HTTP conversation. A {@code timeout}
- * value of zero means to wait indefinitely to the conversation to complete.
- *
- * @param timeout the total timeout in the given {@code unit}
- * @param unit the timeout unit
- * @param listener the listener that receives response events
- */
- void send(long timeout, TimeUnit unit, Response.Listener listener);
-
- /**
* Attempts to abort the send of this request.
*
* @param reason the abort reason
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java
new file mode 100644
index 0000000000..693feebedc
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/TimedResponseListener.java
@@ -0,0 +1,126 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.client.ResponseListener;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+public class TimedResponseListener implements ResponseListener.Timed, Runnable
+{
+ private static final Logger LOG = Log.getLogger(TimedResponseListener.class);
+
+ private final AtomicReference<Scheduler.Task> task = new AtomicReference<>();
+ private final long timeout;
+ private final TimeUnit unit;
+ private final Request request;
+ private final Response.Listener delegate;
+
+ public TimedResponseListener(long timeout, TimeUnit unit, Request request)
+ {
+ this(timeout, unit, request, new Empty());
+ }
+
+ public TimedResponseListener(long timeout, TimeUnit unit, Request request, Response.Listener delegate)
+ {
+ this.timeout = timeout;
+ this.unit = unit;
+ this.request = request;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void onBegin(Response response)
+ {
+ delegate.onBegin(response);
+ }
+
+ @Override
+ public void onHeaders(Response response)
+ {
+ delegate.onHeaders(response);
+ }
+
+ @Override
+ public void onContent(Response response, ByteBuffer content)
+ {
+ delegate.onContent(response, content);
+ }
+
+ @Override
+ public void onSuccess(Response response)
+ {
+ delegate.onSuccess(response);
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure)
+ {
+ delegate.onFailure(response, failure);
+ }
+
+ @Override
+ public void onComplete(Result result)
+ {
+ delegate.onComplete(result);
+ }
+
+ public boolean schedule(Scheduler scheduler)
+ {
+ Scheduler.Task task = this.task.get();
+ if (task != null)
+ return false;
+
+ task = scheduler.schedule(this, timeout, unit);
+ if (this.task.compareAndSet(null, task))
+ {
+ LOG.debug("Scheduled timeout task {} in {} ms", task, unit.toMillis(timeout));
+ return true;
+ }
+ else
+ {
+ task.cancel();
+ return false;
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ request.abort("Total timeout elapsed");
+ }
+
+ public boolean cancel()
+ {
+ Scheduler.Task task = this.task.get();
+ if (task == null)
+ return false;
+ boolean result = task.cancel();
+ LOG.debug("Cancelled timeout task {}", task);
+ return result;
+ }
+}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java
index 753a74f5a6..deff91a3cc 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTimeoutTest.java
@@ -26,11 +26,17 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.client.api.Destination;
+import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.client.util.TimedResponseListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
+import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@@ -62,17 +68,16 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
start(new TimeoutHandler(2 * timeout));
final CountDownLatch latch = new CountDownLatch(1);
- client.newRequest("localhost", connector.getLocalPort())
- .scheme(scheme)
- .send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
- {
- @Override
- public void onComplete(Result result)
- {
- Assert.assertTrue(result.isFailed());
- latch.countDown();
- }
- });
+ Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
+ request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertTrue(result.isFailed());
+ latch.countDown();
+ }
+ });
Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
}
@@ -88,31 +93,29 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
// The first request has a long timeout
final CountDownLatch firstLatch = new CountDownLatch(1);
- client.newRequest("localhost", connector.getLocalPort())
- .scheme(scheme)
- .send(4 * timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
- {
- @Override
- public void onComplete(Result result)
- {
- Assert.assertFalse(result.isFailed());
- firstLatch.countDown();
- }
- });
+ Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
+ request.send(new TimedResponseListener(4 * timeout, TimeUnit.MILLISECONDS, request)
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertFalse(result.isFailed());
+ firstLatch.countDown();
+ }
+ });
// Second request has a short timeout and should fail in the queue
final CountDownLatch secondLatch = new CountDownLatch(1);
- client.newRequest("localhost", connector.getLocalPort())
- .scheme(scheme)
- .send(timeout, TimeUnit.MILLISECONDS, new Response.Listener.Empty()
- {
- @Override
- public void onComplete(Result result)
- {
- Assert.assertTrue(result.isFailed());
- secondLatch.countDown();
- }
- });
+ request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
+ request.send(new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertTrue(result.isFailed());
+ secondLatch.countDown();
+ }
+ });
Assert.assertTrue(secondLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
// The second request must fail before the first request has completed
@@ -120,6 +123,94 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
Assert.assertTrue(firstLatch.await(5 * timeout, TimeUnit.MILLISECONDS));
}
+ @Slow
+ @Test
+ public void testTimeoutIsCancelledOnSuccess() throws Exception
+ {
+ long timeout = 1000;
+ start(new TimeoutHandler(timeout));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final byte[] content = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+ Request request = client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .content(new BytesContentProvider(content));
+ request.send(new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request, new BufferingResponseListener()
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertFalse(result.isFailed());
+ Assert.assertArrayEquals(content, getContent());
+ latch.countDown();
+ }
+ }));
+
+ Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
+
+ TimeUnit.MILLISECONDS.sleep(2 * timeout);
+
+ Assert.assertFalse(request.aborted());
+ }
+
+ @Slow
+ @Test
+ public void testTimeoutOnListenerWithExplicitConnection() throws Exception
+ {
+ long timeout = 1000;
+ start(new TimeoutHandler(2 * timeout));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
+ try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
+ {
+ Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme);
+ connection.send(request, new TimedResponseListener(timeout, TimeUnit.MILLISECONDS, request)
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Assert.assertTrue(result.isFailed());
+ latch.countDown();
+ }
+ });
+
+ Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Slow
+ @Test
+ public void testTimeoutIsCancelledOnSuccessWithExplicitConnection() throws Exception
+ {
+ long timeout = 1000;
+ start(new TimeoutHandler(timeout));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
+ try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
+ {
+ Request request = client.newRequest(destination.host(), destination.port()).scheme(scheme);
+ connection.send(request, new TimedResponseListener(2 * timeout, TimeUnit.MILLISECONDS, request)
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ Response response = result.getResponse();
+ Assert.assertEquals(200, response.status());
+ Assert.assertFalse(result.isFailed());
+ latch.countDown();
+ }
+ });
+
+ Assert.assertTrue(latch.await(3 * timeout, TimeUnit.MILLISECONDS));
+
+ TimeUnit.MILLISECONDS.sleep(2 * timeout);
+
+ Assert.assertFalse(request.aborted());
+ }
+ }
+
private class TimeoutHandler extends AbstractHandler
{
private final long timeout;
@@ -130,12 +221,13 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
try
{
TimeUnit.MILLISECONDS.sleep(timeout);
+ IO.copy(request.getInputStream(), response.getOutputStream());
}
catch (InterruptedException x)
{

Back to the top