Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java')
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java129
1 files changed, 75 insertions, 54 deletions
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
index db2303cea3..89792d3bdd 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
@@ -15,6 +15,8 @@
*/
package org.eclipse.jetty.spdy;
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
@@ -25,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@@ -183,43 +186,22 @@ public class FlowControlTest extends AbstractTest
});
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@@ -312,43 +294,22 @@ public class FlowControlTest extends AbstractTest
stream.data(new BytesDataInfo(new byte[length], true));
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@@ -451,6 +412,66 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testSendBigFileWithoutFlowControl() throws Exception
+ {
+ boolean flowControlEnabled = false;
+ testSendBigFile(flowControlEnabled);
+ }
+
+ @Test
+ public void testSendBigFileWithFlowControl() throws Exception
+ {
+ boolean flowControlEnabled = true;
+ testSendBigFile(flowControlEnabled);
+ }
+
+ private void testSendBigFile(boolean flowControlEnabled) throws Exception, InterruptedException
+ {
+ final int dataSize = 1024 * 1024;
+ final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false);
+ final CountDownLatch allDataReceivedLatch = new CountDownLatch(1);
+
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ stream.data(bigByteBufferDataInfo);
+ return null;
+ }
+ },flowControlEnabled),new SessionFrameListener.Adapter());
+
+ session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ private int dataBytesReceived;
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataBytesReceived = dataBytesReceived + dataInfo.length();
+ dataInfo.consume(dataInfo.length());
+ if (dataBytesReceived == dataSize)
+ allDataReceivedLatch.countDown();
+ }
+ });
+
+ assertThat("all data bytes have been received by the client",allDataReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)
+ {
+ expectException(TimeoutException.class, new Callable<DataInfo>()
+ {
+ @Override
+ public DataInfo call() throws Exception
+ {
+ return exchanger.exchange(null, 1, TimeUnit.SECONDS);
+ }
+ });
+ }
+
private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try

Back to the top