diff options
author | Simone Bordet | 2015-09-14 09:50:44 +0000 |
---|---|---|
committer | Simone Bordet | 2015-09-14 09:51:19 +0000 |
commit | edce119c0e8a9ca050cfc5fc7cca728586dbdb59 (patch) | |
tree | 19ee428659a5f7466d2cb1fe1a4b01de36bb9f8b | |
parent | dbd66b131b2eb6ebd3752df1c024858ac032decf (diff) | |
download | org.eclipse.jetty.project-edce119c0e8a9ca050cfc5fc7cca728586dbdb59.tar.gz org.eclipse.jetty.project-edce119c0e8a9ca050cfc5fc7cca728586dbdb59.tar.xz org.eclipse.jetty.project-edce119c0e8a9ca050cfc5fc7cca728586dbdb59.zip |
477270 - Add ability to send a single PRIORITY frame.
Also fixed the mistake of sending the stream id as the parent stream id.
7 files changed, 236 insertions, 52 deletions
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PriorityTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PriorityTest.java new file mode 100644 index 0000000000..a83ed98595 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/PriorityTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PriorityFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.junit.Assert; +import org.junit.Test; + +public class PriorityTest extends AbstractTest +{ + @Test + public void testPriorityBeforeHeaders() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); + stream.headers(responseFrame, Callback.NOOP); + return null; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + int streamId = session.priority(new PriorityFrame(0, 13, false), Callback.NOOP); + Assert.assertTrue(streamId > 0); + + CountDownLatch latch = new CountDownLatch(2); + MetaData metaData = newRequest("GET", new HttpFields()); + HeadersFrame headersFrame = new HeadersFrame(streamId, metaData, null, true); + session.newStream(headersFrame, new Promise.Adapter<Stream>() + { + @Override + public void succeeded(Stream result) + { + Assert.assertEquals(streamId, result.getId()); + latch.countDown(); + } + }, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testPriorityAfterHeaders() throws Exception + { + CountDownLatch beforeRequests = new CountDownLatch(1); + CountDownLatch afterRequests = new CountDownLatch(2); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + try + { + beforeRequests.await(5, TimeUnit.SECONDS); + MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()); + HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true); + stream.headers(responseFrame, Callback.NOOP); + afterRequests.countDown(); + return null; + } + catch (InterruptedException x) + { + x.printStackTrace(); + return null; + } + } + }); + + CountDownLatch responses = new CountDownLatch(2); + Stream.Listener.Adapter listener = new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + responses.countDown(); + } + }; + + Session session = newClient(new Session.Listener.Adapter()); + MetaData metaData1 = newRequest("GET", "/one", new HttpFields()); + HeadersFrame headersFrame1 = new HeadersFrame(metaData1, null, true); + FuturePromise<Stream> promise1 = new FuturePromise<>(); + session.newStream(headersFrame1, promise1, listener); + Stream stream1 = promise1.get(5, TimeUnit.SECONDS); + + MetaData metaData2 = newRequest("GET", "/two", new HttpFields()); + HeadersFrame headersFrame2 = new HeadersFrame(metaData2, null, true); + FuturePromise<Stream> promise2 = new FuturePromise<>(); + session.newStream(headersFrame2, promise2, listener); + Stream stream2 = promise2.get(5, TimeUnit.SECONDS); + + int streamId = session.priority(new PriorityFrame(stream1.getId(), stream2.getId(), 13, false), Callback.NOOP); + Assert.assertEquals(stream1.getId(), streamId); + + // Give time to the PRIORITY frame to arrive to server. + Thread.sleep(1000); + beforeRequests.countDown(); + + Assert.assertTrue(afterRequests.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(responses.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 6ee9c136b9..6f3d907878 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -201,6 +201,9 @@ public abstract class HTTP2Session implements ISession, Parser.Listener { if (LOG.isDebugEnabled()) LOG.debug("Received {}", frame); + + + } @Override @@ -418,11 +421,15 @@ public abstract class HTTP2Session implements ISession, Parser.Listener boolean queued; synchronized (this) { - int streamId = streamIds.getAndAdd(2); - PriorityFrame priority = frame.getPriority(); - priority = priority == null ? null : new PriorityFrame(streamId, priority.getDependentStreamId(), - priority.getWeight(), priority.isExclusive()); - frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); + int streamId = frame.getStreamId(); + if (streamId <= 0) + { + streamId = streamIds.getAndAdd(2); + PriorityFrame priority = frame.getPriority(); + priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), + priority.getWeight(), priority.isExclusive()); + frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); + } final IStream stream = createLocalStream(streamId, promise); if (stream == null) return; @@ -437,6 +444,21 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } @Override + public int priority(PriorityFrame frame, Callback callback) + { + int streamId = frame.getStreamId(); + IStream stream = streams.get(streamId); + if (stream == null) + { + streamId = streamIds.getAndAdd(2); + frame = new PriorityFrame(streamId, frame.getParentStreamId(), + frame.getWeight(), frame.isExclusive()); + } + control(stream, callback, frame); + return streamId; + } + + @Override public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener) { // Synchronization is necessary to atomically create @@ -652,20 +674,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } } - public IStream createUpgradeStream() - { - // SPEC: upgrade stream is id=1 and can't exceed maximum - remoteStreamCount.incrementAndGet(); - IStream stream = newStream(1); - streams.put(1,stream); - updateLastStreamId(1); - stream.setIdleTimeout(getStreamIdleTimeout()); - flowControl.onStreamCreated(stream, false); - if (LOG.isDebugEnabled()) - LOG.debug("Created upgrade {}", stream); - return stream; - } - protected IStream newStream(int streamId) { return new HTTP2Stream(scheduler, this, streamId); @@ -1176,7 +1184,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } } - private class PromiseCallback<C> implements Callback + private static class PromiseCallback<C> implements Callback { private final Promise<C> promise; private final C value; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index 4b8d05393d..883ed322f6 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PingFrame; +import org.eclipse.jetty.http2.frames.PriorityFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.util.Callback; @@ -63,6 +64,19 @@ public interface Session public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener); /** + * <p>Sends the given PRIORITY {@code frame}.</p> + * <p>If the {@code frame} references a {@code streamId} that does not exist + * (for example {@code 0}), then a new {@code streamId} will be allocated, to + * support <em>unused anchor streams</em> that act as parent for other streams.</p> + * + * @param frame the PRIORITY frame to send + * @param callback the callback that gets notified when the frame has been sent + * @return the new stream id generated by the PRIORITY frame, or the stream id + * that it is already referencing + */ + public int priority(PriorityFrame frame, Callback callback); + + /** * <p>Sends the given SETTINGS {@code frame} to configure the session.</p> * * @param frame the SETTINGS frame to send diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java index c920d1eaf8..d5b8e3c416 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/PriorityFrame.java @@ -21,15 +21,20 @@ package org.eclipse.jetty.http2.frames; public class PriorityFrame extends Frame { private final int streamId; - private final int dependentStreamId; + private final int parentStreamId; private final int weight; private final boolean exclusive; - public PriorityFrame(int streamId, int dependentStreamId, int weight, boolean exclusive) + public PriorityFrame(int parentStreamId, int weight, boolean exclusive) + { + this(0, parentStreamId, weight, exclusive); + } + + public PriorityFrame(int streamId, int parentStreamId, int weight, boolean exclusive) { super(FrameType.PRIORITY); this.streamId = streamId; - this.dependentStreamId = dependentStreamId; + this.parentStreamId = parentStreamId; this.weight = weight; this.exclusive = exclusive; } @@ -39,9 +44,18 @@ public class PriorityFrame extends Frame return streamId; } + /** + * @deprecated use {@link #getParentStreamId()} instead. + */ + @Deprecated public int getDependentStreamId() { - return dependentStreamId; + return getParentStreamId(); + } + + public int getParentStreamId() + { + return parentStreamId; } public int getWeight() @@ -57,6 +71,6 @@ public class PriorityFrame extends Frame @Override public String toString() { - return String.format("%s#%d/#%d{weight=%d,ex=%b}", super.toString(), streamId, dependentStreamId, weight, exclusive); + return String.format("%s#%d/#%d{weight=%d,exclusive=%b}", super.toString(), streamId, parentStreamId, weight, exclusive); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java index 36bd63064f..c2c0892ee6 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/PriorityGenerator.java @@ -38,22 +38,22 @@ public class PriorityGenerator extends FrameGenerator public void generate(ByteBufferPool.Lease lease, Frame frame) { PriorityFrame priorityFrame = (PriorityFrame)frame; - generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getDependentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive()); + generatePriority(lease, priorityFrame.getStreamId(), priorityFrame.getParentStreamId(), priorityFrame.getWeight(), priorityFrame.isExclusive()); } - public void generatePriority(ByteBufferPool.Lease lease, int streamId, int dependentStreamId, int weight, boolean exclusive) + public void generatePriority(ByteBufferPool.Lease lease, int streamId, int parentStreamId, int weight, boolean exclusive) { if (streamId < 0) throw new IllegalArgumentException("Invalid stream id: " + streamId); - if (dependentStreamId < 0) - throw new IllegalArgumentException("Invalid dependent stream id: " + dependentStreamId); + if (parentStreamId < 0) + throw new IllegalArgumentException("Invalid parent stream id: " + parentStreamId); - ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, 5, Flags.NONE, dependentStreamId); + ByteBuffer header = generateHeader(lease, FrameType.PRIORITY, 5, Flags.NONE, streamId); if (exclusive) - streamId |= 0x80_00_00_00; + parentStreamId |= 0x80_00_00_00; - header.putInt(streamId); + header.putInt(parentStreamId); header.put((byte)weight); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/PriorityBodyParser.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/PriorityBodyParser.java index 6e9d313fa4..54c8c391e7 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/PriorityBodyParser.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/parser/PriorityBodyParser.java @@ -28,7 +28,7 @@ public class PriorityBodyParser extends BodyParser private State state = State.PREPARE; private int cursor; private boolean exclusive; - private int streamId; + private int parentStreamId; public PriorityBodyParser(HeaderParser headerParser, Parser.Listener listener) { @@ -40,7 +40,7 @@ public class PriorityBodyParser extends BodyParser state = State.PREPARE; cursor = 0; exclusive = false; - streamId = 0; + parentStreamId = 0; } @Override @@ -67,40 +67,44 @@ public class PriorityBodyParser extends BodyParser // because the 31 least significant bits represent the stream id. int currByte = buffer.get(buffer.position()); exclusive = (currByte & 0x80) == 0x80; - state = State.STREAM_ID; + state = State.PARENT_STREAM_ID; break; } - case STREAM_ID: + case PARENT_STREAM_ID: { if (buffer.remaining() >= 4) { - streamId = buffer.getInt(); - streamId &= 0x7F_FF_FF_FF; + parentStreamId = buffer.getInt(); + parentStreamId &= 0x7F_FF_FF_FF; state = State.WEIGHT; } else { - state = State.STREAM_ID_BYTES; + state = State.PARENT_STREAM_ID_BYTES; cursor = 4; } break; } - case STREAM_ID_BYTES: + case PARENT_STREAM_ID_BYTES: { int currByte = buffer.get() & 0xFF; --cursor; - streamId += currByte << (8 * cursor); + parentStreamId += currByte << (8 * cursor); if (cursor == 0) { - streamId &= 0x7F_FF_FF_FF; + parentStreamId &= 0x7F_FF_FF_FF; state = State.WEIGHT; } break; } case WEIGHT: { + // SPEC: stream cannot depend on itself. + if (getStreamId() == parentStreamId) + return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_priority_frame"); + int weight = buffer.get() & 0xFF; - return onPriority(streamId, weight, exclusive); + return onPriority(parentStreamId, weight, exclusive); } default: { @@ -111,9 +115,9 @@ public class PriorityBodyParser extends BodyParser return false; } - private boolean onPriority(int streamId, int weight, boolean exclusive) + private boolean onPriority(int parentStreamId, int weight, boolean exclusive) { - PriorityFrame frame = new PriorityFrame(streamId, getStreamId(), weight, exclusive); + PriorityFrame frame = new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive); reset(); notifyPriority(frame); return true; @@ -121,6 +125,6 @@ public class PriorityBodyParser extends BodyParser private enum State { - PREPARE, EXCLUSIVE, STREAM_ID, STREAM_ID_BYTES, WEIGHT + PREPARE, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT } } diff --git a/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/PriorityGenerateParseTest.java b/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/PriorityGenerateParseTest.java index 396a31d969..9e7dbea427 100644 --- a/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/PriorityGenerateParseTest.java +++ b/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/PriorityGenerateParseTest.java @@ -50,7 +50,7 @@ public class PriorityGenerateParseTest }, 4096, 8192); int streamId = 13; - int dependentStreamId = 17; + int parentStreamId = 17; int weight = 3; boolean exclusive = true; @@ -58,7 +58,7 @@ public class PriorityGenerateParseTest for (int i = 0; i < 2; ++i) { ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); - generator.generatePriority(lease, streamId, dependentStreamId, weight, exclusive); + generator.generatePriority(lease, streamId, parentStreamId, weight, exclusive); frames.clear(); for (ByteBuffer buffer : lease.getByteBuffers()) @@ -73,7 +73,7 @@ public class PriorityGenerateParseTest Assert.assertEquals(1, frames.size()); PriorityFrame frame = frames.get(0); Assert.assertEquals(streamId, frame.getStreamId()); - Assert.assertEquals(dependentStreamId, frame.getDependentStreamId()); + Assert.assertEquals(parentStreamId, frame.getParentStreamId()); Assert.assertEquals(weight, frame.getWeight()); Assert.assertEquals(exclusive, frame.isExclusive()); } @@ -94,12 +94,12 @@ public class PriorityGenerateParseTest }, 4096, 8192); int streamId = 13; - int dependentStreamId = 17; + int parentStreamId = 17; int weight = 3; boolean exclusive = true; ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); - generator.generatePriority(lease, streamId, dependentStreamId, weight, exclusive); + generator.generatePriority(lease, streamId, parentStreamId, weight, exclusive); for (ByteBuffer buffer : lease.getByteBuffers()) { @@ -112,7 +112,7 @@ public class PriorityGenerateParseTest Assert.assertEquals(1, frames.size()); PriorityFrame frame = frames.get(0); Assert.assertEquals(streamId, frame.getStreamId()); - Assert.assertEquals(dependentStreamId, frame.getDependentStreamId()); + Assert.assertEquals(parentStreamId, frame.getParentStreamId()); Assert.assertEquals(weight, frame.getWeight()); Assert.assertEquals(exclusive, frame.isExclusive()); } |