diff options
author | Simone Bordet | 2015-09-25 09:16:38 +0000 |
---|---|---|
committer | Simone Bordet | 2015-09-25 17:31:40 +0000 |
commit | 3201d0acd24efea32532b0d35c1b8ca74b4ee766 (patch) | |
tree | 325b5f97fdc071334bd7045bf695bc1179148f69 | |
parent | 6544af8ce7245ffe5c46192b00aa313da8ad56db (diff) | |
download | org.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.tar.gz org.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.tar.xz org.eclipse.jetty.project-3201d0acd24efea32532b0d35c1b8ca74b4ee766.zip |
477878 - HttpClient over HTTP/2 doesn't close upload stream.
Clarified the difference between last and consumed in HttpContent.
Fixed HTTP/2 transport to behave correctly in case of last content.
6 files changed, 242 insertions, 26 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java index 58b16bb4ad..1d01f0637c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java @@ -67,11 +67,13 @@ public class HttpContent implements Callback, Closeable { private static final Logger LOG = Log.getLogger(HttpContent.class); private static final ByteBuffer AFTER = ByteBuffer.allocate(0); + private static final ByteBuffer CLOSE = ByteBuffer.allocate(0); private final ContentProvider provider; private final Iterator<ByteBuffer> iterator; private ByteBuffer buffer; - private volatile ByteBuffer content; + private ByteBuffer content; + private boolean last; public HttpContent(ContentProvider provider) { @@ -92,7 +94,7 @@ public class HttpContent implements Callback, Closeable */ public boolean isLast() { - return !iterator.hasNext(); + return last; } /** @@ -124,41 +126,50 @@ public class HttpContent implements Callback, Closeable */ public boolean advance() { - boolean advanced; - boolean hasNext; - ByteBuffer bytes; if (iterator instanceof Synchronizable) { synchronized (((Synchronizable)iterator).getLock()) { - advanced = iterator.hasNext(); - bytes = advanced ? iterator.next() : null; - hasNext = advanced && iterator.hasNext(); + return advance(iterator); } } else { - advanced = iterator.hasNext(); - bytes = advanced ? iterator.next() : null; - hasNext = advanced && iterator.hasNext(); + return advance(iterator); } + } + + private boolean advance(Iterator<ByteBuffer> iterator) + { + boolean hasNext = iterator.hasNext(); + ByteBuffer bytes = hasNext ? iterator.next() : null; + boolean hasMore = hasNext && iterator.hasNext(); + boolean wasLast = last; + last = !hasMore; - if (advanced) + if (hasNext) { buffer = bytes; content = bytes == null ? null : bytes.slice(); if (LOG.isDebugEnabled()) - LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes); + LOG.debug("Advanced content to {} chunk {}", hasMore ? "next" : "last", String.valueOf(bytes)); return bytes != null; } else { - if (content != AFTER) + // No more content, but distinguish between last and consumed. + if (wasLast) { - content = buffer = AFTER; + buffer = content = AFTER; if (LOG.isDebugEnabled()) LOG.debug("Advanced content past last chunk"); } + else + { + buffer = content = CLOSE; + if (LOG.isDebugEnabled()) + LOG.debug("Advanced content to last chunk"); + } return false; } } @@ -168,7 +179,7 @@ public class HttpContent implements Callback, Closeable */ public boolean isConsumed() { - return content == AFTER; + return buffer == AFTER; } @Override @@ -176,6 +187,8 @@ public class HttpContent implements Callback, Closeable { if (isConsumed()) return; + if (buffer == CLOSE) + return; if (iterator instanceof Callback) ((Callback)iterator).succeeded(); } @@ -185,6 +198,8 @@ public class HttpContent implements Callback, Closeable { if (isConsumed()) return; + if (buffer == CLOSE) + return; if (iterator instanceof Callback) ((Callback)iterator).failed(x); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 650587e48c..5417a122e8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -678,7 +678,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { return content.isNonBlocking(); } - + @Override public void succeeded() { @@ -811,9 +811,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener while (true) { boolean advanced = content.advance(); - boolean consumed = content.isConsumed(); + boolean lastContent = content.isLast(); if (LOG.isDebugEnabled()) - LOG.debug("Content {} consumed {} for {}", advanced, consumed, exchange.getRequest()); + LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest()); if (advanced) { @@ -821,7 +821,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return Action.SCHEDULED; } - if (consumed) + if (lastContent) { sendContent(exchange, content, lastCallback); return Action.IDLE; @@ -894,7 +894,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { return content.isNonBlocking(); } - + @Override public void succeeded() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java index 73d2c10709..80e762fb18 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java @@ -79,7 +79,9 @@ public class HttpSenderOverHTTP extends HttpSender boolean lastContent = content.isLast(); HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent); if (LOG.isDebugEnabled()) - LOG.debug("Generated content: {} - {}", result, generator); + LOG.debug("Generated content ({} bytes) - {}/{}", + contentBuffer == null ? -1 : contentBuffer.remaining(), + result, generator); switch (result) { case NEED_CHUNK: @@ -200,7 +202,11 @@ public class HttpSenderOverHTTP extends HttpSender { HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent); if (LOG.isDebugEnabled()) - LOG.debug("Generated headers: {} - ", result, generator); + LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}", + headerBuffer == null ? -1 : headerBuffer.remaining(), + chunkBuffer == null ? -1 : chunkBuffer.remaining(), + contentBuffer == null ? -1 : contentBuffer.remaining(), + result, generator); switch (result) { case NEED_HEADER: diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index ce4870f7b9..f7d589cc97 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -891,8 +891,9 @@ public class HttpGenerator @Override public String toString() { - return String.format("%s{s=%s}", + return String.format("%s@%x{s=%s}", getClass().getSimpleName(), + hashCode(), _state); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 1506f37f2c..a4cfaae397 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -64,9 +64,11 @@ public class HttpSenderOverHTTP2 extends HttpSender if (content.hasContent() && !expects100Continue(request)) { - if (content.advance()) + boolean advanced = content.advance(); + boolean lastContent = content.isLast(); + if (advanced || lastContent) { - DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast()); + DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent); stream.data(dataFrame, callback); return; } @@ -80,6 +82,7 @@ public class HttpSenderOverHTTP2 extends HttpSender callback.failed(failure); } }; + // TODO optimize the send of HEADERS and DATA frames. channel.getSession().newStream(headersFrame, promise, channel.getStreamListener()); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java new file mode 100644 index 0000000000..09ff0b0b04 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncRequestContentTest.java @@ -0,0 +1,191 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.client.util.InputStreamContentProvider; +import org.eclipse.jetty.client.util.OutputStreamContentProvider; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Assert; +import org.junit.Test; + +public class AsyncRequestContentTest extends AbstractTest +{ + public AsyncRequestContentTest(Transport transport) + { + super(transport); + } + + @Test + public void testEmptyDeferredContent() throws Exception + { + start(new ConsumeInputHandler()); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testDeferredContent() throws Exception + { + start(new ConsumeInputHandler()); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + contentProvider.offer(ByteBuffer.wrap(new byte[1])); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testEmptyInputStream() throws Exception + { + start(new ConsumeInputHandler()); + + InputStreamContentProvider contentProvider = + new InputStreamContentProvider(new ByteArrayInputStream(new byte[0])); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testInputStream() throws Exception + { + start(new ConsumeInputHandler()); + + InputStreamContentProvider contentProvider = + new InputStreamContentProvider(new ByteArrayInputStream(new byte[1])); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testEmptyOutputStream() throws Exception + { + start(new ConsumeInputHandler()); + + OutputStreamContentProvider contentProvider = new OutputStreamContentProvider(); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testOutputStream() throws Exception + { + start(new ConsumeInputHandler()); + + OutputStreamContentProvider contentProvider = new OutputStreamContentProvider(); + CountDownLatch latch = new CountDownLatch(1); + client.POST("http://localhost:" + connector.getLocalPort()) + .content(contentProvider) + .send(result -> + { + if (result.isSucceeded() && + result.getResponse().getStatus() == HttpStatus.OK_200) + latch.countDown(); + }); + OutputStream output = contentProvider.getOutputStream(); + output.write(new byte[1]); + output.flush(); + contentProvider.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + private static class ConsumeInputHandler extends AbstractHandler + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + ServletInputStream input = request.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } + response.setStatus(HttpStatus.OK_200); + } + } +} |