From 4fbdafb9e9e914962d49d1c7784b336b41553956 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 9 Apr 2015 15:15:27 +0200 Subject: 464292 - Implement stream-based transformer for AsyncMiddleManServlet. Introduced AfterContentTransformer with a transform(Source, Sink) method that offers an InputStream to read the original content from, and an OutputStream to write transformed content to. --- jetty-proxy/pom.xml | 20 +- .../jetty/proxy/AfterContentTransformer.java | 461 +++++++++++++++++++++ .../eclipse/jetty/proxy/AsyncMiddleManServlet.java | 34 +- .../jetty/proxy/AsyncMiddleManServletTest.java | 365 +++++++++++++++- 4 files changed, 859 insertions(+), 21 deletions(-) create mode 100644 jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java diff --git a/jetty-proxy/pom.xml b/jetty-proxy/pom.xml index 291a4069b5..5b6483f071 100644 --- a/jetty-proxy/pom.xml +++ b/jetty-proxy/pom.xml @@ -71,9 +71,9 @@ - org.eclipse.jetty.toolchain - jetty-test-helper - test + javax.servlet + javax.servlet-api + provided org.eclipse.jetty @@ -81,6 +81,11 @@ ${project.version} provided + + org.eclipse.jetty + jetty-util + ${project.version} + org.eclipse.jetty jetty-client @@ -88,13 +93,14 @@ org.eclipse.jetty - jetty-util + jetty-util-ajax ${project.version} + test - javax.servlet - javax.servlet-api - provided + org.eclipse.jetty.toolchain + jetty-test-helper + test diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java new file mode 100644 index 0000000000..676083d09e --- /dev/null +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java @@ -0,0 +1,461 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.proxy; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.util.component.Destroyable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A specialized transformer for {@link AsyncMiddleManServlet} that performs + * the transformation when the whole content has been received.

+ *

The content is buffered in memory up to a configurable {@link #getMaxInputBufferSize() maximum size}, + * after which it is overflown to a file on disk. The overflow file is saved + * in the {@link #getOverflowDirectory() overflow directory} as a + * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file} + * with a name starting with the {@link #getInputFilePrefix() input prefix} + * and default suffix.

+ *

Application must implement the {@link #transform(Source, Sink) transformation method} + * to transform the content.

+ *

The transformed content is buffered in memory up to a configurable {@link #getMaxOutputBufferSize() maximum size} + * after which it is overflown to a file on disk. The overflow file is saved + * in the {@link #getOverflowDirectory() overflow directory} as a + * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file} + * with a name starting with the {@link #getOutputFilePrefix()} output prefix} + * and default suffix.

+ */ +public abstract class AfterContentTransformer implements AsyncMiddleManServlet.ContentTransformer, Destroyable +{ + private static final Logger LOG = Log.getLogger(AfterContentTransformer.class); + + private final List buffers = new ArrayList<>(); + private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir")); + private String inputFilePrefix = "amms_adct_in_"; + private String outputFilePrefix = "amms_adct_out_"; + private long maxInputBufferSize = 1024 * 1024; + private long inputBufferSize; + private FileChannel inputFile; + private long maxOutputBufferSize = maxInputBufferSize; + private long outputBufferSize; + private FileChannel outputFile; + + /** + *

Returns the directory where input and output are overflown to + * temporary files if they exceed, respectively, the + * {@link #getMaxInputBufferSize() max input size} or the + * {@link #getMaxOutputBufferSize() max output size}.

+ *

Defaults to the directory pointed by the {@code java.io.tmpdir} + * system property.

+ * + * @return the overflow directory path + * @see #setOverflowDirectory(Path) + */ + public Path getOverflowDirectory() + { + return overflowDirectory; + } + + /** + * @param overflowDirectory the overflow directory path + * @see #getOverflowDirectory() + */ + public void setOverflowDirectory(Path overflowDirectory) + { + this.overflowDirectory = overflowDirectory; + } + + /** + * @return the prefix of the input overflow temporary files + * @see #setInputFilePrefix(String) + */ + public String getInputFilePrefix() + { + return inputFilePrefix; + } + + /** + * @param inputFilePrefix the prefix of the input overflow temporary files + * @see #getInputFilePrefix() + */ + public void setInputFilePrefix(String inputFilePrefix) + { + this.inputFilePrefix = inputFilePrefix; + } + + /** + *

Returns the maximum input buffer size, after which the input is overflown to disk.

+ *

Defaults to 1 MiB, i.e. 1048576 bytes.

+ * + * @return the max input buffer size + * @see #setMaxInputBufferSize(long) + */ + public long getMaxInputBufferSize() + { + return maxInputBufferSize; + } + + /** + * @param maxInputBufferSize the max input buffer size + * @see #getMaxInputBufferSize() + */ + public void setMaxInputBufferSize(long maxInputBufferSize) + { + this.maxInputBufferSize = maxInputBufferSize; + } + + /** + * @return the prefix of the output overflow temporary files + * @see #setOutputFilePrefix(String) + */ + public String getOutputFilePrefix() + { + return outputFilePrefix; + } + + /** + * @param outputFilePrefix the prefix of the output overflow temporary files + * @see #getOutputFilePrefix() + */ + public void setOutputFilePrefix(String outputFilePrefix) + { + this.outputFilePrefix = outputFilePrefix; + } + + /** + *

Returns the maximum output buffer size, after which the output is overflown to disk.

+ *

Defaults to 1 MiB, i.e. 1048576 bytes.

+ * + * @return the max output buffer size + * @see #setMaxOutputBufferSize(long) + */ + public long getMaxOutputBufferSize() + { + return maxOutputBufferSize; + } + + /** + * @param maxOutputBufferSize the max output buffer size + */ + public void setMaxOutputBufferSize(long maxOutputBufferSize) + { + this.maxOutputBufferSize = maxOutputBufferSize; + } + + @Override + public final void transform(ByteBuffer input, boolean finished, List output) throws IOException + { + int remaining = input.remaining(); + if (remaining > 0) + { + inputBufferSize += remaining; + long max = getMaxInputBufferSize(); + if (max >= 0 && inputBufferSize > max) + { + overflow(input); + } + else + { + ByteBuffer copy = ByteBuffer.allocate(input.remaining()); + copy.put(input).flip(); + buffers.add(copy); + } + } + + if (finished) + { + Source source = new Source(); + Sink sink = new Sink(); + transform(source, sink); + sink.drainTo(output); + } + } + + /** + *

Transforms the original content read from the {@code source} into + * transformed content written to the {@code sink}.

+ *

The transformation must happen synchronously in the context of a call + * to this method (it is not supported to perform the transformation in another + * thread spawned during the call to this method).

+ *

Differently from {@link #transform(ByteBuffer, boolean, List)}, this + * method is invoked only when the whole content is available, and offers + * a blocking API via the InputStream and OutputStream that can be obtained + * from {@link Source} and {@link Sink} respectively.

+ *

Typical implementations:

+ *
+     * // Identity transformation (no transformation, the input is copied to the output)
+     * public void transform(Source source, Sink sink)
+     * {
+     *     org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
+     * }
+     * 
+ * + * @param source where the original content is read + * @param sink where the transformed content is written + * @throws IOException if the transformation fails + */ + public abstract void transform(Source source, Sink sink) throws IOException; + + private void overflow(ByteBuffer input) throws IOException + { + if (inputFile == null) + { + Path path = Files.createTempFile(getOverflowDirectory(), getInputFilePrefix(), null); + inputFile = FileChannel.open(path, + StandardOpenOption.CREATE, + StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.DELETE_ON_CLOSE); + int size = buffers.size(); + if (size > 0) + { + inputFile.write(buffers.toArray(new ByteBuffer[size])); + buffers.clear(); + } + } + inputFile.write(input); + } + + @Override + public void destroy() + { + close(inputFile); + close(outputFile); + } + + private void close(Closeable closeable) + { + try + { + if (closeable != null) + closeable.close(); + } + catch (IOException x) + { + LOG.ignore(x); + } + } + + /** + *

The source from where the original content is read to be transformed.

+ *

The {@link #getInputStream() input stream} provided by this + * class supports the {@link InputStream#reset()} method so that + * the stream can be rewound to the beginning.

+ */ + public class Source + { + private final InputStream stream; + + private Source() throws IOException + { + if (inputFile != null) + { + inputFile.force(true); + stream = new ChannelInputStream(); + } + else + { + stream = new MemoryInputStream(); + } + stream.reset(); + } + + /** + * @return an input stream to read the original content from + */ + public InputStream getInputStream() + { + return stream; + } + } + + private class ChannelInputStream extends InputStream + { + private final InputStream stream = Channels.newInputStream(inputFile); + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + return stream.read(b, off, len); + } + + @Override + public int read() throws IOException + { + return stream.read(); + } + + @Override + public void reset() throws IOException + { + inputFile.position(0); + } + } + + private class MemoryInputStream extends InputStream + { + private final byte[] oneByte = new byte[1]; + private int index; + private ByteBuffer slice; + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + if (len == 0) + return 0; + if (index == buffers.size()) + return -1; + + if (slice == null) + slice = buffers.get(index).slice(); + + int size = Math.min(len, slice.remaining()); + slice.get(b, off, size); + + if (!slice.hasRemaining()) + { + ++index; + slice = null; + } + + return size; + } + + @Override + public int read() throws IOException + { + int read = read(oneByte, 0, 1); + return read < 0 ? read : oneByte[0] & 0xFF; + } + + @Override + public void reset() throws IOException + { + index = 0; + slice = null; + } + } + + /** + *

The target to where the transformed content is written after the transformation.

+ */ + public class Sink + { + private final List buffers = new ArrayList<>(); + private final OutputStream stream = new SinkOutputStream(); + + /** + * @return an output stream to write the transformed content to + */ + public OutputStream getOutputStream() + { + return stream; + } + + private void overflow(ByteBuffer output) throws IOException + { + if (outputFile == null) + { + Path path = Files.createTempFile(getOverflowDirectory(), getOutputFilePrefix(), null); + outputFile = FileChannel.open(path, + StandardOpenOption.CREATE, + StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.DELETE_ON_CLOSE); + int size = buffers.size(); + if (size > 0) + { + outputFile.write(buffers.toArray(new ByteBuffer[size])); + buffers.clear(); + } + } + outputFile.write(output); + } + + private void drainTo(List output) throws IOException + { + if (outputFile == null) + { + output.addAll(buffers); + buffers.clear(); + } + else + { + outputFile.force(true); + long position = 0; + long length = outputFile.size(); + outputFile.position(position); + while (length > 0) + { + // At most 1 GiB file maps. + long size = Math.min(1024 * 1024 * 1024, length); + ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size); + output.add(buffer); + position += size; + length -= size; + } + } + } + + private class SinkOutputStream extends OutputStream + { + @Override + public void write(byte[] b, int off, int len) throws IOException + { + if (len <= 0) + return; + + outputBufferSize += len; + long max = getMaxOutputBufferSize(); + if (max >= 0 && outputBufferSize > max) + { + overflow(ByteBuffer.wrap(b, off, len)); + } + else + { + // The array may be reused by the + // application so we need to copy it. + byte[] copy = new byte[len]; + System.arraycopy(b, off, copy, 0, len); + buffers.add(ByteBuffer.wrap(copy)); + } + } + + @Override + public void write(int b) throws IOException + { + write(new byte[]{(byte)b}, 0, 1); + } + } + } +} diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java index e7b4937fa5..d8e0030a14 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java @@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.CountingCallback; import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.component.Destroyable; @SuppressWarnings("serial") public class AsyncMiddleManServlet extends AbstractProxyServlet @@ -141,6 +142,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet return ContentTransformer.IDENTITY; } + private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List output) throws IOException + { + try + { + transformer.transform(input, finished, output); + } + catch (Throwable x) + { + _log.info("Exception while transforming " + transformer, x); + throw x; + } + } + int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException { return input.read(buffer); @@ -169,6 +183,16 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet output.write(buffer, offset, length); } + private void cleanup(HttpServletRequest clientRequest) + { + ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER); + if (clientTransformer instanceof Destroyable) + ((Destroyable)clientTransformer).destroy(); + ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER); + if (serverTransformer instanceof Destroyable) + ((Destroyable)serverTransformer).destroy(); + } + protected class ProxyReader extends IteratingCallback implements ReadListener { private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()]; @@ -217,6 +241,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet @Override public void onError(Throwable t) { + cleanup(clientRequest); onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t); } @@ -272,7 +297,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet } int contentBytes = content.remaining(); - transformer.transform(content, finished, buffers); + transform(transformer, content, finished, buffers); int newContentBytes = 0; int size = buffers.size(); @@ -372,7 +397,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet length += contentBytes; boolean finished = contentLength > 0 && length == contentLength; - transformer.transform(content, finished, buffers); + transform(transformer, content, finished, buffers); int newContentBytes = 0; int size = buffers.size(); @@ -436,7 +461,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE); ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER); - transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers); + transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers); long newContentBytes = 0; int size = buffers.size(); @@ -486,12 +511,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet @Override public void succeeded() { + cleanup(clientRequest); onProxyResponseSuccess(clientRequest, proxyResponse, response); } @Override public void failed(Throwable failure) { + cleanup(clientRequest); onProxyResponseFailure(clientRequest, proxyResponse, response, failure); } } @@ -718,4 +745,5 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet return ByteBuffer.wrap(gzipBytes); } } + } diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java index 772f795860..a8a104770b 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java @@ -20,10 +20,16 @@ package org.eclipse.jetty.proxy; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -34,7 +40,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; - import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; @@ -53,6 +58,8 @@ import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -60,10 +67,12 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.toolchain.test.IO; +import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.Utf8StringBuilder; +import org.eclipse.jetty.util.ajax.JSON; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; @@ -289,7 +298,7 @@ public class AsyncMiddleManServletTest Assert.assertEquals(200, response.getStatus()); Assert.assertArrayEquals(bytes, response.getContent()); } - + @Test public void testTransformGzippedHead() throws Exception { @@ -299,21 +308,21 @@ public class AsyncMiddleManServletTest protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); - + String sample = "Webtide\nGoogle\n"; byte[] bytes = sample.getBytes(StandardCharsets.UTF_8); - + ServletOutputStream out = response.getOutputStream(); out.write(gzip(bytes)); - + // create a byte buffer larger enough to create 2 (or more) transforms. - byte[] randomFiller = new byte[64*1024]; + byte[] randomFiller = new byte[64 * 1024]; /* fill with nonsense * Using random data to ensure compressed buffer size is large * enough to trigger at least 2 transform() events. */ new Random().nextBytes(randomFiller); - + out.write(gzip(randomFiller)); } }); @@ -333,7 +342,7 @@ public class AsyncMiddleManServletTest .send(); Assert.assertEquals(200, response.getStatus()); - + String expectedStr = "Webtide"; byte[] expected = expectedStr.getBytes(StandardCharsets.UTF_8); Assert.assertArrayEquals(expected, response.getContent()); @@ -436,7 +445,7 @@ public class AsyncMiddleManServletTest { // decode input stream thru gzip ByteArrayOutputStream bos = new ByteArrayOutputStream(); - IO.copy(new GZIPInputStream(request.getInputStream()),bos); + IO.copy(new GZIPInputStream(request.getInputStream()), bos); // ensure decompressed is 0 length Assert.assertEquals(0, bos.toByteArray().length); response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); @@ -837,6 +846,340 @@ public class AsyncMiddleManServletTest Assert.assertEquals(502, response.getStatus()); } + @Test + public void testAfterContentTransformer() throws Exception + { + final String key0 = "id"; + long value0 = 1; + final String key1 = "channel"; + String value1 = "foo"; + final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }"; + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8)); + } + }); + final String key2 = "c"; + startProxy(new AsyncMiddleManServlet() + { + @Override + protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) + { + return new AfterContentTransformer() + { + @Override + public void transform(Source source, Sink sink) throws IOException + { + InputStream input = source.getInputStream(); + @SuppressWarnings("unchecked") + Map obj = (Map)JSON.parse(new InputStreamReader(input, "UTF-8")); + // Transform the object. + obj.put(key2, obj.remove(key1)); + try (OutputStream output = sink.getOutputStream()) + { + output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8)); + } + } + }; + } + }); + startClient(); + + ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + Assert.assertEquals(200, response.getStatus()); + @SuppressWarnings("unchecked") + Map obj = (Map)JSON.parse(response.getContentAsString()); + Assert.assertNotNull(obj); + Assert.assertEquals(2, obj.size()); + Assert.assertEquals(value0, obj.get(key0)); + Assert.assertEquals(value1, obj.get(key2)); + } + + @Test + public void testAfterContentTransformerMemoryInputStreamReset() throws Exception + { + testAfterContentTransformerInputStreamReset(false); + } + + @Test + public void testAfterContentTransformerDiskInputStreamReset() throws Exception + { + testAfterContentTransformerInputStreamReset(true); + } + + private void testAfterContentTransformerInputStreamReset(final boolean overflow) throws Exception + { + final byte[] data = new byte[]{'c', 'o', 'f', 'f', 'e', 'e'}; + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + // Write the content in two chunks. + int chunk = data.length / 2; + ServletOutputStream output = response.getOutputStream(); + output.write(data, 0, chunk); + sleep(1000); + output.write(data, chunk, data.length - chunk); + } + }); + startProxy(new AsyncMiddleManServlet() + { + @Override + protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) + { + return new AfterContentTransformer() + { + { + setMaxInputBufferSize(overflow ? data.length / 2 : data.length * 2); + } + + @Override + public void transform(Source source, Sink sink) throws IOException + { + // Consume the stream once. + InputStream input = source.getInputStream(); + IO.copy(input, IO.getNullStream()); + + // Reset the stream and re-read it. + input.reset(); + IO.copy(input, sink.getOutputStream()); + } + }; + } + }); + startClient(); + + ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + Assert.assertArrayEquals(data, response.getContent()); + } + + @Test + public void testAfterContentTransformerOverflowingToDisk() throws Exception + { + // Make sure the temporary directory we use exists and it's empty. + final Path targetTestsDir = prepareTargetTestsDir(); + + final String key0 = "id"; + long value0 = 1; + final String key1 = "channel"; + String value1 = "foo"; + final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }"; + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8)); + } + }); + final String inputPrefix = "in_"; + final String outputPrefix = "out_"; + final String key2 = "c"; + startProxy(new AsyncMiddleManServlet() + { + @Override + protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) + { + AfterContentTransformer transformer = new AfterContentTransformer() + { + @Override + public void transform(Source source, Sink sink) throws IOException + { + InputStream input = source.getInputStream(); + @SuppressWarnings("unchecked") + Map obj = (Map)JSON.parse(new InputStreamReader(input, "UTF-8")); + // Transform the object. + obj.put(key2, obj.remove(key1)); + try (OutputStream output = sink.getOutputStream()) + { + output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8)); + } + } + }; + transformer.setOverflowDirectory(targetTestsDir); + int maxBufferSize = json.length() / 4; + transformer.setMaxInputBufferSize(maxBufferSize); + transformer.setInputFilePrefix(inputPrefix); + transformer.setMaxOutputBufferSize(maxBufferSize); + transformer.setOutputFilePrefix(outputPrefix); + return transformer; + } + }); + startClient(); + + ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + Assert.assertEquals(200, response.getStatus()); + @SuppressWarnings("unchecked") + Map obj = (Map)JSON.parse(response.getContentAsString()); + Assert.assertNotNull(obj); + Assert.assertEquals(2, obj.size()); + Assert.assertEquals(value0, obj.get(key0)); + Assert.assertEquals(value1, obj.get(key2)); + // Make sure the files do not exist. + try (DirectoryStream paths = Files.newDirectoryStream(targetTestsDir, inputPrefix + "*.*")) + { + Assert.assertFalse(paths.iterator().hasNext()); + } + try (DirectoryStream paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*")) + { + Assert.assertFalse(paths.iterator().hasNext()); + } + } + + @Test + public void testAfterContentTransformerClosingFilesOnClientRequestException() throws Exception + { + final Path targetTestsDir = prepareTargetTestsDir(); + + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + IO.copy(request.getInputStream(), IO.getNullStream()); + } + }); + final CountDownLatch destroyLatch = new CountDownLatch(1); + startProxy(new AsyncMiddleManServlet() + { + @Override + protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest) + { + return new AfterContentTransformer() + { + { + setOverflowDirectory(targetTestsDir); + setMaxInputBufferSize(0); + setMaxOutputBufferSize(0); + } + + @Override + public void transform(Source source, Sink sink) throws IOException + { + IO.copy(source.getInputStream(), sink.getOutputStream()); + } + + @Override + public void destroy() + { + super.destroy(); + destroyLatch.countDown(); + } + }; + } + }); + long idleTimeout = 1000; + proxyConnector.setIdleTimeout(idleTimeout); + startClient(); + + // Send only part of the content; the proxy will idle timeout. + final byte[] data = new byte[]{'c', 'a', 'f', 'e'}; + ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()) + .header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()) + .content(new BytesContentProvider(data) + { + @Override + public long getLength() + { + return data.length + 1; + } + }) + .timeout(5 * idleTimeout, TimeUnit.MILLISECONDS) + .send(); + + Assert.assertTrue(destroyLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS)); + Assert.assertEquals(HttpStatus.GATEWAY_TIMEOUT_504, response.getStatus()); + } + + @Test + public void testAfterContentTransformerClosingFilesOnServerResponseException() throws Exception + { + final Path targetTestsDir = prepareTargetTestsDir(); + + final CountDownLatch serviceLatch = new CountDownLatch(1); + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString()); + response.setContentLength(2); + // Send only part of the content. + OutputStream output = response.getOutputStream(); + output.write('x'); + output.flush(); + serviceLatch.countDown(); + } + }); + final CountDownLatch destroyLatch = new CountDownLatch(1); + startProxy(new AsyncMiddleManServlet() + { + @Override + protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse) + { + return new AfterContentTransformer() + { + { + setOverflowDirectory(targetTestsDir); + setMaxInputBufferSize(0); + setMaxOutputBufferSize(0); + } + + @Override + public void transform(Source source, Sink sink) throws IOException + { + IO.copy(source.getInputStream(), sink.getOutputStream()); + } + + @Override + public void destroy() + { + super.destroy(); + destroyLatch.countDown(); + } + }; + } + }); + startClient(); + + ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + + Assert.assertTrue(serviceLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(destroyLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus()); + } + + private Path prepareTargetTestsDir() throws IOException + { + final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath(); + Files.createDirectories(targetTestsDir); + try (DirectoryStream files = Files.newDirectoryStream(targetTestsDir, "*.*")) + { + for (Path file : files) + { + if (!Files.isDirectory(file)) + Files.delete(file); + } + } + return targetTestsDir; + } + private void sleep(long delay) { try @@ -1005,7 +1348,7 @@ public class AsyncMiddleManServletTest } } } - + /** * A transformer that discards all but the first line of text. */ -- cgit v1.2.3