Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-09-25 09:16:38 +0000
committerSimone Bordet2015-09-25 17:31:40 +0000
commit3201d0acd24efea32532b0d35c1b8ca74b4ee766 (patch)
tree325b5f97fdc071334bd7045bf695bc1179148f69
parent6544af8ce7245ffe5c46192b00aa313da8ad56db (diff)
downloadorg.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.tar.gz
org.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.tar.xz
org.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.zip
477878 - HttpClient over HTTP/2 doesn't close upload stream.
Clarified the difference between last and consumed in HttpContent. Fixed HTTP/2 transport to behave correctly in case of last content.
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java47
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java10
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java10
-rw-r--r--jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java3
-rw-r--r--jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java7
-rw-r--r--tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java191
6 files changed, 242 insertions, 26 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
index 58b16bb4ad..1d01f0637c 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java
@@ -67,11 +67,13 @@ public class HttpContent implements Callback, Closeable
{
private static final Logger LOG = Log.getLogger(HttpContent.class);
private static final ByteBuffer AFTER = ByteBuffer.allocate(0);
+ private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
private final ContentProvider provider;
private final Iterator<ByteBuffer> iterator;
private ByteBuffer buffer;
- private volatile ByteBuffer content;
+ private ByteBuffer content;
+ private boolean last;
public HttpContent(ContentProvider provider)
{
@@ -92,7 +94,7 @@ public class HttpContent implements Callback, Closeable
*/
public boolean isLast()
{
- return !iterator.hasNext();
+ return last;
}
/**
@@ -124,41 +126,50 @@ public class HttpContent implements Callback, Closeable
*/
public boolean advance()
{
- boolean advanced;
- boolean hasNext;
- ByteBuffer bytes;
if (iterator instanceof Synchronizable)
{
synchronized (((Synchronizable)iterator).getLock())
{
- advanced = iterator.hasNext();
- bytes = advanced ? iterator.next() : null;
- hasNext = advanced && iterator.hasNext();
+ return advance(iterator);
}
}
else
{
- advanced = iterator.hasNext();
- bytes = advanced ? iterator.next() : null;
- hasNext = advanced && iterator.hasNext();
+ return advance(iterator);
}
+ }
+
+ private boolean advance(Iterator<ByteBuffer> iterator)
+ {
+ boolean hasNext = iterator.hasNext();
+ ByteBuffer bytes = hasNext ? iterator.next() : null;
+ boolean hasMore = hasNext && iterator.hasNext();
+ boolean wasLast = last;
+ last = !hasMore;
- if (advanced)
+ if (hasNext)
{
buffer = bytes;
content = bytes == null ? null : bytes.slice();
if (LOG.isDebugEnabled())
- LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes);
+ LOG.debug("Advanced content to {} chunk {}", hasMore ? "next" : "last", String.valueOf(bytes));
return bytes != null;
}
else
{
- if (content != AFTER)
+ // No more content, but distinguish between last and consumed.
+ if (wasLast)
{
- content = buffer = AFTER;
+ buffer = content = AFTER;
if (LOG.isDebugEnabled())
LOG.debug("Advanced content past last chunk");
}
+ else
+ {
+ buffer = content = CLOSE;
+ if (LOG.isDebugEnabled())
+ LOG.debug("Advanced content to last chunk");
+ }
return false;
}
}
@@ -168,7 +179,7 @@ public class HttpContent implements Callback, Closeable
*/
public boolean isConsumed()
{
- return content == AFTER;
+ return buffer == AFTER;
}
@Override
@@ -176,6 +187,8 @@ public class HttpContent implements Callback, Closeable
{
if (isConsumed())
return;
+ if (buffer == CLOSE)
+ return;
if (iterator instanceof Callback)
((Callback)iterator).succeeded();
}
@@ -185,6 +198,8 @@ public class HttpContent implements Callback, Closeable
{
if (isConsumed())
return;
+ if (buffer == CLOSE)
+ return;
if (iterator instanceof Callback)
((Callback)iterator).failed(x);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 650587e48c..5417a122e8 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -678,7 +678,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
return content.isNonBlocking();
}
-
+
@Override
public void succeeded()
{
@@ -811,9 +811,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
while (true)
{
boolean advanced = content.advance();
- boolean consumed = content.isConsumed();
+ boolean lastContent = content.isLast();
if (LOG.isDebugEnabled())
- LOG.debug("Content {} consumed {} for {}", advanced, consumed, exchange.getRequest());
+ LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest());
if (advanced)
{
@@ -821,7 +821,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.SCHEDULED;
}
- if (consumed)
+ if (lastContent)
{
sendContent(exchange, content, lastCallback);
return Action.IDLE;
@@ -894,7 +894,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
return content.isNonBlocking();
}
-
+
@Override
public void succeeded()
{
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
index 73d2c10709..80e762fb18 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
@@ -79,7 +79,9 @@ public class HttpSenderOverHTTP extends HttpSender
boolean lastContent = content.isLast();
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
- LOG.debug("Generated content: {} - {}", result, generator);
+ LOG.debug("Generated content ({} bytes) - {}/{}",
+ contentBuffer == null ? -1 : contentBuffer.remaining(),
+ result, generator);
switch (result)
{
case NEED_CHUNK:
@@ -200,7 +202,11 @@ public class HttpSenderOverHTTP extends HttpSender
{
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
- LOG.debug("Generated headers: {} - ", result, generator);
+ LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
+ headerBuffer == null ? -1 : headerBuffer.remaining(),
+ chunkBuffer == null ? -1 : chunkBuffer.remaining(),
+ contentBuffer == null ? -1 : contentBuffer.remaining(),
+ result, generator);
switch (result)
{
case NEED_HEADER:
diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
index ce4870f7b9..f7d589cc97 100644
--- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
+++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
@@ -891,8 +891,9 @@ public class HttpGenerator
@Override
public String toString()
{
- return String.format("%s{s=%s}",
+ return String.format("%s@%x{s=%s}",
getClass().getSimpleName(),
+ hashCode(),
_state);
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java
index 1506f37f2c..a4cfaae397 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java
@@ -64,9 +64,11 @@ public class HttpSenderOverHTTP2 extends HttpSender
if (content.hasContent() && !expects100Continue(request))
{
- if (content.advance())
+ boolean advanced = content.advance();
+ boolean lastContent = content.isLast();
+ if (advanced || lastContent)
{
- DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast());
+ DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent);
stream.data(dataFrame, callback);
return;
}
@@ -80,6 +82,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
callback.failed(failure);
}
};
+ // TODO optimize the send of HEADERS and DATA frames.
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java
new file mode 100644
index 0000000000..09ff0b0b04
--- /dev/null
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java
@@ -0,0 +1,191 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.util.DeferredContentProvider;
+import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.client.util.OutputStreamContentProvider;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AsyncRequestContentTest extends AbstractTest
+{
+ public AsyncRequestContentTest(Transport transport)
+ {
+ super(transport);
+ }
+
+ @Test
+ public void testEmptyDeferredContent() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ DeferredContentProvider contentProvider = new DeferredContentProvider();
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testDeferredContent() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ DeferredContentProvider contentProvider = new DeferredContentProvider();
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ contentProvider.offer(ByteBuffer.wrap(new byte[1]));
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testEmptyInputStream() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ InputStreamContentProvider contentProvider =
+ new InputStreamContentProvider(new ByteArrayInputStream(new byte[0]));
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testInputStream() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ InputStreamContentProvider contentProvider =
+ new InputStreamContentProvider(new ByteArrayInputStream(new byte[1]));
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testEmptyOutputStream() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testOutputStream() throws Exception
+ {
+ start(new ConsumeInputHandler());
+
+ OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
+ CountDownLatch latch = new CountDownLatch(1);
+ client.POST("http://localhost:" + connector.getLocalPort())
+ .content(contentProvider)
+ .send(result ->
+ {
+ if (result.isSucceeded() &&
+ result.getResponse().getStatus() == HttpStatus.OK_200)
+ latch.countDown();
+ });
+ OutputStream output = contentProvider.getOutputStream();
+ output.write(new byte[1]);
+ output.flush();
+ contentProvider.close();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ private static class ConsumeInputHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ ServletInputStream input = request.getInputStream();
+ while (true)
+ {
+ int read = input.read();
+ if (read < 0)
+ break;
+ }
+ response.setStatus(HttpStatus.OK_200);
+ }
+ }
+}

Back to the top