Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-09-23 20:21:51 +0000
committerSimone Bordet2015-09-23 20:23:37 +0000
commit8c21871cf0ac74ee9569248f3c534e211728b507 (patch)
treeca419517279aec0dd06bbcc051009f9fa59bcf17 /jetty-client
parentf063df42003c7f4dc05e94ed7c0bee95f317c4b1 (diff)
downloadorg.eclipse.jetty.project-8c21871cf0ac74ee9569248f3c534e211728b507.tar.gz
org.eclipse.jetty.project-8c21871cf0ac74ee9569248f3c534e211728b507.tar.xz
org.eclipse.jetty.project-8c21871cf0ac74ee9569248f3c534e211728b507.zip
478021 - Client sending Connection: close does not shutdown output.
Fixed behavior of HttpGenerator to change its persistent also for requests. Reworked HttpSenderOverHTTP to send headers via IteratingCallback, so that multiple generation steps can be made to produce SHUTDOWN_OUT.
Diffstat (limited to 'jetty-client')
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java221
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/ClientConnectionCloseTest.java122
2 files changed, 271 insertions, 72 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
index 887315105b..73d2c10709 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java
@@ -33,6 +33,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingCallback;
public class HttpSenderOverHTTP extends HttpSender
{
@@ -52,77 +53,9 @@ public class HttpSenderOverHTTP extends HttpSender
@Override
protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
{
- Request request = exchange.getRequest();
- ContentProvider requestContent = request.getContent();
- long contentLength = requestContent == null ? -1 : requestContent.getLength();
- String path = request.getPath();
- String query = request.getQuery();
- if (query != null)
- path += "?" + query;
- MetaData.Request requestInfo = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
-
try
{
- HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
- ByteBufferPool bufferPool = client.getByteBufferPool();
- ByteBuffer header = bufferPool.acquire(client.getRequestBufferSize(), false);
- ByteBuffer chunk = null;
-
- ByteBuffer contentBuffer = null;
- boolean lastContent = false;
- if (!expects100Continue(request))
- {
- content.advance();
- contentBuffer = content.getByteBuffer();
- lastContent = content.isLast();
- }
- while (true)
- {
- HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentBuffer, lastContent);
- switch (result)
- {
- case NEED_CHUNK:
- {
- chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
- break;
- }
- case FLUSH:
- {
- int size = 1;
- boolean hasChunk = chunk != null;
- if (hasChunk)
- ++size;
- boolean hasContent = contentBuffer != null;
- if (hasContent)
- ++size;
- ByteBuffer[] toWrite = new ByteBuffer[size];
- ByteBuffer[] toRecycle = new ByteBuffer[hasChunk ? 2 : 1];
- toWrite[0] = header;
- toRecycle[0] = header;
- if (hasChunk)
- {
- toWrite[1] = chunk;
- toRecycle[1] = chunk;
- }
- if (hasContent)
- toWrite[toWrite.length - 1] = contentBuffer;
- EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
- endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, toRecycle), toWrite);
- return;
- }
- case DONE:
- {
- // The headers have already been generated, perhaps by a concurrent abort.
- callback.failed(new HttpRequestException("Could not generate headers", request));
- return;
- }
- default:
- {
- callback.failed(new IllegalStateException(result.toString()));
- return;
- }
- }
- }
+ new HeadersCallback(exchange, content, callback).iterate();
}
catch (Throwable x)
{
@@ -145,6 +78,8 @@ public class HttpSenderOverHTTP extends HttpSender
ByteBuffer contentBuffer = content.getByteBuffer();
boolean lastContent = content.isLast();
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Generated content: {} - {}", result, generator);
switch (result)
{
case NEED_CHUNK:
@@ -168,17 +103,19 @@ public class HttpSenderOverHTTP extends HttpSender
}
case CONTINUE:
{
- break;
+ if (lastContent)
+ break;
+ callback.succeeded();
+ return;
}
case DONE:
{
- assert generator.isEnd();
callback.succeeded();
return;
}
default:
{
- throw new IllegalStateException();
+ throw new IllegalStateException(result.toString());
}
}
}
@@ -208,6 +145,8 @@ public class HttpSenderOverHTTP extends HttpSender
private void shutdownOutput()
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("Request shutdown output {}", getHttpExchange().getRequest());
getHttpChannel().getHttpConnection().getEndPoint().shutdownOutput();
}
@@ -217,6 +156,144 @@ public class HttpSenderOverHTTP extends HttpSender
return String.format("%s[%s]", super.toString(), generator);
}
+ private class HeadersCallback extends IteratingCallback
+ {
+ private final HttpExchange exchange;
+ private final Callback callback;
+ private final MetaData.Request metaData;
+ private ByteBuffer headerBuffer;
+ private ByteBuffer chunkBuffer;
+ private ByteBuffer contentBuffer;
+ private boolean lastContent;
+ private boolean generated;
+
+ public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
+ {
+ super(false);
+ this.exchange = exchange;
+ this.callback = callback;
+
+ Request request = exchange.getRequest();
+ ContentProvider requestContent = request.getContent();
+ long contentLength = requestContent == null ? -1 : requestContent.getLength();
+ String path = request.getPath();
+ String query = request.getQuery();
+ if (query != null)
+ path += "?" + query;
+ metaData = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
+
+ if (!expects100Continue(request))
+ {
+ content.advance();
+ contentBuffer = content.getByteBuffer();
+ lastContent = content.isLast();
+ }
+ }
+
+ @Override
+ protected Action process() throws Exception
+ {
+ HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
+ ByteBufferPool bufferPool = client.getByteBufferPool();
+
+ while (true)
+ {
+ HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Generated headers: {} - ", result, generator);
+ switch (result)
+ {
+ case NEED_HEADER:
+ {
+ headerBuffer = bufferPool.acquire(client.getRequestBufferSize(), false);
+ break;
+ }
+ case NEED_CHUNK:
+ {
+ chunkBuffer = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
+ break;
+ }
+ case FLUSH:
+ {
+ EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
+ if (chunkBuffer == null)
+ {
+ if (contentBuffer == null)
+ endPoint.write(this, headerBuffer);
+ else
+ endPoint.write(this, headerBuffer, contentBuffer);
+ }
+ else
+ {
+ if (contentBuffer == null)
+ endPoint.write(this, headerBuffer, chunkBuffer);
+ else
+ endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
+ }
+ generated = true;
+ return Action.SCHEDULED;
+ }
+ case SHUTDOWN_OUT:
+ {
+ shutdownOutput();
+ return Action.SUCCEEDED;
+ }
+ case CONTINUE:
+ {
+ if (generated)
+ return Action.SUCCEEDED;
+ break;
+ }
+ case DONE:
+ {
+ if (generated)
+ return Action.SUCCEEDED;
+ // The headers have already been generated by some
+ // other thread, perhaps by a concurrent abort().
+ throw new HttpRequestException("Could not generate headers", exchange.getRequest());
+ }
+ default:
+ {
+ throw new IllegalStateException(result.toString());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void succeeded()
+ {
+ release();
+ super.succeeded();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ release();
+ callback.failed(x);
+ super.failed(x);
+ }
+
+ @Override
+ protected void onCompleteSuccess()
+ {
+ super.onCompleteSuccess();
+ callback.succeeded();
+ }
+
+ private void release()
+ {
+ HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
+ ByteBufferPool bufferPool = client.getByteBufferPool();
+ bufferPool.release(headerBuffer);
+ headerBuffer = null;
+ if (chunkBuffer != null)
+ bufferPool.release(chunkBuffer);
+ chunkBuffer = null;
+ }
+ }
+
private class ByteBufferRecyclerCallback implements Callback
{
private final Callback callback;
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ClientConnectionCloseTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ClientConnectionCloseTest.java
new file mode 100644
index 0000000000..4564c5e22f
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ClientConnectionCloseTest.java
@@ -0,0 +1,122 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.util.DeferredContentProvider;
+import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpHeaderValue;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientConnectionCloseTest extends AbstractHttpClientServerTest
+{
+ public ClientConnectionCloseTest(SslContextFactory sslContextFactory)
+ {
+ super(sslContextFactory);
+ }
+
+ @Test
+ public void testClientConnectionCloseShutdownOutputWithoutRequestContent() throws Exception
+ {
+ testClientConnectionCloseShutdownOutput(null);
+ }
+
+ @Test
+ public void testClientConnectionCloseShutdownOutputWithRequestContent() throws Exception
+ {
+ testClientConnectionCloseShutdownOutput(new StringContentProvider("data", StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void testClientConnectionCloseShutdownOutputWithChunkedRequestContent() throws Exception
+ {
+ DeferredContentProvider content = new DeferredContentProvider()
+ {
+ @Override
+ public long getLength()
+ {
+ return -1;
+ }
+ };
+ content.offer(ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)));
+ content.close();
+ testClientConnectionCloseShutdownOutput(content);
+ }
+
+ private void testClientConnectionCloseShutdownOutput(ContentProvider content) throws Exception
+ {
+ AtomicReference<EndPoint> ref = new AtomicReference<>();
+ start(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ ref.set(baseRequest.getHttpChannel().getEndPoint());
+ ServletInputStream input = request.getInputStream();
+ while (true)
+ {
+ int read = input.read();
+ if (read < 0)
+ break;
+ }
+ response.setStatus(HttpStatus.OK_200);
+ }
+ });
+
+ ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .path("/ctx/path")
+ .header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
+ .content(content)
+ .send();
+
+ Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+
+ // Wait for the FIN to arrive to the server
+ Thread.sleep(1000);
+
+ // Do not read from the server because it will trigger
+ // the send of the TLS Close Message before the response.
+
+ EndPoint serverEndPoint = ref.get();
+ ByteBuffer buffer = BufferUtil.allocate(1);
+ int read = serverEndPoint.fill(buffer);
+ Assert.assertEquals(-1, read);
+ }
+}

Back to the top