Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-05-05 13:54:02 +0000
committerSimone Bordet2015-05-05 13:54:02 +0000
commit4573e20033208579121e40a0e59c80ef227f63ef (patch)
tree7ef70a28a835854011ba4b03a21e20ea70954c36
parentb5818034eb4cf6bc3f8bb5fef1ac44e8f4cf5986 (diff)
downloadorg.eclipse.jetty.project-4573e20033208579121e40a0e59c80ef227f63ef.tar.gz
org.eclipse.jetty.project-4573e20033208579121e40a0e59c80ef227f63ef.tar.xz
org.eclipse.jetty.project-4573e20033208579121e40a0e59c80ef227f63ef.zip
464292 - Implement stream-based transformer for AsyncMiddleManServlet.
Added boolean return value from transform(Source, Sink) so that the implementation can optimize source-to-sink write in case of no transformations.
-rw-r--r--jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java88
-rw-r--r--jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java92
2 files changed, 142 insertions, 38 deletions
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java
index 676083d09e..e34aed18f0 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AfterContentTransformer.java
@@ -59,7 +59,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
private static final Logger LOG = Log.getLogger(AfterContentTransformer.class);
- private final List<ByteBuffer> buffers = new ArrayList<>();
+ private final List<ByteBuffer> sourceBuffers = new ArrayList<>();
private Path overflowDirectory = Paths.get(System.getProperty("java.io.tmpdir"));
private String inputFilePrefix = "amms_adct_in_";
private String outputFilePrefix = "amms_adct_out_";
@@ -188,7 +188,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
ByteBuffer copy = ByteBuffer.allocate(input.remaining());
copy.put(input).flip();
- buffers.add(copy);
+ sourceBuffers.add(copy);
}
}
@@ -196,8 +196,10 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
Source source = new Source();
Sink sink = new Sink();
- transform(source, sink);
- sink.drainTo(output);
+ if (transform(source, sink))
+ sink.drainTo(output);
+ else
+ source.drainTo(output);
}
}
@@ -211,20 +213,28 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
* method is invoked only when the whole content is available, and offers
* a blocking API via the InputStream and OutputStream that can be obtained
* from {@link Source} and {@link Sink} respectively.</p>
+ * <p>Implementations may read the source, inspect the input bytes and decide
+ * that no transformation is necessary, and therefore the source must be copied
+ * unchanged to the sink. In such case, the implementation must return false to
+ * indicate that it wishes to just pipe the bytes from the source to the sink.</p>
* <p>Typical implementations:</p>
* <pre>
* // Identity transformation (no transformation, the input is copied to the output)
- * public void transform(Source source, Sink sink)
+ * public boolean transform(Source source, Sink sink)
* {
* org.eclipse.jetty.util.IO.copy(source.getInputStream(), sink.getOutputStream());
+ * return true;
* }
* </pre>
*
* @param source where the original content is read
* @param sink where the transformed content is written
+ * @return true if the transformation happened and the transformed bytes have
+ * been written to the sink, false if no transformation happened and the source
+ * must be copied to the sink.
* @throws IOException if the transformation fails
*/
- public abstract void transform(Source source, Sink sink) throws IOException;
+ public abstract boolean transform(Source source, Sink sink) throws IOException;
private void overflow(ByteBuffer input) throws IOException
{
@@ -236,11 +246,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE);
- int size = buffers.size();
+ int size = sourceBuffers.size();
if (size > 0)
{
- inputFile.write(buffers.toArray(new ByteBuffer[size]));
- buffers.clear();
+ inputFile.write(sourceBuffers.toArray(new ByteBuffer[size]));
+ sourceBuffers.clear();
}
}
inputFile.write(input);
@@ -253,6 +263,22 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
close(outputFile);
}
+ private void drain(FileChannel file, List<ByteBuffer> output) throws IOException
+ {
+ long position = 0;
+ long length = file.size();
+ file.position(position);
+ while (length > 0)
+ {
+ // At most 1 GiB file maps.
+ long size = Math.min(1024 * 1024 * 1024, length);
+ ByteBuffer buffer = file.map(FileChannel.MapMode.READ_ONLY, position, size);
+ output.add(buffer);
+ position += size;
+ length -= size;
+ }
+ }
+
private void close(Closeable closeable)
{
try
@@ -297,6 +323,19 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
return stream;
}
+
+ private void drainTo(List<ByteBuffer> output) throws IOException
+ {
+ if (inputFile == null)
+ {
+ output.addAll(sourceBuffers);
+ sourceBuffers.clear();
+ }
+ else
+ {
+ drain(inputFile, output);
+ }
+ }
}
private class ChannelInputStream extends InputStream
@@ -333,11 +372,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
if (len == 0)
return 0;
- if (index == buffers.size())
+ if (index == sourceBuffers.size())
return -1;
if (slice == null)
- slice = buffers.get(index).slice();
+ slice = sourceBuffers.get(index).slice();
int size = Math.min(len, slice.remaining());
slice.get(b, off, size);
@@ -371,7 +410,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
*/
public class Sink
{
- private final List<ByteBuffer> buffers = new ArrayList<>();
+ private final List<ByteBuffer> sinkBuffers = new ArrayList<>();
private final OutputStream stream = new SinkOutputStream();
/**
@@ -392,11 +431,11 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.DELETE_ON_CLOSE);
- int size = buffers.size();
+ int size = sinkBuffers.size();
if (size > 0)
{
- outputFile.write(buffers.toArray(new ByteBuffer[size]));
- buffers.clear();
+ outputFile.write(sinkBuffers.toArray(new ByteBuffer[size]));
+ sinkBuffers.clear();
}
}
outputFile.write(output);
@@ -406,24 +445,13 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
{
if (outputFile == null)
{
- output.addAll(buffers);
- buffers.clear();
+ output.addAll(sinkBuffers);
+ sinkBuffers.clear();
}
else
{
outputFile.force(true);
- long position = 0;
- long length = outputFile.size();
- outputFile.position(position);
- while (length > 0)
- {
- // At most 1 GiB file maps.
- long size = Math.min(1024 * 1024 * 1024, length);
- ByteBuffer buffer = outputFile.map(FileChannel.MapMode.READ_ONLY, position, size);
- output.add(buffer);
- position += size;
- length -= size;
- }
+ drain(outputFile, output);
}
}
@@ -447,7 +475,7 @@ public abstract class AfterContentTransformer implements AsyncMiddleManServlet.C
// application so we need to copy it.
byte[] copy = new byte[len];
System.arraycopy(b, off, copy, 0, len);
- buffers.add(ByteBuffer.wrap(copy));
+ sinkBuffers.add(ByteBuffer.wrap(copy));
}
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
index a8a104770b..c9121e7f07 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/AsyncMiddleManServletTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
@@ -75,13 +76,13 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.ajax.JSON;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
-@SuppressWarnings("serial")
public class AsyncMiddleManServletTest
{
private static final Logger LOG = Log.getLogger(AsyncMiddleManServletTest.class);
@@ -108,12 +109,12 @@ public class AsyncMiddleManServletTest
server.start();
}
- private void startProxy(HttpServlet proxyServlet) throws Exception
+ private void startProxy(AsyncMiddleManServlet proxyServlet) throws Exception
{
startProxy(proxyServlet, new HashMap<String, String>());
}
- private void startProxy(HttpServlet proxyServlet, Map<String, String> initParams) throws Exception
+ private void startProxy(AsyncMiddleManServlet proxyServlet, Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
@@ -134,6 +135,8 @@ public class AsyncMiddleManServletTest
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
+
+ ((StdErrLog)proxyServlet._log).setHideStacks(true);
}
private void startClient() throws Exception
@@ -871,7 +874,7 @@ public class AsyncMiddleManServletTest
return new AfterContentTransformer()
{
@Override
- public void transform(Source source, Sink sink) throws IOException
+ public boolean transform(Source source, Sink sink) throws IOException
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
@@ -881,6 +884,7 @@ public class AsyncMiddleManServletTest
try (OutputStream output = sink.getOutputStream())
{
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
+ return true;
}
}
};
@@ -941,7 +945,7 @@ public class AsyncMiddleManServletTest
}
@Override
- public void transform(Source source, Sink sink) throws IOException
+ public boolean transform(Source source, Sink sink) throws IOException
{
// Consume the stream once.
InputStream input = source.getInputStream();
@@ -950,6 +954,7 @@ public class AsyncMiddleManServletTest
// Reset the stream and re-read it.
input.reset();
IO.copy(input, sink.getOutputStream());
+ return true;
}
};
}
@@ -994,7 +999,7 @@ public class AsyncMiddleManServletTest
AfterContentTransformer transformer = new AfterContentTransformer()
{
@Override
- public void transform(Source source, Sink sink) throws IOException
+ public boolean transform(Source source, Sink sink) throws IOException
{
InputStream input = source.getInputStream();
@SuppressWarnings("unchecked")
@@ -1004,6 +1009,7 @@ public class AsyncMiddleManServletTest
try (OutputStream output = sink.getOutputStream())
{
output.write(JSON.toString(obj).getBytes(StandardCharsets.UTF_8));
+ return true;
}
}
};
@@ -1068,9 +1074,10 @@ public class AsyncMiddleManServletTest
}
@Override
- public void transform(Source source, Sink sink) throws IOException
+ public boolean transform(Source source, Sink sink) throws IOException
{
IO.copy(source.getInputStream(), sink.getOutputStream());
+ return true;
}
@Override
@@ -1140,9 +1147,10 @@ public class AsyncMiddleManServletTest
}
@Override
- public void transform(Source source, Sink sink) throws IOException
+ public boolean transform(Source source, Sink sink) throws IOException
{
IO.copy(source.getInputStream(), sink.getOutputStream());
+ return true;
}
@Override
@@ -1165,6 +1173,74 @@ public class AsyncMiddleManServletTest
Assert.assertEquals(HttpStatus.BAD_GATEWAY_502, response.getStatus());
}
+ @Test
+ public void testAfterContentTransformerDoNotReadSourceDoNotTransform() throws Exception
+ {
+ testAfterContentTransformerDoNoTransform(false, false);
+ }
+
+ @Test
+ public void testAfterContentTransformerReadSourceDoNotTransform() throws Exception
+ {
+ testAfterContentTransformerDoNoTransform(true, true);
+ }
+
+ private void testAfterContentTransformerDoNoTransform(final boolean readSource, final boolean useDisk) throws Exception
+ {
+ final String key0 = "id";
+ long value0 = 1;
+ final String key1 = "channel";
+ String value1 = "foo";
+ final String json = "{ \"" + key0 + "\":" + value0 + ", \"" + key1 + "\":\"" + value1 + "\" }";
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ response.getOutputStream().write(json.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+ startProxy(new AsyncMiddleManServlet()
+ {
+ @Override
+ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Response serverResponse)
+ {
+ return new AfterContentTransformer()
+ {
+ {
+ if (useDisk)
+ setMaxInputBufferSize(0);
+ }
+
+ @Override
+ public boolean transform(Source source, Sink sink) throws IOException
+ {
+ if (readSource)
+ {
+ InputStream input = source.getInputStream();
+ JSON.parse(new InputStreamReader(input, "UTF-8"));
+ }
+ // No transformation.
+ return false;
+ }
+ };
+ }
+ });
+ startClient();
+
+ ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort())
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+
+ Assert.assertEquals(200, response.getStatus());
+ @SuppressWarnings("unchecked")
+ Map<String, Object> obj = (Map<String, Object>)JSON.parse(response.getContentAsString());
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(2, obj.size());
+ Assert.assertEquals(value0, obj.get(key0));
+ Assert.assertEquals(value1, obj.get(key1));
+ }
+
private Path prepareTargetTestsDir() throws IOException
{
final Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();

Back to the top