aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2013-04-09 12:28:55 (EDT)
committerThomas Becker2013-04-10 10:49:45 (EDT)
commit6a6660bfc61554966dcdf3ff85199621225b203f (patch)
treeaaed0be3e38aeab1d73dcd10cf96fc35b8ff48de
parentaa4e79efe2e1db27b86d9c943e1e9af7ef1ba7f3 (diff)
downloadorg.eclipse.jetty.project-6a6660bfc61554966dcdf3ff85199621225b203f.zip
org.eclipse.jetty.project-6a6660bfc61554966dcdf3ff85199621225b203f.tar.gz
org.eclipse.jetty.project-6a6660bfc61554966dcdf3ff85199621225b203f.tar.bz2
405364 spdy imeplement MAX_CONCURRENT_STREAMS
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java49
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java43
-rw-r--r--jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java42
-rw-r--r--jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/AbstractTest.java6
-rw-r--r--jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/MaxConcurrentStreamTest.java120
5 files changed, 236 insertions, 24 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
index c642050..a0bda51 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
@@ -110,7 +110,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private final AtomicBoolean goAwaySent = new AtomicBoolean();
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private final AtomicInteger lastStreamId = new AtomicInteger();
+ private final AtomicInteger localStreamCount = new AtomicInteger(0);
private final FlowControlStrategy flowControlStrategy;
+ private volatile int maxConcurrentLocalStreams = -1;
private boolean flushing;
private Throwable failure;
@@ -181,6 +183,8 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
// TODO: for SPDYv3 we need to support the "slot" argument
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
IStream stream = createStream(synStream, listener, true, promise);
+ if (stream == null)
+ return;
generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
}
flush();
@@ -535,15 +539,39 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
}
int streamId = stream.getId();
+
+ if (local)
+ {
+ while (true)
+ {
+ int oldStreamCountValue = localStreamCount.get();
+ int maxConcurrentStreams = maxConcurrentLocalStreams;
+ if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
+ {
+ String msg = String.format("Max concurrent local streams (%d) exceeded.",
+ maxConcurrentStreams);
+ LOG.debug(msg);
+ promise.failed(new SPDYException(msg));
+ return null;
+ }
+ if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
+ break;
+ }
+ }
+
if (streams.putIfAbsent(streamId, stream) != null)
{
+ //TODO: fail promise
if (local)
+ {
+ localStreamCount.decrementAndGet();
throw new IllegalStateException("Duplicate stream id " + streamId);
+ }
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
LOG.debug("Duplicate stream, {}", rstInfo);
try
{
- rst(rstInfo);
+ rst(rstInfo); //TODO: non blocking reset or find the reason why blocking is used
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
@@ -554,8 +582,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
else
{
LOG.debug("Created {}", stream);
- if (local)
- notifyStreamCreated(stream);
+ notifyStreamCreated(stream);
return stream;
}
}
@@ -590,10 +617,15 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
IStream removed = streams.remove(stream.getId());
if (removed != null)
+ {
assert removed == stream;
- LOG.debug("Removed {}", stream);
- notifyStreamClosed(stream);
+ if (streamIds.get() % 2 == stream.getId() % 2)
+ localStreamCount.decrementAndGet();
+
+ LOG.debug("Removed {}", stream);
+ notifyStreamClosed(stream);
+ }
}
private void notifyStreamClosed(IStream stream)
@@ -666,6 +698,13 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
setWindowSize(windowSize);
LOG.debug("Updated session window size to {}", windowSize);
}
+ Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
+ if (maxConcurrentStreamsSetting != null)
+ {
+ int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
+ maxConcurrentLocalStreams = maxConcurrentStreamsValue;
+ LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
+ }
SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
notifyOnSettings(listener, settingsInfo);
flush();
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
index 44e56ff..813fb0c 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -18,15 +18,6 @@
package org.eclipse.jetty.spdy;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashSet;
@@ -47,12 +38,14 @@ import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.SettingsFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator;
@@ -74,6 +67,15 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
{
@@ -457,7 +459,30 @@ public class StandardSessionTest
stream.headers(new HeadersInfo(headers, true));
verify(controller, times(3)).write(any(ByteBuffer.class), any(Callback.class));
+ }
+
+ @Test
+ public void testMaxConcurrentStreams() throws InterruptedException
+ {
+ final CountDownLatch failedBecauseMaxConcurrentStreamsExceeded = new CountDownLatch(1);
+
+ Settings settings = new Settings();
+ settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0));
+ SettingsFrame settingsFrame = new SettingsFrame(VERSION, (byte)0, settings);
+ session.onControlFrame(settingsFrame);
+
+ PushSynInfo pushSynInfo = new PushSynInfo(1, new PushInfo(new Fields(), false));
+ session.syn(pushSynInfo, null, new Promise.Adapter<Stream>()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ failedBecauseMaxConcurrentStreamsExceeded.countDown();
+ }
+ });
+ assertThat("Opening push stream failed because maxConcurrentStream is exceeded",
+ failedBecauseMaxConcurrentStreamsExceeded.await(5, TimeUnit.SECONDS), is(true));
}
@Test
diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
index f52c875..099f736 100644
--- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
+++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
@@ -101,14 +101,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
@Test
public void testPushHeadersAreValid() throws Exception
{
- sendMainRequestAndCSSRequest();
+ sendMainRequestAndCSSRequest(null);
run2ndClientRequests(true, true);
}
@Test
public void testClientResetsPushStreams() throws Exception
{
- sendMainRequestAndCSSRequest();
+ sendMainRequestAndCSSRequest(null);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
final CountDownLatch pushSynHeadersValid = new CountDownLatch(1);
Session session = startClient(version, serverAddress, null);
@@ -125,14 +125,14 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
public void testUserAgentBlackList() throws Exception
{
pushStrategy.setUserAgentBlacklist(Arrays.asList(".*(?i)firefox/16.*"));
- sendMainRequestAndCSSRequest();
+ sendMainRequestAndCSSRequest(null);
run2ndClientRequests(false, false);
}
@Test
public void testReferrerPushPeriod() throws Exception
{
- Session session = sendMainRequestAndCSSRequest();
+ Session session = sendMainRequestAndCSSRequest(null);
// Sleep for pushPeriod This should prevent application.js from being mapped as pushResource
Thread.sleep(referrerPushPeriod + 1);
@@ -148,13 +148,38 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
connector.addConnectionFactory(defaultFactory);
connector.setDefaultProtocol(defaultFactory.getProtocol()); // TODO I don't think this is right
- Session session = sendMainRequestAndCSSRequest();
+ Session session = sendMainRequestAndCSSRequest(null);
sendRequest(session, associatedJSRequestHeaders, null, null);
run2ndClientRequests(false, true);
}
+ @Test
+ public void testMaxConcurrentStreamsToDisablePush() throws Exception
+ {
+ final CountDownLatch pushReceivedLatch = new CountDownLatch(1);
+ Session session = sendMainRequestAndCSSRequest(new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ pushReceivedLatch.countDown();
+ return null;
+ }
+ });
+
+// Settings settings = new Settings();
+// settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 0));
+// SettingsInfo settingsInfo = new SettingsInfo(settings);
+//
+// session.settings(settingsInfo);
+
+ sendRequest(session, mainRequestHeaders, null, null);
+
+ assertThat(pushReceivedLatch.await(1, TimeUnit.SECONDS), is(false));
+ }
+
private InetSocketAddress createServer() throws Exception
{
GzipHandler gzipHandler = new GzipHandler();
@@ -177,9 +202,9 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
return startHTTPServer(version, gzipHandler);
}
- private Session sendMainRequestAndCSSRequest() throws Exception
+ private Session sendMainRequestAndCSSRequest(SessionFrameListener sessionFrameListener) throws Exception
{
- Session session = startClient(version, serverAddress, null);
+ Session session = startClient(version, serverAddress, sessionFrameListener);
sendRequest(session, mainRequestHeaders, null, null);
sendRequest(session, associatedCSSRequestHeaders, null, null);
@@ -197,7 +222,8 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
- validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
+ if (pushSynHeadersValid != null)
+ validateHeaders(pushInfo.getHeaders(), pushSynHeadersValid);
assertThat("Stream is unidirectional", stream.isUnidirectional(), is(true));
assertThat("URI header ends with css", pushInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version))
diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/AbstractTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/AbstractTest.java
index 08625cc..cdbe2ba 100644
--- a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/AbstractTest.java
+++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/AbstractTest.java
@@ -53,13 +53,15 @@ public abstract class AbstractTest
}
};
+ protected final short version = SPDY.V2;
+
protected Server server;
protected SPDYClient.Factory clientFactory;
protected SPDYServerConnector connector;
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
- return startServer(SPDY.V2, listener);
+ return startServer(version, listener);
}
protected InetSocketAddress startServer(short version, ServerSessionFrameListener listener) throws Exception
@@ -99,7 +101,7 @@ public abstract class AbstractTest
protected Session startClient(InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception
{
- return startClient(SPDY.V2, socketAddress, listener);
+ return startClient(version, socketAddress, listener);
}
protected Session startClient(short version, InetSocketAddress socketAddress, SessionFrameListener listener) throws Exception
diff --git a/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/MaxConcurrentStreamTest.java b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/MaxConcurrentStreamTest.java
new file mode 100644
index 0000000..5c74ca3
--- /dev/null
+++ b/jetty-spdy/spdy-server/src/test/java/org/eclipse/jetty/spdy/server/MaxConcurrentStreamTest.java
@@ -0,0 +1,120 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.spdy.server;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.SessionFrameListener;
+import org.eclipse.jetty.spdy.api.Settings;
+import org.eclipse.jetty.spdy.api.SettingsInfo;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Fields;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class MaxConcurrentStreamTest extends AbstractTest
+{
+ @Test
+ public void testMaxConcurrentStreamsSetByServer() throws Exception, ExecutionException
+ {
+ final CountDownLatch settingsReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch dataReceivedLatch = new CountDownLatch(1);
+
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public void onConnect(Session session)
+ {
+ Settings settings = new Settings();
+ settings.put(new Settings.Setting(Settings.ID.MAX_CONCURRENT_STREAMS, 1));
+ try
+ {
+ session.settings(new SettingsInfo(settings));
+ }
+ catch (ExecutionException | InterruptedException | TimeoutException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ try
+ {
+ stream.reply(new ReplyInfo(true));
+ }
+ catch (ExecutionException | InterruptedException | TimeoutException e)
+ {
+ e.printStackTrace();
+ }
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataReceivedLatch.countDown();
+ }
+ };
+ }
+ }), new SessionFrameListener.Adapter()
+ {
+ @Override
+ public void onSettings(Session session, SettingsInfo settingsInfo)
+ {
+ settingsReceivedLatch.countDown();
+ }
+ });
+
+ assertThat("Settings frame received", settingsReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
+
+ SynInfo synInfo = new SynInfo(new Fields(), false);
+ Stream stream = session.syn(synInfo, null);
+
+ boolean failed = false;
+ try
+ {
+ session.syn(synInfo, null);
+ }
+ catch (ExecutionException | InterruptedException | TimeoutException e)
+ {
+ failed = true;
+ }
+
+ assertThat("Opening second stream failed", failed, is(true));
+
+ stream.data(new ByteBufferDataInfo(BufferUtil.EMPTY_BUFFER, true));
+ assertThat("Data has been received on first stream.", dataReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
+
+ session.syn(synInfo, null);
+ }
+}