Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-04-23 06:56:48 +0000
committerSimone Bordet2015-04-23 06:58:54 +0000
commit1270d291ccdd43d8d2059630bd5b0b9fef12909d (patch)
treedbbdf02082f39407cdee87d92319824b3bf6e060
parentf061ae79f4d8b4909ea98cf694715065153562dc (diff)
downloadorg.eclipse.jetty.project-1270d291ccdd43d8d2059630bd5b0b9fef12909d.tar.gz
org.eclipse.jetty.project-1270d291ccdd43d8d2059630bd5b0b9fef12909d.tar.xz
org.eclipse.jetty.project-1270d291ccdd43d8d2059630bd5b0b9fef12909d.zip
465181 - HttpParser parse full end chunk.
Continue parsing until the buffer is empty, or the parser returns true to indicate that content is being handled asynchronously.
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java17
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientChunkedContentTest.java216
2 files changed, 226 insertions, 7 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
index b3cd6dbbb3..d4d037844b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java
@@ -149,13 +149,16 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
*/
private boolean parse()
{
- // Must parse even if the buffer is fully consumed, to allow the
- // parser to advance from asynchronous content to response complete.
- boolean handle = parser.parseNext(buffer);
- if (LOG.isDebugEnabled())
- LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
-
- return handle;
+ while (true)
+ {
+ // Must parse even if the buffer is fully consumed, to allow the
+ // parser to advance from asynchronous content to response complete.
+ boolean handle = parser.parseNext(buffer);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
+ if (handle || !buffer.hasRemaining())
+ return handle;
+ }
}
protected void fillInterested()
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientChunkedContentTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientChunkedContentTest.java
new file mode 100644
index 0000000000..4ef82f19a6
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientChunkedContentTest.java
@@ -0,0 +1,216 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.FutureResponseListener;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class HttpClientChunkedContentTest
+{
+ @Rule
+ public final TestTracker tracker = new TestTracker();
+ private HttpClient client;
+
+ private void startClient() throws Exception
+ {
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ client = new HttpClient();
+ client.setExecutor(clientThreads);
+ client.start();
+ }
+
+ @After
+ public void dispose() throws Exception
+ {
+ if (client != null)
+ client.stop();
+ }
+
+ @Test
+ public void test_Server_HeadersPauseTerminal_Client_Response() throws Exception
+ {
+ startClient();
+
+ try (ServerSocket server = new ServerSocket())
+ {
+ server.bind(new InetSocketAddress("localhost", 0));
+
+ final AtomicReference<Result> resultRef = new AtomicReference<>();
+ final CountDownLatch completeLatch = new CountDownLatch(1);
+ client.newRequest("localhost", server.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send(new Response.CompleteListener()
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ resultRef.set(result);
+ completeLatch.countDown();
+ }
+ });
+
+ try (Socket socket = server.accept())
+ {
+ consumeRequestHeaders(socket);
+
+ OutputStream output = socket.getOutputStream();
+ String headers = "" +
+ "HTTP/1.1 200 OK\r\n" +
+ "Transfer-Encoding: chunked\r\n" +
+ "\r\n";
+ output.write(headers.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+
+ Thread.sleep(1000);
+
+ String terminal = "" +
+ "0\r\n" +
+ "\r\n";
+ output.write(terminal.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+
+ assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+ Result result = resultRef.get();
+ assertTrue(result.isSucceeded());
+ Response response = result.getResponse();
+ Assert.assertEquals(200, response.getStatus());
+ }
+ }
+ }
+
+ @Test
+ public void test_Server_ContentTerminal_Client_ContentDelay() throws Exception
+ {
+ startClient();
+
+ try (ServerSocket server = new ServerSocket())
+ {
+ server.bind(new InetSocketAddress("localhost", 0));
+
+ final AtomicReference<Callback> callbackRef = new AtomicReference<>();
+ final CountDownLatch firstContentLatch = new CountDownLatch(1);
+ final AtomicReference<Result> resultRef = new AtomicReference<>();
+ final CountDownLatch completeLatch = new CountDownLatch(1);
+ client.newRequest("localhost", server.getLocalPort())
+ .onResponseContentAsync(new Response.AsyncContentListener()
+ {
+ @Override
+ public void onContent(Response response, ByteBuffer content, Callback callback)
+ {
+ if (callbackRef.compareAndSet(null, callback))
+ firstContentLatch.countDown();
+ else
+ callback.succeeded();
+ }
+ })
+ .timeout(5, TimeUnit.SECONDS)
+ .send(new Response.CompleteListener()
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ resultRef.set(result);
+ completeLatch.countDown();
+ }
+ });
+
+ try (Socket socket = server.accept())
+ {
+ consumeRequestHeaders(socket);
+
+ OutputStream output = socket.getOutputStream();
+ String response = "" +
+ "HTTP/1.1 200 OK\r\n" +
+ "Transfer-Encoding: chunked\r\n" +
+ "\r\n" +
+ "8\r\n" +
+ "01234567\r\n" +
+ "0\r\n" +
+ "\r\n";
+ output.write(response.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+
+ // Simulate a delay in consuming the content.
+ assertTrue(firstContentLatch.await(5, TimeUnit.SECONDS));
+ Thread.sleep(1000);
+ callbackRef.get().succeeded();
+
+ // Wait for the client to read 0 and become idle.
+ Thread.sleep(1000);
+
+ assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+ Result result = resultRef.get();
+ assertTrue(result.isSucceeded());
+ Assert.assertEquals(200, result.getResponse().getStatus());
+
+ // Issue another request to be sure the connection is sane.
+ Request request = client.newRequest("localhost", server.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS);
+ FutureResponseListener listener = new FutureResponseListener(request);
+ request.send(listener);
+
+ consumeRequestHeaders(socket);
+ output.write(response.getBytes(StandardCharsets.UTF_8));
+ output.flush();
+
+ Assert.assertEquals(200, listener.get(5, TimeUnit.SECONDS).getStatus());
+ }
+ }
+ }
+
+ private void consumeRequestHeaders(Socket socket) throws IOException
+ {
+ InputStream input = socket.getInputStream();
+ int crlfs = 0;
+ while (true)
+ {
+ int read = input.read();
+ if (read == '\r' || read == '\n')
+ ++crlfs;
+ else
+ crlfs = 0;
+ if (crlfs == 4)
+ break;
+ }
+ }
+}

Back to the top