Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-11-25 13:00:19 +0000
committerSimone Bordet2015-11-25 13:00:19 +0000
commit50041395f9853ce22601f7d6ff9360ce8efa1e3f (patch)
tree8712ef0643565f0e2e22ac9f542acfabf70c3d84 /jetty-http2
parent5c04f7f86db6f6dd573a57220e6beda367289d07 (diff)
parent3c54806d4707c8a1278f34615e58f63e5d4f6d53 (diff)
downloadorg.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.tar.gz
org.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.tar.xz
org.eclipse.jetty.project-50041395f9853ce22601f7d6ff9360ce8efa1e3f.zip
Merged branch 'jetty-9.3.x' into 'master'.
Diffstat (limited to 'jetty-http2')
-rw-r--r--jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java155
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java4
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java8
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java4
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java2
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java25
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java16
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java3
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java5
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java11
10 files changed, 190 insertions, 43 deletions
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java
index cef666ece7..405e6d8d56 100644
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -43,8 +45,10 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
@@ -290,6 +294,131 @@ public class HTTP2Test extends AbstractTest
}
@Test
+ public void testMaxConcurrentStreams() throws Exception
+ {
+ int maxStreams = 2;
+ start(new ServerSessionListener.Adapter()
+ {
+ @Override
+ public Map<Integer, Integer> onPreface(Session session)
+ {
+ Map<Integer, Integer> settings = new HashMap<>(1);
+ settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxStreams);
+ return settings;
+ }
+
+ @Override
+ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
+ {
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ return null;
+ }
+ });
+
+ CountDownLatch settingsLatch = new CountDownLatch(1);
+ Session session = newClient(new Session.Listener.Adapter()
+ {
+ @Override
+ public void onSettings(Session session, SettingsFrame frame)
+ {
+ settingsLatch.countDown();
+ }
+ });
+ Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
+
+ MetaData.Request request1 = newRequest("GET", new HttpFields());
+ FuturePromise<Stream> promise1 = new FuturePromise<>();
+ CountDownLatch exchangeLatch1 = new CountDownLatch(2);
+ session.newStream(new HeadersFrame(request1, null, false), promise1, new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame)
+ {
+ if (frame.isEndStream())
+ exchangeLatch1.countDown();
+ }
+ });
+ Stream stream1 = promise1.get(5, TimeUnit.SECONDS);
+
+ MetaData.Request request2 = newRequest("GET", new HttpFields());
+ FuturePromise<Stream> promise2 = new FuturePromise<>();
+ CountDownLatch exchangeLatch2 = new CountDownLatch(2);
+ session.newStream(new HeadersFrame(request2, null, false), promise2, new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame)
+ {
+ if (frame.isEndStream())
+ exchangeLatch2.countDown();
+ }
+ });
+ Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
+
+ // The third stream must not be created.
+ MetaData.Request request3 = newRequest("GET", new HttpFields());
+ CountDownLatch maxStreamsLatch = new CountDownLatch(1);
+ session.newStream(new HeadersFrame(request3, null, false), new Promise.Adapter<Stream>()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ if (x instanceof IllegalStateException)
+ maxStreamsLatch.countDown();
+ }
+ }, new Stream.Listener.Adapter());
+
+ Assert.assertTrue(maxStreamsLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(2, session.getStreams().size());
+
+ // End the second stream.
+ stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ exchangeLatch2.countDown();
+ }
+ });
+ Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(1, session.getStreams().size());
+
+ // Create a fourth stream.
+ MetaData.Request request4 = newRequest("GET", new HttpFields());
+ CountDownLatch exchangeLatch4 = new CountDownLatch(2);
+ session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<Stream>()
+ {
+ @Override
+ public void succeeded(Stream result)
+ {
+ exchangeLatch4.countDown();
+ }
+ }, new Stream.Listener.Adapter()
+ {
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame)
+ {
+ if (frame.isEndStream())
+ exchangeLatch4.countDown();
+ }
+ });
+ Assert.assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(1, session.getStreams().size());
+
+ // End the first stream.
+ stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ exchangeLatch1.countDown();
+ }
+ });
+ Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(0, session.getStreams().size());
+ }
+
+ @Test
public void testInvalidAPIUsageOnClient() throws Exception
{
start(new ServerSessionListener.Adapter()
@@ -383,19 +512,6 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
- private static void sleep(long time)
- {
- try
- {
- Thread.sleep(time);
- }
- catch (InterruptedException x)
- {
- throw new RuntimeException();
- }
- }
-}
-/*
@Test
public void testInvalidAPIUsageOnServer() throws Exception
{
@@ -469,4 +585,15 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
- */
+ private static void sleep(long time)
+ {
+ try
+ {
+ Thread.sleep(time);
+ }
+ catch (InterruptedException x)
+ {
+ throw new RuntimeException();
+ }
+ }
+}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java
index 5145ad9ce7..318ed81237 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java
@@ -47,14 +47,14 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
}
@Override
- public void onStreamCreated(IStream stream, boolean local)
+ public void onStreamCreated(IStream stream)
{
stream.updateSendWindow(initialStreamSendWindow);
stream.updateRecvWindow(initialStreamRecvWindow);
}
@Override
- public void onStreamDestroyed(IStream stream, boolean local)
+ public void onStreamDestroyed(IStream stream)
{
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java
index 5d851a189b..819e7703e4 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java
@@ -68,17 +68,17 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
}
@Override
- public void onStreamCreated(IStream stream, boolean local)
+ public void onStreamCreated(IStream stream)
{
- super.onStreamCreated(stream, local);
+ super.onStreamCreated(stream);
streamLevels.put(stream, new AtomicInteger());
}
@Override
- public void onStreamDestroyed(IStream stream, boolean local)
+ public void onStreamDestroyed(IStream stream)
{
streamLevels.remove(stream);
- super.onStreamDestroyed(stream, local);
+ super.onStreamDestroyed(stream);
}
@Override
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java
index b5847aeaf8..2907abdc00 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java
@@ -24,9 +24,9 @@ public interface FlowControlStrategy
{
public static int DEFAULT_WINDOW_SIZE = 65535;
- public void onStreamCreated(IStream stream, boolean local);
+ public void onStreamCreated(IStream stream);
- public void onStreamDestroyed(IStream stream, boolean local);
+ public void onStreamDestroyed(IStream stream);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local);
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
index f30238dc8c..dd0bc539ce 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
@@ -366,7 +366,7 @@ public class HTTP2Flusher extends IteratingCallback
if (stream != null)
{
stream.close();
- stream.getSession().removeStream(stream, true);
+ stream.getSession().removeStream(stream);
}
callback.failed(x);
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 41ef131a52..29daa61f8e 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -618,11 +618,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
break;
}
- IStream stream = newStream(streamId);
+ IStream stream = newStream(streamId, true);
if (streams.putIfAbsent(streamId, stream) == null)
{
stream.setIdleTimeout(getStreamIdleTimeout());
- flowControl.onStreamCreated(stream, true);
+ flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
return stream;
@@ -650,14 +650,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
break;
}
- IStream stream = newStream(streamId);
+ IStream stream = newStream(streamId, false);
// SPEC: duplicate stream is treated as connection error.
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
stream.setIdleTimeout(getStreamIdleTimeout());
- flowControl.onStreamCreated(stream, false);
+ flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
return stream;
@@ -669,28 +669,29 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
- protected IStream newStream(int streamId)
+ protected IStream newStream(int streamId, boolean local)
{
- return new HTTP2Stream(scheduler, this, streamId);
+ return new HTTP2Stream(scheduler, this, streamId, local);
}
@Override
- public void removeStream(IStream stream, boolean local)
+ public void removeStream(IStream stream)
{
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
assert removed == stream;
+ boolean local = stream.isLocal();
if (local)
localStreamCount.decrementAndGet();
else
remoteStreamCount.decrementAndGet();
- flowControl.onStreamDestroyed(stream, local);
+ flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
- LOG.debug("Removed {}", stream);
+ LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
}
}
@@ -1058,7 +1059,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
HeadersFrame headersFrame = (HeadersFrame)frame;
if (stream.updateClose(headersFrame.isEndStream(), true))
- removeStream(stream, true);
+ removeStream(stream);
break;
}
case RST_STREAM:
@@ -1066,7 +1067,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
{
stream.close();
- removeStream(stream, true);
+ removeStream(stream);
}
break;
}
@@ -1174,7 +1175,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// Only now we can update the close state
// and eventually remove the stream.
if (stream.updateClose(dataFrame.isEndStream(), true))
- removeStream(stream, true);
+ removeStream(stream);
callback.succeeded();
}
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index 4094bd5d5d..4d56983048 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -51,15 +51,17 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
private final AtomicInteger recvWindow = new AtomicInteger();
private final ISession session;
private final int streamId;
+ private final boolean local;
private volatile Listener listener;
private volatile boolean localReset;
private volatile boolean remoteReset;
- public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
+ public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
+ this.local = local;
}
@Override
@@ -69,6 +71,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
}
@Override
+ public boolean isLocal()
+ {
+ return local;
+ }
+
+ @Override
public ISession getSession()
{
return session;
@@ -242,7 +250,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
private void onHeaders(HeadersFrame frame, Callback callback)
{
if (updateClose(frame.isEndStream(), false))
- session.removeStream(this, false);
+ session.removeStream(this);
callback.succeeded();
}
@@ -273,7 +281,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
}
if (updateClose(frame.isEndStream(), false))
- session.removeStream(this, false);
+ session.removeStream(this);
notifyData(this, frame, callback);
}
@@ -281,7 +289,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
remoteReset = true;
close();
- session.removeStream(this, false);
+ session.removeStream(this);
callback.succeeded();
notifyReset(this, frame);
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
index 428a0fcd0b..4e12150976 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
@@ -41,9 +41,8 @@ public interface ISession extends Session
* <p>Removes the given {@code stream}.</p>
*
* @param stream the stream to remove
- * @param local whether the stream is local or remote
*/
- public void removeStream(IStream stream, boolean local);
+ public void removeStream(IStream stream);
/**
* <p>Enqueues the given frames to be written to the connection.</p>
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
index e041f5477b..f820efcb73 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
@@ -39,6 +39,11 @@ public interface IStream extends Stream, Closeable
*/
public static final String CHANNEL_ATTRIBUTE = IStream.class.getName() + ".channel";
+ /**
+ * @return whether this stream is local or remote
+ */
+ public boolean isLocal();
+
@Override
public ISession getSession();
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
index 883ed322f6..16bdc08f3b 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
@@ -134,11 +134,18 @@ public interface Session
public interface Listener
{
/**
- * <p>Callback method invoked when the preface has been received.</p>
+ * <p>Callback method invoked:</p>
+ * <ul>
+ * <li>for clients, just before the preface is sent, to gather the
+ * SETTINGS configuration options the client wants to send to the server;</li>
+ * <li>for servers, just after having received the preface, to gather
+ * the SETTINGS configuration options the server wants to send to the
+ * client.</li>
+ * </ul>
*
* @param session the session
* @return a (possibly empty or null) map containing SETTINGS configuration
- * options that are sent after the preface.
+ * options to send.
*/
public Map<Integer, Integer> onPreface(Session session);

Back to the top