diff options
author | Joakim Erdfelt | 2015-05-14 14:42:34 +0000 |
---|---|---|
committer | Joakim Erdfelt | 2015-08-06 22:16:30 +0000 |
commit | aac9568a30e133dad5633dd22f0a43f58346513f (patch) | |
tree | cbd6ceb25c0a3d9fa6dab5776d5424c4947ed54c /jetty-websocket/websocket-common | |
parent | 39f9f3ad44cc3c4a3d7e733719e4e716f522de3c (diff) | |
download | org.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.tar.gz org.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.tar.xz org.eclipse.jetty.project-aac9568a30e133dad5633dd22f0a43f58346513f.zip |
Work on permessage-deflate continues
Conflicts:
jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FrameCompressionExtensionTest.java
Diffstat (limited to 'jetty-websocket/websocket-common')
2 files changed, 285 insertions, 121 deletions
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index f4777fc2c3..dae6e184c3 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -18,8 +18,10 @@ package org.eclipse.jetty.websocket.common.extensions.compress; +import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -30,7 +32,6 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.api.BadPayloadException; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -40,9 +41,10 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame; public abstract class CompressExtension extends AbstractExtension { - protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; + protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF}; + protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); private static final Logger LOG = Log.getLogger(CompressExtension.class); - + /** Never drop tail bytes 0000FFFF, from any frame type */ protected static final int TAIL_DROP_NEVER = 0; /** Always drop tail bytes 0000FFFF, from all frame types */ @@ -52,37 +54,43 @@ public abstract class CompressExtension extends AbstractExtension /** Always set RSV flag, on all frame types */ protected static final int RSV_USE_ALWAYS = 0; - /** + /** * Only set RSV flag on first frame in multi-frame messages. * <p> - * Note: this automatically means no-continuation frames have - * the RSV bit set + * Note: this automatically means no-continuation frames have the RSV bit set */ protected static final int RSV_USE_ONLY_FIRST = 1; + /** Inflater / Decompressed Buffer Size */ + protected static final int INFLATE_BUFFER_SIZE = 8 * 1024; + /** Deflater / Inflater: Maximum Input Buffer Size */ + protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024; + private final static boolean NOWRAP = true; + private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final IteratingCallback flusher = new Flusher(); - private final Deflater compressor; - private final Inflater decompressor; + private final Deflater deflater; + private final Inflater inflater; + protected AtomicInteger decompressCount = new AtomicInteger(0); private int tailDrop = TAIL_DROP_NEVER; private int rsvUse = RSV_USE_ALWAYS; protected CompressExtension() { - compressor = new Deflater(Deflater.BEST_COMPRESSION, true); - decompressor = new Inflater(true); + deflater = new Deflater(Deflater.DEFAULT_COMPRESSION,NOWRAP); + inflater = new Inflater(NOWRAP); tailDrop = getTailDropMode(); rsvUse = getRsvUseMode(); } - + public Deflater getDeflater() { - return compressor; + return deflater; } public Inflater getInflater() { - return decompressor; + return inflater; } /** @@ -93,7 +101,7 @@ public abstract class CompressExtension extends AbstractExtension { return true; } - + /** * Return the mode of operation for dropping (or keeping) tail bytes in frames generated by compress (outgoing) * @@ -114,7 +122,7 @@ public abstract class CompressExtension extends AbstractExtension // Unset RSV1 since it's not compressed anymore. newFrame.setRsv1(false); - ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false); + ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(),false); try { BufferUtil.flipToFill(buffer); @@ -127,55 +135,68 @@ public abstract class CompressExtension extends AbstractExtension getBufferPool().release(buffer); } } - - protected ByteAccumulator decompress(byte[] input) + + protected ByteAccumulator newByteAccumulator() { - // Since we don't track text vs binary vs continuation state, just grab whatever is the greater value. - int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageBufferSize()); - ByteAccumulator accumulator = new ByteAccumulator(maxSize); - - decompressor.setInput(input, 0, input.length); + int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize()); + return new ByteAccumulator(maxSize); + } + + protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) + throws DataFormatException + { + if ((buf == null) || (!buf.hasRemaining())) + { + return; + } + byte[] output = new byte[1024]; - if (LOG.isDebugEnabled()) - LOG.debug("Decompressing {} bytes", input.length); + if (inflater.needsInput() && !supplyInput(inflater, buf)) + { + LOG.debug("Needed input, but no buffer could supply input"); + return; + } - try + int read = 0; + while ((read = inflater.inflate(output)) >= 0) { - // It is allowed to send DEFLATE blocks with BFINAL=1. - // For such blocks, getRemaining() will be > 0 but finished() - // will be true, so we need to check for both. - // When BFINAL=0, finished() will always be false and we only - // check the remaining bytes. - while (decompressor.getRemaining() > 0 && !decompressor.finished()) + if (read == 0) { - byte[] output = new byte[Math.min(input.length * 2, 32 * 1024)]; - int decompressed = decompressor.inflate(output); - if (decompressed == 0) + LOG.debug("Decompress: read 0 {}",toDetail(inflater)); + if (inflater.finished() || inflater.needsDictionary()) { - if (decompressor.needsInput()) + if (LOG.isDebugEnabled()) { - throw new BadPayloadException("Unable to inflate frame, not enough input on frame"); + LOG.debug("Decompress: finished? {}",toDetail(inflater)); } - if (decompressor.needsDictionary()) + // We are finished ? + break; + } + else if (inflater.needsInput()) + { + if (!supplyInput(inflater, buf)) { - throw new BadPayloadException("Unable to inflate frame, frame erroneously says it needs a dictionary"); + break; } } - else + } + else + { + // do something with output + if (LOG.isDebugEnabled()) { - accumulator.addChunk(output, 0, decompressed); + LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater)); } + accumulator.addChunk(output,0,read); } - if (LOG.isDebugEnabled()) - LOG.debug("Decompressed {}->{} bytes", input.length, accumulator.getLength()); - return accumulator; } - catch (DataFormatException x) + + if (LOG.isDebugEnabled()) { - throw new BadPayloadException(x); + LOG.debug("Decompress: exiting {}",toDetail(inflater)); } } - + @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { @@ -185,13 +206,13 @@ public abstract class CompressExtension extends AbstractExtension if (flusher.isFailed()) { - notifyCallbackFailure(callback, new ZipException()); + notifyCallbackFailure(callback,new ZipException()); return; } - FrameEntry entry = new FrameEntry(frame, callback, batchMode); + FrameEntry entry = new FrameEntry(frame,callback,batchMode); if (LOG.isDebugEnabled()) - LOG.debug("Queuing {}", entry); + LOG.debug("Queuing {}",entry); entries.offer(entry); flusher.iterate(); } @@ -206,7 +227,7 @@ public abstract class CompressExtension extends AbstractExtension catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("Exception while notifying success of callback " + callback, x); + LOG.debug("Exception while notifying success of callback " + callback,x); } } @@ -220,10 +241,119 @@ public abstract class CompressExtension extends AbstractExtension catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("Exception while notifying failure of callback " + callback, x); + LOG.debug("Exception while notifying failure of callback " + callback,x); } } + private static boolean supplyInput(Inflater inflater, ByteBuffer buf) + { + if (buf.remaining() <= 0) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("No data left left to supply to Inflater"); + } + return false; + } + + byte input[]; + int inputOffset = 0; + int len; + + if (buf.hasArray()) + { + // no need to create a new byte buffer, just return this one. + len = buf.remaining(); + input = buf.array(); + inputOffset = buf.position() + buf.arrayOffset(); + buf.position(buf.position() + len); + } + else + { + // Only create an return byte buffer that is reasonable in size + len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); + input = new byte[len]; + inputOffset = 0; + buf.get(input,0,len); + } + + inflater.setInput(input,inputOffset,len); + if (LOG.isDebugEnabled()) + { + LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(inflater)); + } + return true; + } + + private static boolean supplyInput(Deflater deflater, ByteBuffer buf) + { + if (buf.remaining() <= 0) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("No data left left to supply to Deflater"); + } + return false; + } + + byte input[]; + int inputOffset = 0; + int len; + + if (buf.hasArray()) + { + // no need to create a new byte buffer, just return this one. + len = buf.remaining(); + input = buf.array(); + inputOffset = buf.position() + buf.arrayOffset(); + buf.position(buf.position() + len); + } + else + { + // Only create an return byte buffer that is reasonable in size + len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); + input = new byte[len]; + inputOffset = 0; + buf.get(input,0,len); + } + + deflater.setInput(input,inputOffset,len); + if (LOG.isDebugEnabled()) + { + LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(deflater)); + } + return true; + } + + private static String toDetail(Inflater inflater) + { + return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(), + inflater.getBytesWritten(),inflater.getRemaining(),inflater.getTotalIn(),inflater.getTotalOut()); + } + + private static String toDetail(Deflater deflater) + { + return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(), + deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut()); + } + + public static boolean endsWithTail(ByteBuffer buf) + { + if ((buf == null) || (buf.remaining() < TAIL_BYTES.length)) + { + return false; + } + int limit = buf.limit(); + for (int i = TAIL_BYTES.length; i > 0; i--) + { + if (buf.get(limit - i) != TAIL_BYTES[TAIL_BYTES.length - i]) + { + return false; + } + } + return true; + } + @Override public String toString() { @@ -254,7 +384,6 @@ public abstract class CompressExtension extends AbstractExtension { private static final int INPUT_BUFSIZE = 32 * 1024; private FrameEntry current; - private ByteBuffer payload; private boolean finished = true; @Override @@ -263,14 +392,14 @@ public abstract class CompressExtension extends AbstractExtension if (finished) { current = entries.poll(); - LOG.debug("Processing {}", current); + LOG.debug("Processing {}",current); if (current == null) return Action.IDLE; deflate(current); } else { - compress(current, false); + compress(current,false); } return Action.SCHEDULED; } @@ -281,97 +410,117 @@ public abstract class CompressExtension extends AbstractExtension BatchMode batchMode = entry.batchMode; if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload()) { - nextOutgoingFrame(frame, this, batchMode); + nextOutgoingFrame(frame,this,batchMode); return; } - compress(entry, true); + compress(entry,true); } private void compress(FrameEntry entry, boolean first) { + final int flush = Deflater.SYNC_FLUSH; + // Get a chunk of the payload to avoid to blow // the heap if the payload is a huge mapped file. Frame frame = entry.frame; ByteBuffer data = frame.getPayload(); int remaining = data.remaining(); - int inputLength = Math.min(remaining, INPUT_BUFSIZE); + int chunkSize = Math.min(remaining,INPUT_BUFSIZE); if (LOG.isDebugEnabled()) - LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength); - - // Avoid to copy the bytes if the ByteBuffer - // is backed by an array. - int inputOffset; - byte[] input; - if (data.hasArray()) + LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize); + + boolean needsCompress = true; + + if (deflater.needsInput() && !supplyInput(deflater,data)) { - input = data.array(); - int position = data.position(); - inputOffset = position + data.arrayOffset(); - data.position(position + inputLength); + // no input supplied + needsCompress = false; } - else - { - input = new byte[inputLength]; - inputOffset = 0; - data.get(input, 0, inputLength); - } - finished = inputLength == remaining; + + ByteArrayOutputStream out = new ByteArrayOutputStream(); - compressor.setInput(input, inputOffset, inputLength); + byte[] output = new byte[chunkSize]; - // Use an additional space in case the content is not compressible. - byte[] output = new byte[inputLength + 64]; - int outputOffset = 0; - int outputLength = 0; - while (true) + // Compress the data + while (needsCompress) { - int space = output.length - outputOffset; - int compressed = compressor.deflate(output, outputOffset, space, Deflater.SYNC_FLUSH); - outputLength += compressed; - if (compressed < space) + int read = deflater.deflate(output,0,chunkSize,flush); + if (read == 0) { - // Everything was compressed. - break; + if (deflater.finished()) + { + // done + break; + } + else if (deflater.needsInput()) + { + if (!supplyInput(deflater,data)) + { + // done + needsCompress = false; + } + } } else { - // The compressed output is bigger than the uncompressed input. - byte[] newOutput = new byte[output.length * 2]; - System.arraycopy(output, 0, newOutput, 0, output.length); - outputOffset += output.length; - output = newOutput; + // Append the output for the eventual frame. + out.write(output,0,read); } } - boolean fin = frame.isFin() && finished; + boolean fin = frame.isFin(); + + ByteBuffer payload = ByteBuffer.wrap(out.toByteArray()); + + if (payload.remaining()>0) + { + // Handle tail bytes generated by SYNC_FLUSH. + LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload)); - // Handle tail bytes generated by SYNC_FLUSH. - if(tailDrop == TAIL_DROP_ALWAYS) { - payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length); - } else if(tailDrop == TAIL_DROP_FIN_ONLY) { - payload = ByteBuffer.wrap(output, 0, outputLength - (fin?TAIL_BYTES.length:0)); - } else { - // always include - payload = ByteBuffer.wrap(output, 0, outputLength); + if (tailDrop == TAIL_DROP_ALWAYS) + { + if (endsWithTail(payload)) + { + payload.limit(payload.limit() - TAIL_BYTES.length); + } + LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload)); + } + else if (tailDrop == TAIL_DROP_FIN_ONLY) + { + if (frame.isFin() && endsWithTail(payload)) + { + payload.limit(payload.limit() - TAIL_BYTES.length); + } + LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload)); + } + } + else if (fin) + { + // Special case: 8.2.3.6. Generating an Empty Fragment Manually + payload = ByteBuffer.wrap(new byte[] { 0x00 }); } + if (LOG.isDebugEnabled()) { - LOG.debug("Compressed {}: {}->{} chunk bytes",entry,inputLength,outputLength); + LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining()); } boolean continuation = frame.getType().isContinuation() || !first; - DataFrame chunk = new DataFrame(frame, continuation); - if(rsvUse == RSV_USE_ONLY_FIRST) { + DataFrame chunk = new DataFrame(frame,continuation); + if (rsvUse == RSV_USE_ONLY_FIRST) + { chunk.setRsv1(!continuation); - } else { + } + else + { // always set chunk.setRsv1(true); } chunk.setPayload(payload); chunk.setFin(fin); - nextOutgoingFrame(chunk, this, entry.batchMode); + nextOutgoingFrame(chunk,this,entry.batchMode); } @Override @@ -379,14 +528,14 @@ public abstract class CompressExtension extends AbstractExtension { // This IteratingCallback never completes. } - + @Override protected void onCompleteFailure(Throwable x) { // Fail all the frames in the queue. FrameEntry entry; while ((entry = entries.poll()) != null) - notifyCallbackFailure(entry.callback, x); + notifyCallbackFailure(entry.callback,x); } @Override @@ -400,7 +549,7 @@ public abstract class CompressExtension extends AbstractExtension @Override public void writeFailed(Throwable x) { - notifyCallbackFailure(current.callback, x); + notifyCallbackFailure(current.callback,x); // If something went wrong, very likely the compression context // will be invalid, so we need to fail this IteratingCallback. failed(x); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java index 09ffc6fccd..fe97e57c69 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/PerMessageDeflateExtension.java @@ -19,9 +19,11 @@ package org.eclipse.jetty.websocket.common.extensions.compress; import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BadPayloadException; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; @@ -59,23 +61,33 @@ public class PerMessageDeflateExtension extends CompressExtension // This extension requires the RSV1 bit set only in the first frame. // Subsequent continuation frames don't have RSV1 set, but are compressed. if (frame.getType().isData()) + { incomingCompressed = frame.isRsv1(); + } - if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload() || !incomingCompressed) + if (OpCode.isControlFrame(frame.getOpCode()) || !incomingCompressed) { nextIncomingFrame(frame); return; } - - boolean appendTail = frame.isFin(); - ByteBuffer payload = frame.getPayload(); - int remaining = payload.remaining(); - byte[] input = new byte[remaining + (appendTail ? TAIL_BYTES.length : 0)]; - payload.get(input, 0, remaining); - if (appendTail) - System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length); - - forwardIncoming(frame, decompress(input)); + + ByteAccumulator accumulator = newByteAccumulator(); + + try + { + ByteBuffer payload = frame.getPayload(); + decompress(accumulator, payload); + if (frame.isFin()) + { + decompress(accumulator, TAIL_BYTES_BUF.slice()); + } + + forwardIncoming(frame, accumulator); + } + catch (DataFormatException e) + { + throw new BadPayloadException(e); + } if (frame.isFin()) incomingCompressed = false; @@ -87,6 +99,7 @@ public class PerMessageDeflateExtension extends CompressExtension if (frame.isFin() && !incomingContextTakeover) { LOG.debug("Incoming Context Reset"); + decompressCount.set(0); getInflater().reset(); } super.nextIncomingFrame(frame); @@ -167,6 +180,8 @@ public class PerMessageDeflateExtension extends CompressExtension } } } + + LOG.debug("config: outgoingContextTakover={}, incomingContextTakeover={} : {}", outgoingContextTakeover, incomingContextTakeover, this); super.setConfig(configNegotiated); } @@ -174,7 +189,7 @@ public class PerMessageDeflateExtension extends CompressExtension @Override public String toString() { - return String.format("%s[requested=%s,negotiated=%s]", + return String.format("%s[requested=\"%s\", negotiated=\"%s\"]", getClass().getSimpleName(), configRequested.getParameterizedName(), configNegotiated.getParameterizedName()); |