Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2014-05-04 21:14:45 +0000
committerSimone Bordet2014-05-04 21:14:45 +0000
commitc4b5e3d3d7521bd6786ff0f1e542f257132a4c4a (patch)
tree98ed500d27449c891b1bcde3a8a316c425854ed9
parent9b388750fae307e9780ed96da4845cb9170012d0 (diff)
downloadorg.eclipse.jetty.project-c4b5e3d3d7521bd6786ff0f1e542f257132a4c4a.tar.gz
org.eclipse.jetty.project-c4b5e3d3d7521bd6786ff0f1e542f257132a4c4a.tar.xz
org.eclipse.jetty.project-c4b5e3d3d7521bd6786ff0f1e542f257132a4c4a.zip
Introduced CompletableCallback to handle asynchronous content on clients.
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java20
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java13
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java100
3 files changed, 109 insertions, 24 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
index 5f499bd0c9..f98f482894 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
@@ -20,7 +20,6 @@ package org.eclipse.jetty.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
@@ -34,7 +33,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.CompletableCallback;
public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
@@ -226,27 +225,22 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
- final AtomicBoolean completed = new AtomicBoolean();
- Callback callback = new Callback()
+ CompletableCallback callback = new CompletableCallback()
{
@Override
- public void succeeded()
+ public void resume()
{
- if (!completed.compareAndSet(false, true))
- {
- LOG.debug("Content consumed asynchronously, resuming processing");
- process();
- }
+ LOG.debug("Content consumed asynchronously, resuming processing");
+ process();
}
- @Override
- public void failed(Throwable x)
+ public void abort(Throwable x)
{
failAndClose(x);
}
};
responseContent(exchange, buffer, callback);
- return completed.compareAndSet(false, true);
+ return callback.tryComplete();
}
@Override
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java
index 56fcd1a922..4ca0db19cc 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAsyncContentTest.java
@@ -19,7 +19,6 @@
package org.eclipse.jetty.client;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -86,7 +85,7 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
}
});
- Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
+ Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
Callback callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
@@ -99,7 +98,7 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
contentLatch.set(new CountDownLatch(1));
callback.succeeded();
- Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
+ Assert.assertTrue(contentLatch.get().await(5, TimeUnit.SECONDS));
callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
@@ -116,12 +115,4 @@ public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2, contentCount.get());
}
-
- public void test() throws Exception
- {
- try (Socket socket = new Socket())
- {
- System.out.println("socket = " + socket);
- }
- }
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java
new file mode 100644
index 0000000000..ec5a30a84e
--- /dev/null
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/CompletableCallback.java
@@ -0,0 +1,100 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A callback to be used by driver code that needs to know whether the callback has been
+ * succeeded or failed (that is, completed) just after the asynchronous operation or not,
+ * typically because further processing depends on the callback being completed.
+ * The driver code competes with the asynchronous operation to complete the callback.
+ * <p />
+ * If the callback is already completed, the driver code continues the processing,
+ * otherwise it suspends it. If it is suspended, the callback will be completed some time
+ * later, and {@link #resume()} or {@link #abort(Throwable)} will be called to allow the
+ * application to resume the processing.
+ * <p />
+ * Typical usage:
+ * <pre>
+ * CompletableCallback callback = new CompletableCallback()
+ * {
+ * &#64;Override
+ * public void resume()
+ * {
+ * // continue processing
+ * }
+ *
+ * &#64;Override
+ * public void abort(Throwable failure)
+ * {
+ * // abort processing
+ * }
+ * }
+ * asyncOperation(callback);
+ * boolean completed = callback.tryComplete();
+ * if (completed)
+ * // suspend processing, async operation not done yet
+ * else
+ * // continue processing, async operation already done
+ * </pre>
+ */
+public abstract class CompletableCallback implements Callback
+{
+ private final AtomicBoolean completed = new AtomicBoolean();
+
+ @Override
+ public void succeeded()
+ {
+ if (!tryComplete())
+ resume();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ if (!tryComplete())
+ abort(x);
+ }
+
+ /**
+ * Callback method invoked when this callback is succeeded
+ * <em>after</em> a first call to {@link #tryComplete()}.
+ */
+ public abstract void resume();
+
+ /**
+ * Callback method invoked when this callback is failed
+ * <em>after</em> a first call to {@link #tryComplete()}.
+ */
+ public abstract void abort(Throwable failure);
+
+ /**
+ * Tries to complete this callback; driver code should call
+ * this method once <em>after</em> the asynchronous operation
+ * to detect whether the asynchronous operation has already
+ * completed or not.
+ *
+ * @return whether the attempt to complete was successful.
+ */
+ public boolean tryComplete()
+ {
+ return completed.compareAndSet(false, true);
+ }
+}

Back to the top