Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-04-09 13:15:27 +0000
committerSimone Bordet2015-04-09 13:15:27 +0000
commit4fbdafb9e9e914962d49d1c7784b336b41553956 (patch)
tree79dff4d51cdf67d6f496ffc9a8f1fc1b1e7fecd7
parent990a045588f005325e1bcd484af6bfb987a1a05d (diff)
downloadorg.eclipse.jetty.project-4fbdafb9e9e914962d49d1c7784b336b41553956.tar.gz
org.eclipse.jetty.project-4fbdafb9e9e914962d49d1c7784b336b41553956.tar.xz
org.eclipse.jetty.project-4fbdafb9e9e914962d49d1c7784b336b41553956.zip
464292 - Implement stream-based transformer for AsyncMiddleManServlet.
Introduced AfterContentTransformer with a transform(Source, Sink) method that offers an InputStream to read the original content from, and an OutputStream to write transformed content to.
-rw-r--r--jetty-proxy/pom.xml20
-rw-r--r--jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java461
-rw-r--r--jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java34
-rw-r--r--jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java365
4 files changed, 859 insertions, 21 deletions
diff --git a/jetty-proxy/pom.xml b/jetty-proxy/pom.xml
index 291a4069b5..5b6483f071 100644
--- a/jetty-proxy/pom.xml
+++ b/jetty-proxy/pom.xml
@@ -71,9 +71,9 @@
</build>
<dependencies>
<dependency>
- <groupId>org.eclipse.jetty.toolchain</groupId>
- <artifactId>jetty-test-helper</artifactId>
- <scope>test</scope>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -83,18 +83,24 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
+ <artifactId>jetty-util-ajax</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- <scope>provided</scope>
+ <groupId>org.eclipse.jetty.toolchain</groupId>
+ <artifactId>jetty-test-helper</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java
new file mode 100644
index 0000000000..676083d09e
--- /dev/null
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java
@@ -0,0 +1,461 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.proxy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.eclipse.jetty.util.component.Destroyable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * <p>A specialized transformer for {@link AsyncMiddleManServlet} that performs
+ * the transformation when the whole content has been received.</p>
+ * <p>The content is buffered in memory up to a configurable {@link #getMaxInputBufferSize() maximum size},
+ * after which it is overflown to a file on disk. The overflow file is saved
+ * in the {@link #getOverflowDirectory() overflow directory} as a
+ * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
+ * with a name starting with the {@link #getInputFilePrefix() input prefix}
+ * and default suffix.</p>
+ * <p>Application must implement the {@link #transform(Source, Sink) transformation method}
+ * to transform the content.</p>
+ * <p>The transformed content is buffered in memory up to a configurable {@link #getMaxOutputBufferSize() maximum size}
+ * after which it is overflown to a file on disk. The overflow file is saved
+ * in the {@link #getOverflowDirectory() overflow directory} as a
+ * {@link Files#createTempFile(Path, String, String, FileAttribute[]) temporary file}
+ * with a name starting with the {@link #getOutputFilePrefix()} output prefix}
+ * and default suffix.</p>
+ */
+public abstract class AfterContentTransformer implements AsyncMiddleManServlet.ContentTransformer, Destroyable
+{
+ private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
+
+ private final List<ByteBuffer> buffers = new ArrayList<>();
+ private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
+ private String inputFilePrefix = "amms_adct_in_";
+ private String outputFilePrefix = "amms_adct_out_";
+ private long maxInputBufferSize = 1024 * 1024;
+ private long inputBufferSize;
+ private FileChannel inputFile;
+ private long maxOutputBufferSize = maxInputBufferSize;
+ private long outputBufferSize;
+ private FileChannel outputFile;
+
+ /**
+ * <p>Returns the directory where input and output are overflown to
+ * temporary files if they exceed, respectively, the
+ * {@link #getMaxInputBufferSize() max input size} or the
+ * {@link #getMaxOutputBufferSize() max output size}.</p>
+ * <p>Defaults to the directory pointed by the {@code java.io.tmpdir}
+ * system property.</p>
+ *
+ * @return the overflow directory path
+ * @see #setOverflowDirectory(Path)
+ */
+ public Path getOverflowDirectory()
+ {
+ return overflowDirectory;
+ }
+
+ /**
+ * @param overflowDirectory the overflow directory path
+ * @see #getOverflowDirectory()
+ */
+ public void setOverflowDirectory(Path overflowDirectory)
+ {
+ this.overflowDirectory = overflowDirectory;
+ }
+
+ /**
+ * @return the prefix of the input overflow temporary files
+ * @see #setInputFilePrefix(String)
+ */
+ public String getInputFilePrefix()
+ {
+ return inputFilePrefix;
+ }
+
+ /**
+ * @param inputFilePrefix the prefix of the input overflow temporary files
+ * @see #getInputFilePrefix()
+ */
+ public void setInputFilePrefix(String inputFilePrefix)
+ {
+ this.inputFilePrefix = inputFilePrefix;
+ }
+
+ /**
+ * <p>Returns the maximum input buffer size, after which the input is overflown to disk.</p>
+ * <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
+ *
+ * @return the max input buffer size
+ * @see #setMaxInputBufferSize(long)
+ */
+ public long getMaxInputBufferSize()
+ {
+ return maxInputBufferSize;
+ }
+
+ /**
+ * @param maxInputBufferSize the max input buffer size
+ * @see #getMaxInputBufferSize()
+ */
+ public void setMaxInputBufferSize(long maxInputBufferSize)
+ {
+ this.maxInputBufferSize = maxInputBufferSize;
+ }
+
+ /**
+ * @return the prefix of the output overflow temporary files
+ * @see #setOutputFilePrefix(String)
+ */
+ public String getOutputFilePrefix()
+ {
+ return outputFilePrefix;
+ }
+
+ /**
+ * @param outputFilePrefix the prefix of the output overflow temporary files
+ * @see #getOutputFilePrefix()
+ */
+ public void setOutputFilePrefix(String outputFilePrefix)
+ {
+ this.outputFilePrefix = outputFilePrefix;
+ }
+
+ /**
+ * <p>Returns the maximum output buffer size, after which the output is overflown to disk.</p>
+ * <p>Defaults to 1 MiB, i.e. 1048576 bytes.</p>
+ *
+ * @return the max output buffer size
+ * @see #setMaxOutputBufferSize(long)
+ */
+ public long getMaxOutputBufferSize()
+ {
+ return maxOutputBufferSize;
+ }
+
+ /**
+ * @param maxOutputBufferSize the max output buffer size
+ */
+ public void setMaxOutputBufferSize(long maxOutputBufferSize)
+ {
+ this.maxOutputBufferSize = maxOutputBufferSize;
+ }
+
+ @Override
+ public final void transform(ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
+ {
+ int remaining = input.remaining();
+ if (remaining > 0)
+ {
+ inputBufferSize += remaining;
+ long max = getMaxInputBufferSize();
+ if (max >= 0 && inputBufferSize > max)
+ {
+ overflow(input);
+ }
+ else
+ {
+ ByteBuffer copy = ByteBuffer.allocate(input.remaining());
+ copy.put(input).flip();
+ buffers.add(copy);
+ }
+ }
+
+ if (finished)
+ {
+ Source source = new Source();
+ Sink sink = new Sink();
+ transform(source, sink);
+ sink.drainTo(output);
+ }
+ }
+
+ /**
+ * <p>Transforms the original content read from the {@code source} into
+ * transformed content written to the {@code sink}.</p>
+ * <p>The transformation must happen synchronously in the context of a call
+ * to this method (it is not supported to perform the transformation in another
+ * thread spawned during the call to this method).</p>
+ * <p>Differently from {@link #transform(ByteBuffer, boolean, List)}, this
+ * method is invoked only when the whole content is available, and offers
+ * a blocking API via the InputStream and OutputStream that can be obtained
+ * from {@link Source} and {@link Sink} respectively.</p>
+ * <p>Typical implementations:</p>
+ * <pre>
+ * // Identity transformation (no transformation, the input is copied to the output)
+ * public void transform(Source source, Sink sink)
+ * {
+ * org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
+ * }
+ * </pre>
+ *
+ * @param source where the original content is read
+ * @param sink where the transformed content is written
+ * @throws IOException if the transformation fails
+ */
+ public abstract void transform(Source source, Sink sink) throws IOException;
+
+ private void overflow(ByteBuffer input) throws IOException
+ {
+ if (inputFile == null)
+ {
+ Path path = Files.createTempFile(getOverflowDirectory(), getInputFilePrefix(), null);
+ inputFile = FileChannel.open(path,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.READ,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ int size = buffers.size();
+ if (size > 0)
+ {
+ inputFile.write(buffers.toArray(new ByteBuffer[size]));
+ buffers.clear();
+ }
+ }
+ inputFile.write(input);
+ }
+
+ @Override
+ public void destroy()
+ {
+ close(inputFile);
+ close(outputFile);
+ }
+
+ private void close(Closeable closeable)
+ {
+ try
+ {
+ if (closeable != null)
+ closeable.close();
+ }
+ catch (IOException x)
+ {
+ LOG.ignore(x);
+ }
+ }
+
+ /**
+ * <p>The source from where the original content is read to be transformed.</p>
+ * <p>The {@link #getInputStream() input stream} provided by this
+ * class supports the {@link InputStream#reset()} method so that
+ * the stream can be rewound to the beginning.</p>
+ */
+ public class Source
+ {
+ private final InputStream stream;
+
+ private Source() throws IOException
+ {
+ if (inputFile != null)
+ {
+ inputFile.force(true);
+ stream = new ChannelInputStream();
+ }
+ else
+ {
+ stream = new MemoryInputStream();
+ }
+ stream.reset();
+ }
+
+ /**
+ * @return an input stream to read the original content from
+ */
+ public InputStream getInputStream()
+ {
+ return stream;
+ }
+ }
+
+ private class ChannelInputStream extends InputStream
+ {
+ private final InputStream stream = Channels.newInputStream(inputFile);
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return stream.read();
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ inputFile.position(0);
+ }
+ }
+
+ private class MemoryInputStream extends InputStream
+ {
+ private final byte[] oneByte = new byte[1];
+ private int index;
+ private ByteBuffer slice;
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ if (len == 0)
+ return 0;
+ if (index == buffers.size())
+ return -1;
+
+ if (slice == null)
+ slice = buffers.get(index).slice();
+
+ int size = Math.min(len, slice.remaining());
+ slice.get(b, off, size);
+
+ if (!slice.hasRemaining())
+ {
+ ++index;
+ slice = null;
+ }
+
+ return size;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ int read = read(oneByte, 0, 1);
+ return read < 0 ? read : oneByte[0] & 0xFF;
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ index = 0;
+ slice = null;
+ }
+ }
+
+ /**
+ * <p>The target to where the transformed content is written after the transformation.</p>
+ */
+ public class Sink
+ {
+ private final List<ByteBuffer> buffers = new ArrayList<>();
+ private final OutputStream stream = new SinkOutputStream();
+
+ /**
+ * @return an output stream to write the transformed content to
+ */
+ public OutputStream getOutputStream()
+ {
+ return stream;
+ }
+
+ private void overflow(ByteBuffer output) throws IOException
+ {
+ if (outputFile == null)
+ {
+ Path path = Files.createTempFile(getOverflowDirectory(), getOutputFilePrefix(), null);
+ outputFile = FileChannel.open(path,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.READ,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ int size = buffers.size();
+ if (size > 0)
+ {
+ outputFile.write(buffers.toArray(new ByteBuffer[size]));
+ buffers.clear();
+ }
+ }
+ outputFile.write(output);
+ }
+
+ private void drainTo(List<ByteBuffer> output) throws IOException
+ {
+ if (outputFile == null)
+ {
+ output.addAll(buffers);
+ buffers.clear();
+ }
+ else
+ {
+ outputFile.force(true);
+ long position = 0;
+ long length = outputFile.size();
+ outputFile.position(position);
+ while (length > 0)
+ {
+ // At most 1 GiB file maps.
+ long size = Math.min(1024 * 1024 * 1024, length);
+ ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size);
+ output.add(buffer);
+ position += size;
+ length -= size;
+ }
+ }
+ }
+
+ private class SinkOutputStream extends OutputStream
+ {
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ if (len <= 0)
+ return;
+
+ outputBufferSize += len;
+ long max = getMaxOutputBufferSize();
+ if (max >= 0 && outputBufferSize > max)
+ {
+ overflow(ByteBuffer.wrap(b, off, len));
+ }
+ else
+ {
+ // The array may be reused by the
+ // application so we need to copy it.
+ byte[] copy = new byte[len];
+ System.arraycopy(b, off, copy, 0, len);
+ buffers.add(ByteBuffer.wrap(copy));
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ write(new byte[]{(byte)b}, 0, 1);
+ }
+ }
+ }
+}
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java
index e7b4937fa5..d8e0030a14 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncMiddleManServlet.java
@@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.component.Destroyable;
@SuppressWarnings("serial")
public class AsyncMiddleManServlet extends AbstractProxyServlet
@@ -141,6 +142,19 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return ContentTransformer.IDENTITY;
}
+ private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
+ {
+ try
+ {
+ transformer.transform(input, finished, output);
+ }
+ catch (Throwable x)
+ {
+ _log.info("Exception while transforming " + transformer, x);
+ throw x;
+ }
+ }
+
int readClientRequestContent(ServletInputStream input, byte[] buffer) throws IOException
{
return input.read(buffer);
@@ -169,6 +183,16 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
output.write(buffer, offset, length);
}
+ private void cleanup(HttpServletRequest clientRequest)
+ {
+ ContentTransformer clientTransformer = (ContentTransformer)clientRequest.getAttribute(CLIENT_TRANSFORMER);
+ if (clientTransformer instanceof Destroyable)
+ ((Destroyable)clientTransformer).destroy();
+ ContentTransformer serverTransformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
+ if (serverTransformer instanceof Destroyable)
+ ((Destroyable)serverTransformer).destroy();
+ }
+
protected class ProxyReader extends IteratingCallback implements ReadListener
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
@@ -217,6 +241,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void onError(Throwable t)
{
+ cleanup(clientRequest);
onClientRequestFailure(clientRequest, proxyRequest, proxyResponse, t);
}
@@ -272,7 +297,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
}
int contentBytes = content.remaining();
- transformer.transform(content, finished, buffers);
+ transform(transformer, content, finished, buffers);
int newContentBytes = 0;
int size = buffers.size();
@@ -372,7 +397,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
length += contentBytes;
boolean finished = contentLength > 0 && length == contentLength;
- transformer.transform(content, finished, buffers);
+ transform(transformer, content, finished, buffers);
int newContentBytes = 0;
int size = buffers.size();
@@ -436,7 +461,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
ProxyWriter proxyWriter = (ProxyWriter)clientRequest.getAttribute(WRITE_LISTENER_ATTRIBUTE);
ContentTransformer transformer = (ContentTransformer)clientRequest.getAttribute(SERVER_TRANSFORMER);
- transformer.transform(BufferUtil.EMPTY_BUFFER, true, buffers);
+ transform(transformer, BufferUtil.EMPTY_BUFFER, true, buffers);
long newContentBytes = 0;
int size = buffers.size();
@@ -486,12 +511,14 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
@Override
public void succeeded()
{
+ cleanup(clientRequest);
onProxyResponseSuccess(clientRequest, proxyResponse, response);
}
@Override
public void failed(Throwable failure)
{
+ cleanup(clientRequest);
onProxyResponseFailure(clientRequest, proxyResponse, response, failure);
}
}
@@ -718,4 +745,5 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
return ByteBuffer.wrap(gzipBytes);
}
}
+
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
index 772f795860..a8a104770b 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
@@ -20,10 +20,16 @@ package org.eclipse.jetty.proxy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,7 +40,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
@@ -53,6 +58,8 @@ import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpHeaderValue;
+import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -60,10 +67,12 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.toolchain.test.IO;
+import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Utf8StringBuilder;
+import org.eclipse.jetty.util.ajax.JSON;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -289,7 +298,7 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(bytes, response.getContent());
}
-
+
@Test
public void testTransformGzippedHead() throws Exception
{
@@ -299,21 +308,21 @@ public class AsyncMiddleManServletTest
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
-
+
String sample = "<a href=\"http://webtide.com/\">Webtide</a>\n<a href=\"http://google.com\">Google</a>\n";
byte[] bytes = sample.getBytes(StandardCharsets.UTF_8);
-
+
ServletOutputStream out = response.getOutputStream();
out.write(gzip(bytes));
-
+
// create a byte buffer larger enough to create 2 (or more) transforms.
- byte[] randomFiller = new byte[64*1024];
+ byte[] randomFiller = new byte[64 * 1024];
/* fill with nonsense
* Using random data to ensure compressed buffer size is large
* enough to trigger at least 2 transform() events.
*/
new Random().nextBytes(randomFiller);
-
+
out.write(gzip(randomFiller));
}
});
@@ -333,7 +342,7 @@ public class AsyncMiddleManServletTest
.send();
Assert.assertEquals(200, response.getStatus());
-
+
String expectedStr = "<a href=\"http://webtide.com/\">Webtide</a>";
byte[] expected = expectedStr.getBytes(StandardCharsets.UTF_8);
Assert.assertArrayEquals(expected, response.getContent());
@@ -436,7 +445,7 @@ public class AsyncMiddleManServletTest
{
// decode input stream thru gzip
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- IO.copy(new GZIPInputStream(request.getInputStream()),bos);
+ IO.copy(new GZIPInputStream(request.getInputStream()), bos);
// ensure decompressed is 0 length
Assert.assertEquals(0, bos.toByteArray().length);
response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip");
@@ -837,6 +846,340 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(502, response.getStatus());
}
+ @Test
+ public void testAfterContentTransformer() throws Exception
+ {
+ final String key0 = "id";
+ long value0 = 1;
+ final String key1 = "channel";
+ String value1 = "foo";
+ final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+ final String key2 = "c";
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
+ {
+ return new AfterContentTransformer()
+ {
+ @Override
+ public void transform(Source source, Sink sink) throws IOException
+ {
+ InputStream input = source.getInputStream();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
+ // Transform the object.
+ obj.put(key2, obj.remove(key1));
+ try (OutputStream output = sink.getOutputStream())
+ {
+ output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ };
+ }
+ });
+ startClient();
+
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+
+ Assert.assertEquals(200, response.getStatus());
+ @SuppressWarnings("unchecked")
+ Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(2, obj.size());
+ Assert.assertEquals(value0, obj.get(key0));
+ Assert.assertEquals(value1, obj.get(key2));
+ }
+
+ @Test
+ public void testAfterContentTransformerMemoryInputStreamReset() throws Exception
+ {
+ testAfterContentTransformerInputStreamReset(false);
+ }
+
+ @Test
+ public void testAfterContentTransformerDiskInputStreamReset() throws Exception
+ {
+ testAfterContentTransformerInputStreamReset(true);
+ }
+
+ private void testAfterContentTransformerInputStreamReset(final boolean overflow) throws Exception
+ {
+ final byte[] data = new byte[]{'c', 'o', 'f', 'f', 'e', 'e'};
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ // Write the content in two chunks.
+ int chunk = data.length / 2;
+ ServletOutputStream output = response.getOutputStream();
+ output.write(data, 0, chunk);
+ sleep(1000);
+ output.write(data, chunk, data.length - chunk);
+ }
+ });
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
+ {
+ return new AfterContentTransformer()
+ {
+ {
+ setMaxInputBufferSize(overflow ? data.length / 2 : data.length * 2);
+ }
+
+ @Override
+ public void transform(Source source, Sink sink) throws IOException
+ {
+ // Consume the stream once.
+ InputStream input = source.getInputStream();
+ IO.copy(input, IO.getNullStream());
+
+ // Reset the stream and re-read it.
+ input.reset();
+ IO.copy(input, sink.getOutputStream());
+ }
+ };
+ }
+ });
+ startClient();
+
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+
+ Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+ Assert.assertArrayEquals(data, response.getContent());
+ }
+
+ @Test
+ public void testAfterContentTransformerOverflowingToDisk() throws Exception
+ {
+ // Make sure the temporary directory we use exists and it's empty.
+ final Path targetTestsDir = prepareTargetTestsDir();
+
+ final String key0 = "id";
+ long value0 = 1;
+ final String key1 = "channel";
+ String value1 = "foo";
+ final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+ final String inputPrefix = "in_";
+ final String outputPrefix = "out_";
+ final String key2 = "c";
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
+ {
+ AfterContentTransformer transformer = new AfterContentTransformer()
+ {
+ @Override
+ public void transform(Source source, Sink sink) throws IOException
+ {
+ InputStream input = source.getInputStream();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> obj = (Map<String, Object>)JSON.parse(new InputStreamReader(input, "UTF-8"));
+ // Transform the object.
+ obj.put(key2, obj.remove(key1));
+ try (OutputStream output = sink.getOutputStream())
+ {
+ output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ };
+ transformer.setOverflowDirectory(targetTestsDir);
+ int maxBufferSize = json.length() / 4;
+ transformer.setMaxInputBufferSize(maxBufferSize);
+ transformer.setInputFilePrefix(inputPrefix);
+ transformer.setMaxOutputBufferSize(maxBufferSize);
+ transformer.setOutputFilePrefix(outputPrefix);
+ return transformer;
+ }
+ });
+ startClient();
+
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+
+ Assert.assertEquals(200, response.getStatus());
+ @SuppressWarnings("unchecked")
+ Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(2, obj.size());
+ Assert.assertEquals(value0, obj.get(key0));
+ Assert.assertEquals(value1, obj.get(key2));
+ // Make sure the files do not exist.
+ try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, inputPrefix + "*.*"))
+ {
+ Assert.assertFalse(paths.iterator().hasNext());
+ }
+ try (DirectoryStream<Path> paths = Files.newDirectoryStream(targetTestsDir, outputPrefix + "*.*"))
+ {
+ Assert.assertFalse(paths.iterator().hasNext());
+ }
+ }
+
+ @Test
+ public void testAfterContentTransformerClosingFilesOnClientRequestException() throws Exception
+ {
+ final Path targetTestsDir = prepareTargetTestsDir();
+
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ IO.copy(request.getInputStream(), IO.getNullStream());
+ }
+ });
+ final CountDownLatch destroyLatch = new CountDownLatch(1);
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newClientRequestContentTransformer(HttpServletRequest clientRequest, Request proxyRequest)
+ {
+ return new AfterContentTransformer()
+ {
+ {
+ setOverflowDirectory(targetTestsDir);
+ setMaxInputBufferSize(0);
+ setMaxOutputBufferSize(0);
+ }
+
+ @Override
+ public void transform(Source source, Sink sink) throws IOException
+ {
+ IO.copy(source.getInputStream(), sink.getOutputStream());
+ }
+
+ @Override
+ public void destroy()
+ {
+ super.destroy();
+ destroyLatch.countDown();
+ }
+ };
+ }
+ });
+ long idleTimeout = 1000;
+ proxyConnector.setIdleTimeout(idleTimeout);
+ startClient();
+
+ // Send only part of the content; the proxy will idle timeout.
+ final byte[] data = new byte[]{'c', 'a', 'f', 'e'};
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
+ .content(new BytesContentProvider(data)
+ {
+ @Override
+ public long getLength()
+ {
+ return data.length + 1;
+ }
+ })
+ .timeout(5 * idleTimeout, TimeUnit.MILLISECONDS)
+ .send();
+
+ Assert.assertTrue(destroyLatch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(HttpStatus.GATEWAY_TIMEOUT_504, response.getStatus());
+ }
+
+ @Test
+ public void testAfterContentTransformerClosingFilesOnServerResponseException() throws Exception
+ {
+ final Path targetTestsDir = prepareTargetTestsDir();
+
+ final CountDownLatch serviceLatch = new CountDownLatch(1);
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
+ response.setContentLength(2);
+ // Send only part of the content.
+ OutputStream output = response.getOutputStream();
+ output.write('x');
+ output.flush();
+ serviceLatch.countDown();
+ }
+ });
+ final CountDownLatch destroyLatch = new CountDownLatch(1);
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
+ {
+ return new AfterContentTransformer()
+ {
+ {
+ setOverflowDirectory(targetTestsDir);
+ setMaxInputBufferSize(0);
+ setMaxOutputBufferSize(0);
+ }
+
+ @Override
+ public void transform(Source source, Sink sink) throws IOException
+ {
+ IO.copy(source.getInputStream(), sink.getOutputStream());
+ }
+
+ @Override
+ public void destroy()
+ {
+ super.destroy();
+ destroyLatch.countDown();
+ }
+ };
+ }
+ });
+ startClient();
+
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+
+ Assert.assertTrue(serviceLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(destroyLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus());
+ }
+
+ private Path prepareTargetTestsDir() throws IOException
+ {
+ final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
+ Files.createDirectories(targetTestsDir);
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(targetTestsDir, "*.*"))
+ {
+ for (Path file : files)
+ {
+ if (!Files.isDirectory(file))
+ Files.delete(file);
+ }
+ }
+ return targetTestsDir;
+ }
+
private void sleep(long delay)
{
try
@@ -1005,7 +1348,7 @@ public class AsyncMiddleManServletTest
}
}
}
-
+
/**
* A transformer that discards all but the first line of text.
*/

Back to the top