Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2015-05-14 14:42:34 +0000
committerJoakim Erdfelt2015-08-06 22:16:30 +0000
commitaac9568a30e133dad5633dd22f0a43f58346513f (patch)
treecbd6ceb25c0a3d9fa6dab5776d5424c4947ed54c /jetty-websocket/websocket-common
parent39f9f3ad44cc3c4a3d7e733719e4e716f522de3c (diff)
downloadorg.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.tar.gz
org.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.tar.xz
org.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.zip
Work on permessage-deflate continues
Conflicts: jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FrameCompressionExtensionTest.java
Diffstat (limited to 'jetty-websocket/websocket-common')
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java367
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java39
2 files changed, 285 insertions, 121 deletions
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java
index f4777fc2c3..dae6e184c3 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java
@@ -18,8 +18,10 @@
package org.eclipse.jetty.websocket.common.extensions.compress;
+import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@@ -30,7 +32,6 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@@ -40,9 +41,10 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
public abstract class CompressExtension extends AbstractExtension
{
- protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF};
+ protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF};
+ protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
private static final Logger LOG = Log.getLogger(CompressExtension.class);
-
+
/** Never drop tail bytes 0000FFFF, from any frame type */
protected static final int TAIL_DROP_NEVER = 0;
/** Always drop tail bytes 0000FFFF, from all frame types */
@@ -52,37 +54,43 @@ public abstract class CompressExtension extends AbstractExtension
/** Always set RSV flag, on all frame types */
protected static final int RSV_USE_ALWAYS = 0;
- /**
+ /**
* Only set RSV flag on first frame in multi-frame messages.
* <p>
- * Note: this automatically means no-continuation frames have
- * the RSV bit set
+ * Note: this automatically means no-continuation frames have the RSV bit set
*/
protected static final int RSV_USE_ONLY_FIRST = 1;
+ /** Inflater / Decompressed Buffer Size */
+ protected static final int INFLATE_BUFFER_SIZE = 8 * 1024;
+ /** Deflater / Inflater: Maximum Input Buffer Size */
+ protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024;
+ private final static boolean NOWRAP = true;
+
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
private final IteratingCallback flusher = new Flusher();
- private final Deflater compressor;
- private final Inflater decompressor;
+ private final Deflater deflater;
+ private final Inflater inflater;
+ protected AtomicInteger decompressCount = new AtomicInteger(0);
private int tailDrop = TAIL_DROP_NEVER;
private int rsvUse = RSV_USE_ALWAYS;
protected CompressExtension()
{
- compressor = new Deflater(Deflater.BEST_COMPRESSION, true);
- decompressor = new Inflater(true);
+ deflater = new Deflater(Deflater.DEFAULT_COMPRESSION,NOWRAP);
+ inflater = new Inflater(NOWRAP);
tailDrop = getTailDropMode();
rsvUse = getRsvUseMode();
}
-
+
public Deflater getDeflater()
{
- return compressor;
+ return deflater;
}
public Inflater getInflater()
{
- return decompressor;
+ return inflater;
}
/**
@@ -93,7 +101,7 @@ public abstract class CompressExtension extends AbstractExtension
{
return true;
}
-
+
/**
* Return the mode of operation for dropping (or keeping) tail bytes in frames generated by compress (outgoing)
*
@@ -114,7 +122,7 @@ public abstract class CompressExtension extends AbstractExtension
// Unset RSV1 since it's not compressed anymore.
newFrame.setRsv1(false);
- ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false);
+ ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(),false);
try
{
BufferUtil.flipToFill(buffer);
@@ -127,55 +135,68 @@ public abstract class CompressExtension extends AbstractExtension
getBufferPool().release(buffer);
}
}
-
- protected ByteAccumulator decompress(byte[] input)
+
+ protected ByteAccumulator newByteAccumulator()
{
- // Since we don't track text vs binary vs continuation state, just grab whatever is the greater value.
- int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageBufferSize());
- ByteAccumulator accumulator = new ByteAccumulator(maxSize);
-
- decompressor.setInput(input, 0, input.length);
+ int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
+ return new ByteAccumulator(maxSize);
+ }
+
+ protected void decompress(ByteAccumulator accumulator, ByteBuffer buf)
+ throws DataFormatException
+ {
+ if ((buf == null) || (!buf.hasRemaining()))
+ {
+ return;
+ }
+ byte[] output = new byte[1024];
- if (LOG.isDebugEnabled())
- LOG.debug("Decompressing {} bytes", input.length);
+ if (inflater.needsInput() && !supplyInput(inflater, buf))
+ {
+ LOG.debug("Needed input, but no buffer could supply input");
+ return;
+ }
- try
+ int read = 0;
+ while ((read = inflater.inflate(output)) >= 0)
{
- // It is allowed to send DEFLATE blocks with BFINAL=1.
- // For such blocks, getRemaining() will be > 0 but finished()
- // will be true, so we need to check for both.
- // When BFINAL=0, finished() will always be false and we only
- // check the remaining bytes.
- while (decompressor.getRemaining() > 0 && !decompressor.finished())
+ if (read == 0)
{
- byte[] output = new byte[Math.min(input.length * 2, 32 * 1024)];
- int decompressed = decompressor.inflate(output);
- if (decompressed == 0)
+ LOG.debug("Decompress: read 0 {}",toDetail(inflater));
+ if (inflater.finished() || inflater.needsDictionary())
{
- if (decompressor.needsInput())
+ if (LOG.isDebugEnabled())
{
- throw new BadPayloadException("Unable to inflate frame, not enough input on frame");
+ LOG.debug("Decompress: finished? {}",toDetail(inflater));
}
- if (decompressor.needsDictionary())
+ // We are finished ?
+ break;
+ }
+ else if (inflater.needsInput())
+ {
+ if (!supplyInput(inflater, buf))
{
- throw new BadPayloadException("Unable to inflate frame, frame erroneously says it needs a dictionary");
+ break;
}
}
- else
+ }
+ else
+ {
+ // do something with output
+ if (LOG.isDebugEnabled())
{
- accumulator.addChunk(output, 0, decompressed);
+ LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater));
}
+ accumulator.addChunk(output,0,read);
}
- if (LOG.isDebugEnabled())
- LOG.debug("Decompressed {}->{} bytes", input.length, accumulator.getLength());
- return accumulator;
}
- catch (DataFormatException x)
+
+ if (LOG.isDebugEnabled())
{
- throw new BadPayloadException(x);
+ LOG.debug("Decompress: exiting {}",toDetail(inflater));
}
}
-
+
@Override
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
{
@@ -185,13 +206,13 @@ public abstract class CompressExtension extends AbstractExtension
if (flusher.isFailed())
{
- notifyCallbackFailure(callback, new ZipException());
+ notifyCallbackFailure(callback,new ZipException());
return;
}
- FrameEntry entry = new FrameEntry(frame, callback, batchMode);
+ FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled())
- LOG.debug("Queuing {}", entry);
+ LOG.debug("Queuing {}",entry);
entries.offer(entry);
flusher.iterate();
}
@@ -206,7 +227,7 @@ public abstract class CompressExtension extends AbstractExtension
catch (Throwable x)
{
if (LOG.isDebugEnabled())
- LOG.debug("Exception while notifying success of callback " + callback, x);
+ LOG.debug("Exception while notifying success of callback " + callback,x);
}
}
@@ -220,10 +241,119 @@ public abstract class CompressExtension extends AbstractExtension
catch (Throwable x)
{
if (LOG.isDebugEnabled())
- LOG.debug("Exception while notifying failure of callback " + callback, x);
+ LOG.debug("Exception while notifying failure of callback " + callback,x);
}
}
+ private static boolean supplyInput(Inflater inflater, ByteBuffer buf)
+ {
+ if (buf.remaining() <= 0)
+ {
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("No data left left to supply to Inflater");
+ }
+ return false;
+ }
+
+ byte input[];
+ int inputOffset = 0;
+ int len;
+
+ if (buf.hasArray())
+ {
+ // no need to create a new byte buffer, just return this one.
+ len = buf.remaining();
+ input = buf.array();
+ inputOffset = buf.position() + buf.arrayOffset();
+ buf.position(buf.position() + len);
+ }
+ else
+ {
+ // Only create an return byte buffer that is reasonable in size
+ len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
+ input = new byte[len];
+ inputOffset = 0;
+ buf.get(input,0,len);
+ }
+
+ inflater.setInput(input,inputOffset,len);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(inflater));
+ }
+ return true;
+ }
+
+ private static boolean supplyInput(Deflater deflater, ByteBuffer buf)
+ {
+ if (buf.remaining() <= 0)
+ {
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("No data left left to supply to Deflater");
+ }
+ return false;
+ }
+
+ byte input[];
+ int inputOffset = 0;
+ int len;
+
+ if (buf.hasArray())
+ {
+ // no need to create a new byte buffer, just return this one.
+ len = buf.remaining();
+ input = buf.array();
+ inputOffset = buf.position() + buf.arrayOffset();
+ buf.position(buf.position() + len);
+ }
+ else
+ {
+ // Only create an return byte buffer that is reasonable in size
+ len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
+ input = new byte[len];
+ inputOffset = 0;
+ buf.get(input,0,len);
+ }
+
+ deflater.setInput(input,inputOffset,len);
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(deflater));
+ }
+ return true;
+ }
+
+ private static String toDetail(Inflater inflater)
+ {
+ return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(),
+ inflater.getBytesWritten(),inflater.getRemaining(),inflater.getTotalIn(),inflater.getTotalOut());
+ }
+
+ private static String toDetail(Deflater deflater)
+ {
+ return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),
+ deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut());
+ }
+
+ public static boolean endsWithTail(ByteBuffer buf)
+ {
+ if ((buf == null) || (buf.remaining() < TAIL_BYTES.length))
+ {
+ return false;
+ }
+ int limit = buf.limit();
+ for (int i = TAIL_BYTES.length; i > 0; i--)
+ {
+ if (buf.get(limit - i) != TAIL_BYTES[TAIL_BYTES.length - i])
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public String toString()
{
@@ -254,7 +384,6 @@ public abstract class CompressExtension extends AbstractExtension
{
private static final int INPUT_BUFSIZE = 32 * 1024;
private FrameEntry current;
- private ByteBuffer payload;
private boolean finished = true;
@Override
@@ -263,14 +392,14 @@ public abstract class CompressExtension extends AbstractExtension
if (finished)
{
current = entries.poll();
- LOG.debug("Processing {}", current);
+ LOG.debug("Processing {}",current);
if (current == null)
return Action.IDLE;
deflate(current);
}
else
{
- compress(current, false);
+ compress(current,false);
}
return Action.SCHEDULED;
}
@@ -281,97 +410,117 @@ public abstract class CompressExtension extends AbstractExtension
BatchMode batchMode = entry.batchMode;
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload())
{
- nextOutgoingFrame(frame, this, batchMode);
+ nextOutgoingFrame(frame,this,batchMode);
return;
}
- compress(entry, true);
+ compress(entry,true);
}
private void compress(FrameEntry entry, boolean first)
{
+ final int flush = Deflater.SYNC_FLUSH;
+
// Get a chunk of the payload to avoid to blow
// the heap if the payload is a huge mapped file.
Frame frame = entry.frame;
ByteBuffer data = frame.getPayload();
int remaining = data.remaining();
- int inputLength = Math.min(remaining, INPUT_BUFSIZE);
+ int chunkSize = Math.min(remaining,INPUT_BUFSIZE);
if (LOG.isDebugEnabled())
- LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength);
-
- // Avoid to copy the bytes if the ByteBuffer
- // is backed by an array.
- int inputOffset;
- byte[] input;
- if (data.hasArray())
+ LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize);
+
+ boolean needsCompress = true;
+
+ if (deflater.needsInput() && !supplyInput(deflater,data))
{
- input = data.array();
- int position = data.position();
- inputOffset = position + data.arrayOffset();
- data.position(position + inputLength);
+ // no input supplied
+ needsCompress = false;
}
- else
- {
- input = new byte[inputLength];
- inputOffset = 0;
- data.get(input, 0, inputLength);
- }
- finished = inputLength == remaining;
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
- compressor.setInput(input, inputOffset, inputLength);
+ byte[] output = new byte[chunkSize];
- // Use an additional space in case the content is not compressible.
- byte[] output = new byte[inputLength + 64];
- int outputOffset = 0;
- int outputLength = 0;
- while (true)
+ // Compress the data
+ while (needsCompress)
{
- int space = output.length - outputOffset;
- int compressed = compressor.deflate(output, outputOffset, space, Deflater.SYNC_FLUSH);
- outputLength += compressed;
- if (compressed < space)
+ int read = deflater.deflate(output,0,chunkSize,flush);
+ if (read == 0)
{
- // Everything was compressed.
- break;
+ if (deflater.finished())
+ {
+ // done
+ break;
+ }
+ else if (deflater.needsInput())
+ {
+ if (!supplyInput(deflater,data))
+ {
+ // done
+ needsCompress = false;
+ }
+ }
}
else
{
- // The compressed output is bigger than the uncompressed input.
- byte[] newOutput = new byte[output.length * 2];
- System.arraycopy(output, 0, newOutput, 0, output.length);
- outputOffset += output.length;
- output = newOutput;
+ // Append the output for the eventual frame.
+ out.write(output,0,read);
}
}
- boolean fin = frame.isFin() && finished;
+ boolean fin = frame.isFin();
+
+ ByteBuffer payload = ByteBuffer.wrap(out.toByteArray());
+
+ if (payload.remaining()>0)
+ {
+ // Handle tail bytes generated by SYNC_FLUSH.
+ LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload));
- // Handle tail bytes generated by SYNC_FLUSH.
- if(tailDrop == TAIL_DROP_ALWAYS) {
- payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length);
- } else if(tailDrop == TAIL_DROP_FIN_ONLY) {
- payload = ByteBuffer.wrap(output, 0, outputLength - (fin?TAIL_BYTES.length:0));
- } else {
- // always include
- payload = ByteBuffer.wrap(output, 0, outputLength);
+ if (tailDrop == TAIL_DROP_ALWAYS)
+ {
+ if (endsWithTail(payload))
+ {
+ payload.limit(payload.limit() - TAIL_BYTES.length);
+ }
+ LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload));
+ }
+ else if (tailDrop == TAIL_DROP_FIN_ONLY)
+ {
+ if (frame.isFin() && endsWithTail(payload))
+ {
+ payload.limit(payload.limit() - TAIL_BYTES.length);
+ }
+ LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload));
+ }
+ }
+ else if (fin)
+ {
+ // Special case: 8.2.3.6. Generating an Empty Fragment Manually
+ payload = ByteBuffer.wrap(new byte[] { 0x00 });
}
+
if (LOG.isDebugEnabled())
{
- LOG.debug("Compressed {}: {}->{} chunk bytes",entry,inputLength,outputLength);
+ LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining());
}
boolean continuation = frame.getType().isContinuation() || !first;
- DataFrame chunk = new DataFrame(frame, continuation);
- if(rsvUse == RSV_USE_ONLY_FIRST) {
+ DataFrame chunk = new DataFrame(frame,continuation);
+ if (rsvUse == RSV_USE_ONLY_FIRST)
+ {
chunk.setRsv1(!continuation);
- } else {
+ }
+ else
+ {
// always set
chunk.setRsv1(true);
}
chunk.setPayload(payload);
chunk.setFin(fin);
- nextOutgoingFrame(chunk, this, entry.batchMode);
+ nextOutgoingFrame(chunk,this,entry.batchMode);
}
@Override
@@ -379,14 +528,14 @@ public abstract class CompressExtension extends AbstractExtension
{
// This IteratingCallback never completes.
}
-
+
@Override
protected void onCompleteFailure(Throwable x)
{
// Fail all the frames in the queue.
FrameEntry entry;
while ((entry = entries.poll()) != null)
- notifyCallbackFailure(entry.callback, x);
+ notifyCallbackFailure(entry.callback,x);
}
@Override
@@ -400,7 +549,7 @@ public abstract class CompressExtension extends AbstractExtension
@Override
public void writeFailed(Throwable x)
{
- notifyCallbackFailure(current.callback, x);
+ notifyCallbackFailure(current.callback,x);
// If something went wrong, very likely the compression context
// will be invalid, so we need to fail this IteratingCallback.
failed(x);
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java
index 09ffc6fccd..fe97e57c69 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java
@@ -19,9 +19,11 @@
package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer;
+import java.util.zip.DataFormatException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@@ -59,23 +61,33 @@ public class PerMessageDeflateExtension extends CompressExtension
// This extension requires the RSV1 bit set only in the first frame.
// Subsequent continuation frames don't have RSV1 set, but are compressed.
if (frame.getType().isData())
+ {
incomingCompressed = frame.isRsv1();
+ }
- if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload() || !incomingCompressed)
+ if (OpCode.isControlFrame(frame.getOpCode()) || !incomingCompressed)
{
nextIncomingFrame(frame);
return;
}
-
- boolean appendTail = frame.isFin();
- ByteBuffer payload = frame.getPayload();
- int remaining = payload.remaining();
- byte[] input = new byte[remaining + (appendTail ? TAIL_BYTES.length : 0)];
- payload.get(input, 0, remaining);
- if (appendTail)
- System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length);
-
- forwardIncoming(frame, decompress(input));
+
+ ByteAccumulator accumulator = newByteAccumulator();
+
+ try
+ {
+ ByteBuffer payload = frame.getPayload();
+ decompress(accumulator, payload);
+ if (frame.isFin())
+ {
+ decompress(accumulator, TAIL_BYTES_BUF.slice());
+ }
+
+ forwardIncoming(frame, accumulator);
+ }
+ catch (DataFormatException e)
+ {
+ throw new BadPayloadException(e);
+ }
if (frame.isFin())
incomingCompressed = false;
@@ -87,6 +99,7 @@ public class PerMessageDeflateExtension extends CompressExtension
if (frame.isFin() && !incomingContextTakeover)
{
LOG.debug("Incoming Context Reset");
+ decompressCount.set(0);
getInflater().reset();
}
super.nextIncomingFrame(frame);
@@ -167,6 +180,8 @@ public class PerMessageDeflateExtension extends CompressExtension
}
}
}
+
+ LOG.debug("config: outgoingContextTakover={}, incomingContextTakeover={} : {}", outgoingContextTakeover, incomingContextTakeover, this);
super.setConfig(configNegotiated);
}
@@ -174,7 +189,7 @@ public class PerMessageDeflateExtension extends CompressExtension
@Override
public String toString()
{
- return String.format("%s[requested=%s,negotiated=%s]",
+ return String.format("%s[requested=\"%s\", negotiated=\"%s\"]",
getClass().getSimpleName(),
configRequested.getParameterizedName(),
configNegotiated.getParameterizedName());

Back to the top