diff options
author | Simone Bordet | 2015-11-25 13:00:19 +0000 |
---|---|---|
committer | Simone Bordet | 2015-11-25 13:00:19 +0000 |
commit | 50041395f9853ce22601f7d6ff9360ce8efa1e3f (patch) | |
tree | 8712ef0643565f0e2e22ac9f542acfabf70c3d84 /jetty-http2 | |
parent | 5c04f7f86db6f6dd573a57220e6beda367289d07 (diff) | |
parent | 3c54806d4707c8a1278f34615e58f63e5d4f6d53 (diff) | |
download | org.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.tar.gz org.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.tar.xz org.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.zip |
Merged branch 'jetty-9.3.x' into 'master'.
Diffstat (limited to 'jetty-http2')
10 files changed, 190 insertions, 43 deletions
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java index cef666ece7..405e6d8d56 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -43,8 +45,10 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Jetty; import org.eclipse.jetty.util.Promise; import org.junit.Assert; @@ -290,6 +294,131 @@ public class HTTP2Test extends AbstractTest } @Test + public void testMaxConcurrentStreams() throws Exception + { + int maxStreams = 2; + start(new ServerSessionListener.Adapter() + { + @Override + public Map<Integer, Integer> onPreface(Session session) + { + Map<Integer, Integer> settings = new HashMap<>(1); + settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxStreams); + return settings; + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + }); + + CountDownLatch settingsLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + }); + Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + MetaData.Request request1 = newRequest("GET", new HttpFields()); + FuturePromise<Stream> promise1 = new FuturePromise<>(); + CountDownLatch exchangeLatch1 = new CountDownLatch(2); + session.newStream(new HeadersFrame(request1, null, false), promise1, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + exchangeLatch1.countDown(); + } + }); + Stream stream1 = promise1.get(5, TimeUnit.SECONDS); + + MetaData.Request request2 = newRequest("GET", new HttpFields()); + FuturePromise<Stream> promise2 = new FuturePromise<>(); + CountDownLatch exchangeLatch2 = new CountDownLatch(2); + session.newStream(new HeadersFrame(request2, null, false), promise2, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + exchangeLatch2.countDown(); + } + }); + Stream stream2 = promise2.get(5, TimeUnit.SECONDS); + + // The third stream must not be created. + MetaData.Request request3 = newRequest("GET", new HttpFields()); + CountDownLatch maxStreamsLatch = new CountDownLatch(1); + session.newStream(new HeadersFrame(request3, null, false), new Promise.Adapter<Stream>() + { + @Override + public void failed(Throwable x) + { + if (x instanceof IllegalStateException) + maxStreamsLatch.countDown(); + } + }, new Stream.Listener.Adapter()); + + Assert.assertTrue(maxStreamsLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(2, session.getStreams().size()); + + // End the second stream. + stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback() + { + @Override + public void succeeded() + { + exchangeLatch2.countDown(); + } + }); + Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(1, session.getStreams().size()); + + // Create a fourth stream. + MetaData.Request request4 = newRequest("GET", new HttpFields()); + CountDownLatch exchangeLatch4 = new CountDownLatch(2); + session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<Stream>() + { + @Override + public void succeeded(Stream result) + { + exchangeLatch4.countDown(); + } + }, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + exchangeLatch4.countDown(); + } + }); + Assert.assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(1, session.getStreams().size()); + + // End the first stream. + stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback() + { + @Override + public void succeeded() + { + exchangeLatch1.countDown(); + } + }); + Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, session.getStreams().size()); + } + + @Test public void testInvalidAPIUsageOnClient() throws Exception { start(new ServerSessionListener.Adapter() @@ -383,19 +512,6 @@ public class HTTP2Test extends AbstractTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); } - private static void sleep(long time) - { - try - { - Thread.sleep(time); - } - catch (InterruptedException x) - { - throw new RuntimeException(); - } - } -} -/* @Test public void testInvalidAPIUsageOnServer() throws Exception { @@ -469,4 +585,15 @@ public class HTTP2Test extends AbstractTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); } - */ + private static void sleep(long time) + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new RuntimeException(); + } + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java index 5145ad9ce7..318ed81237 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java @@ -47,14 +47,14 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy } @Override - public void onStreamCreated(IStream stream, boolean local) + public void onStreamCreated(IStream stream) { stream.updateSendWindow(initialStreamSendWindow); stream.updateRecvWindow(initialStreamRecvWindow); } @Override - public void onStreamDestroyed(IStream stream, boolean local) + public void onStreamDestroyed(IStream stream) { } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index 5d851a189b..819e7703e4 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -68,17 +68,17 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy } @Override - public void onStreamCreated(IStream stream, boolean local) + public void onStreamCreated(IStream stream) { - super.onStreamCreated(stream, local); + super.onStreamCreated(stream); streamLevels.put(stream, new AtomicInteger()); } @Override - public void onStreamDestroyed(IStream stream, boolean local) + public void onStreamDestroyed(IStream stream) { streamLevels.remove(stream); - super.onStreamDestroyed(stream, local); + super.onStreamDestroyed(stream); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java index b5847aeaf8..2907abdc00 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java @@ -24,9 +24,9 @@ public interface FlowControlStrategy { public static int DEFAULT_WINDOW_SIZE = 65535; - public void onStreamCreated(IStream stream, boolean local); + public void onStreamCreated(IStream stream); - public void onStreamDestroyed(IStream stream, boolean local); + public void onStreamDestroyed(IStream stream); public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index f30238dc8c..dd0bc539ce 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -366,7 +366,7 @@ public class HTTP2Flusher extends IteratingCallback if (stream != null) { stream.close(); - stream.getSession().removeStream(stream, true); + stream.getSession().removeStream(stream); } callback.failed(x); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 41ef131a52..29daa61f8e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -618,11 +618,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener break; } - IStream stream = newStream(streamId); + IStream stream = newStream(streamId, true); if (streams.putIfAbsent(streamId, stream) == null) { stream.setIdleTimeout(getStreamIdleTimeout()); - flowControl.onStreamCreated(stream, true); + flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) LOG.debug("Created local {}", stream); return stream; @@ -650,14 +650,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener break; } - IStream stream = newStream(streamId); + IStream stream = newStream(streamId, false); // SPEC: duplicate stream is treated as connection error. if (streams.putIfAbsent(streamId, stream) == null) { updateLastStreamId(streamId); stream.setIdleTimeout(getStreamIdleTimeout()); - flowControl.onStreamCreated(stream, false); + flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) LOG.debug("Created remote {}", stream); return stream; @@ -669,28 +669,29 @@ public abstract class HTTP2Session implements ISession, Parser.Listener } } - protected IStream newStream(int streamId) + protected IStream newStream(int streamId, boolean local) { - return new HTTP2Stream(scheduler, this, streamId); + return new HTTP2Stream(scheduler, this, streamId, local); } @Override - public void removeStream(IStream stream, boolean local) + public void removeStream(IStream stream) { IStream removed = streams.remove(stream.getId()); if (removed != null) { assert removed == stream; + boolean local = stream.isLocal(); if (local) localStreamCount.decrementAndGet(); else remoteStreamCount.decrementAndGet(); - flowControl.onStreamDestroyed(stream, local); + flowControl.onStreamDestroyed(stream); if (LOG.isDebugEnabled()) - LOG.debug("Removed {}", stream); + LOG.debug("Removed {} {}", local ? "local" : "remote", stream); } } @@ -1058,7 +1059,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener { HeadersFrame headersFrame = (HeadersFrame)frame; if (stream.updateClose(headersFrame.isEndStream(), true)) - removeStream(stream, true); + removeStream(stream); break; } case RST_STREAM: @@ -1066,7 +1067,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener if (stream != null) { stream.close(); - removeStream(stream, true); + removeStream(stream); } break; } @@ -1174,7 +1175,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener // Only now we can update the close state // and eventually remove the stream. if (stream.updateClose(dataFrame.isEndStream(), true)) - removeStream(stream, true); + removeStream(stream); callback.succeeded(); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 4094bd5d5d..4d56983048 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -51,15 +51,17 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback private final AtomicInteger recvWindow = new AtomicInteger(); private final ISession session; private final int streamId; + private final boolean local; private volatile Listener listener; private volatile boolean localReset; private volatile boolean remoteReset; - public HTTP2Stream(Scheduler scheduler, ISession session, int streamId) + public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local) { super(scheduler); this.session = session; this.streamId = streamId; + this.local = local; } @Override @@ -69,6 +71,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback } @Override + public boolean isLocal() + { + return local; + } + + @Override public ISession getSession() { return session; @@ -242,7 +250,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback private void onHeaders(HeadersFrame frame, Callback callback) { if (updateClose(frame.isEndStream(), false)) - session.removeStream(this, false); + session.removeStream(this); callback.succeeded(); } @@ -273,7 +281,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback } if (updateClose(frame.isEndStream(), false)) - session.removeStream(this, false); + session.removeStream(this); notifyData(this, frame, callback); } @@ -281,7 +289,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback { remoteReset = true; close(); - session.removeStream(this, false); + session.removeStream(this); callback.succeeded(); notifyReset(this, frame); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index 428a0fcd0b..4e12150976 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -41,9 +41,8 @@ public interface ISession extends Session * <p>Removes the given {@code stream}.</p> * * @param stream the stream to remove - * @param local whether the stream is local or remote */ - public void removeStream(IStream stream, boolean local); + public void removeStream(IStream stream); /** * <p>Enqueues the given frames to be written to the connection.</p> diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index e041f5477b..f820efcb73 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -39,6 +39,11 @@ public interface IStream extends Stream, Closeable */ public static final String CHANNEL_ATTRIBUTE = IStream.class.getName() + ".channel"; + /** + * @return whether this stream is local or remote + */ + public boolean isLocal(); + @Override public ISession getSession(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index 883ed322f6..16bdc08f3b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -134,11 +134,18 @@ public interface Session public interface Listener { /** - * <p>Callback method invoked when the preface has been received.</p> + * <p>Callback method invoked:</p> + * <ul> + * <li>for clients, just before the preface is sent, to gather the + * SETTINGS configuration options the client wants to send to the server;</li> + * <li>for servers, just after having received the preface, to gather + * the SETTINGS configuration options the server wants to send to the + * client.</li> + * </ul> * * @param session the session * @return a (possibly empty or null) map containing SETTINGS configuration - * options that are sent after the preface. + * options to send. */ public Map<Integer, Integer> onPreface(Session session); |