aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2012-05-25 14:21:10 (EDT)
committerThomas Becker2012-05-25 14:21:10 (EDT)
commitb92e7b01a972deb50e8305eb159d10b38b685fbd (patch)
tree3bb230621fbc93a718b75c563ce6909a3327d7a0
parent98ed2a9d9a6c4dc0747a555c0efa279cb2d30c44 (diff)
downloadorg.eclipse.jetty.project-b92e7b01a972deb50e8305eb159d10b38b685fbd.zip
org.eclipse.jetty.project-b92e7b01a972deb50e8305eb159d10b38b685fbd.tar.gz
org.eclipse.jetty.project-b92e7b01a972deb50e8305eb159d10b38b685fbd.tar.bz2
spdy: improve errorHandling, additional tests for sending big data with/without flow control, test that no more frames are sent on reset pushstreams, test for failing controller.writer(), small improvementsrefs/changes/26/6126/1
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java3
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java108
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java2
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Handler.java9
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java4
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java137
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java20
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java1
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java3
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java2
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java11
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java1
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java6
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java11
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java129
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java217
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java4
17 files changed, 500 insertions, 168 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java
index d44da62..6f10787 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Promise.java
@@ -44,7 +44,8 @@ public class Promise<T> implements Handler<T>, Future<T>
latch.countDown();
}
- public void failed(Throwable x)
+ @Override
+ public void failed(T context, Throwable x)
{
this.failure = x;
latch.countDown();
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
index b77e9eb..7988e5b 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
@@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -93,6 +94,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
private final AtomicInteger lastStreamId = new AtomicInteger();
private boolean flushing;
+ private boolean failed = false;
+ private volatile boolean flowControlEnabled = true;
private volatile int windowSize = 65536;
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
@@ -735,11 +738,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
if (stream != null)
- {
updateLastStreamId(stream);
- if (stream.isClosed())
- removeStream(stream);
- }
// Synchronization is necessary, since we may have concurrent replies
// and those needs to be generated and enqueued atomically in order
@@ -759,9 +758,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
append(frameBytes);
}
}
- catch (Throwable x)
+ catch (Exception x)
{
- notifyHandlerFailed(handler, x);
+ notifyHandlerFailed(handler, context, x);
}
}
@@ -787,9 +786,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
logger.debug("Queuing {} on {}",dataInfo,stream);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
if (timeout > 0)
- {
frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
- }
append(frameBytes);
flush();
}
@@ -822,9 +819,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (buffer != null)
{
queue.remove(i);
- // TODO: stream.isUniDirectional() check here is only needed for pushStreams which send a syn with close=true --> find a better solution
- if (stream != null && !streams.containsValue(stream) && !stream.isUnidirectional())
+ if (stream != null && stream.isReset())
+ {
frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM));
+ return;
+ }
break;
}
@@ -847,34 +846,50 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void append(FrameBytes frameBytes)
{
+ boolean fail;
synchronized (queue)
{
- int index = queue.size();
- while (index > 0)
+ fail = failed;
+ if (!fail)
{
- FrameBytes element = queue.get(index - 1);
- if (element.compareTo(frameBytes) >= 0)
- break;
- --index;
+ int index = queue.size();
+ while (index > 0)
+ {
+ FrameBytes element = queue.get(index - 1);
+ if (element.compareTo(frameBytes) >= 0)
+ break;
+ --index;
+ }
+ queue.add(index,frameBytes);
}
- queue.add(index,frameBytes);
}
+
+ if (fail)
+ frameBytes.fail(new SPDYException("Session failed"));
}
private void prepend(FrameBytes frameBytes)
{
+ boolean fail;
synchronized (queue)
{
- int index = 0;
- while (index < queue.size())
+ fail = failed;
+ if (!fail)
{
- FrameBytes element = queue.get(index);
- if (element.compareTo(frameBytes) <= 0)
- break;
- ++index;
+ int index = 0;
+ while (index < queue.size())
+ {
+ FrameBytes element = queue.get(index);
+ if (element.compareTo(frameBytes) <= 0)
+ break;
+ ++index;
+ }
+ queue.add(index,frameBytes);
}
- queue.add(index,frameBytes);
}
+
+ if (fail)
+ frameBytes.fail(new SPDYException("Session failed"));
}
@Override
@@ -889,9 +904,23 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
- public void failed(Throwable x)
+ public void failed(FrameBytes frameBytes, Throwable x)
{
- throw new SPDYException(x);
+ List<FrameBytes> frameBytesToFail = new ArrayList<>();
+ frameBytesToFail.add(frameBytes);
+
+ synchronized (queue)
+ {
+ failed = true;
+ String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size());
+ logger.debug(logMessage,x);
+ frameBytesToFail.addAll(queue);
+ queue.clear();
+ flushing = false;
+ }
+
+ for (FrameBytes fb : frameBytesToFail)
+ fb.fail(x);
}
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
@@ -951,12 +980,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
- private <C> void notifyHandlerFailed(Handler<C> handler, Throwable x)
+ private <C> void notifyHandlerFailed(Handler<C> handler, C context, Throwable x)
{
try
{
if (handler != null)
- handler.failed(x);
+ handler.failed(context, x);
}
catch (Exception xx)
{
@@ -1013,7 +1042,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void fail(Throwable x)
{
cancelTask();
- notifyHandlerFailed(handler,x);
+ notifyHandlerFailed(handler,context,x);
+ StandardSession.this.flush();
}
private void cancelTask()
@@ -1062,6 +1092,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// Recipients will know the last good stream id and act accordingly.
close();
}
+ IStream stream = getStream();
+ if (stream != null && stream.isClosed())
+ removeStream(stream);
}
@Override
@@ -1112,14 +1145,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
bufferPool.release(buffer);
IStream stream = getStream();
- stream.updateWindowSize(-size);
-
+ boolean flowControlEnabled = StandardSession.this.flowControlEnabled;
+ if (flowControlEnabled)
+ stream.updateWindowSize(-size);
if (dataInfo.available() > 0)
{
// We have written a frame out of this DataInfo, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that another
// DataInfo for the same stream is written before this one is finished.
prepend(this);
+ if (!flowControlEnabled)
+ flush();
}
else
{
@@ -1136,4 +1172,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
}
}
+
+ public boolean isFlowControlEnabled()
+ {
+ return flowControlEnabled;
+ }
+
+ public void setFlowControlEnabled(boolean flowControl)
+ {
+ this.flowControlEnabled = flowControl;
+ }
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index 4de901c..43c2876 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -345,7 +345,7 @@ public class StandardStream implements IStream
{
if (isClosed() || isReset())
{
- handler.failed(new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
+ handler.failed(this, new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Handler.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Handler.java
index 479d33f..c94d5ef 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Handler.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Handler.java
@@ -28,16 +28,16 @@ public interface Handler<C>
* <p>Callback invoked when the operation completes.</p>
*
* @param context the context
- * @see #failed(Throwable)
+ * @see #failed(Object, Throwable)
*/
public abstract void completed(C context);
/**
* <p>Callback invoked when the operation fails.</p>
- *
+ * @param context the context
* @param x the reason for the operation failure
*/
- public void failed(Throwable x);
+ public void failed(C context, Throwable x);
/**
* <p>Empty implementation of {@link Handler}</p>
@@ -52,9 +52,8 @@ public interface Handler<C>
}
@Override
- public void failed(Throwable x)
+ public void failed(C context, Throwable x)
{
- throw new SPDYException(x);
}
}
}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
index 85cea34..d003ed4 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
@@ -72,7 +72,7 @@ public class AsyncTimeoutTest
}
@Override
- public void failed(Throwable x)
+ public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@@ -120,7 +120,7 @@ public class AsyncTimeoutTest
}
@Override
- public void failed(Throwable x)
+ public void failed(Void context, Throwable x)
{
failedLatch.countDown();
}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
index c999c34..fed8180 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -19,13 +19,10 @@ package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.mockito.Mockito.*;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -34,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
@@ -55,13 +53,16 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
{
@Mock
- private ISession sessionMock;
+ private Controller<FrameBytes> controller;
+
private ByteBufferPool bufferPool;
private Executor threadPool;
private StandardSession session;
@@ -76,13 +77,36 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = Executors.newSingleThreadScheduledExecutor();
generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
- session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator);
+ session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator);
headers = new Headers();
}
+ @SuppressWarnings("unchecked")
+ private void setControllerWriteExpectationToFail(final boolean fail)
+ {
+ when(controller.write(any(ByteBuffer.class),any(Handler.class),any(StandardSession.FrameBytes.class))).thenAnswer(new Answer<Integer>()
+ {
+ public Integer answer(InvocationOnMock invocation)
+ {
+ Object[] args = invocation.getArguments();
+
+ Handler<StandardSession.FrameBytes> handler = (Handler<FrameBytes>)args[1];
+ FrameBytes context = (FrameBytes)args[2];
+
+ if (fail)
+ handler.failed(context,new ClosedChannelException());
+ else
+ handler.completed(context);
+ return 0;
+ }
+ });
+ }
+
@Test
public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
assertThatStreamIsInSession(stream);
assertThat("stream is not reset",stream.isReset(),is(false));
@@ -94,6 +118,8 @@ public class StandardSessionTest
@Test
public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
assertThatStreamIsInSession(stream);
stream.updateCloseState(true,true);
@@ -105,6 +131,8 @@ public class StandardSessionTest
@Test
public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
assertThatStreamIsInSession(stream);
stream.updateCloseState(true,false);
@@ -116,6 +144,8 @@ public class StandardSessionTest
@Test
public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true));
Stream pushStream = createPushStream(stream);
@@ -125,6 +155,8 @@ public class StandardSessionTest
@Test
public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
Stream stream = createStream();
IStream pushStream = createPushStream(stream);
assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId()));
@@ -134,6 +166,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
Stream pushStream = createPushStream(stream);
assertThatStreamIsNotHalfClosed(stream);
@@ -155,6 +189,8 @@ public class StandardSessionTest
@Test
public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
stream.updateCloseState(true,true);
assertThatStreamIsHalfClosed(stream);
@@ -167,15 +203,10 @@ public class StandardSessionTest
{
final CountDownLatch failedLatch = new CountDownLatch(1);
SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
- stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler<Stream>()
+ stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
- public void completed(Stream context)
- {
- }
-
- @Override
- public void failed(Throwable x)
+ public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@@ -186,6 +217,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
IStream pushStream = createPushStream(stream);
assertThatPushStreamIsHalfClosed(pushStream);
@@ -200,6 +233,8 @@ public class StandardSessionTest
@Test
public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get();
assertThatPushStreamIsInSession(pushStream);
@@ -212,6 +247,8 @@ public class StandardSessionTest
@Test
public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers,true,stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
@@ -225,6 +262,8 @@ public class StandardSessionTest
public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException,
TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = createStream();
SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
@@ -240,6 +279,8 @@ public class StandardSessionTest
@Test
public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
@@ -253,6 +294,8 @@ public class StandardSessionTest
@Test
public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
IStream stream = createStream();
@@ -263,6 +306,8 @@ public class StandardSessionTest
@Test
public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2);
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
@@ -277,6 +322,8 @@ public class StandardSessionTest
@Test
public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
IStream stream = createStream();
@@ -313,22 +360,12 @@ public class StandardSessionTest
}
}
- @SuppressWarnings("unchecked")
- @Test(expected = IllegalStateException.class)
- public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
- {
- SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
- IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null);
- stream.updateCloseState(synStreamFrame.isClose(),true);
- assertThat("stream is half closed",stream.isHalfClosed(),is(true));
- stream.data(new StringDataInfo("data on half closed stream",true));
- verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
- }
-
@Test
@Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.")
public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException
{
+ setControllerWriteExpectationToFail(false);
+
IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get();
stream.updateCloseState(true,false);
assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true));
@@ -338,6 +375,8 @@ public class StandardSessionTest
@Test
public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException
{
+ setControllerWriteExpectationToFail(false);
+
final CountDownLatch onDataCalledLatch = new CountDownLatch(1);
Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
{
@@ -353,10 +392,39 @@ public class StandardSessionTest
assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testControllerWriteFailsInEndPointFlush() throws InterruptedException
+ {
+ setControllerWriteExpectationToFail(true);
+
+ final CountDownLatch failedCalledLatch = new CountDownLatch(2);
+ SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
+ IStream stream = new StandardStream(synStreamFrame,session,8192,null);
+
+ Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
+ {
+ @Override
+ public void failed(Void context, Throwable x)
+ {
+ failedCalledLatch.countDown();
+ }
+ };
+
+ // first data frame should fail on controller.write()
+ stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler);
+ // second data frame should fail without controller.writer() as the connection is expected to be broken after first controller.write() call failed.
+ stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,handler);
+
+ verify(controller,times(1)).write(any(ByteBuffer.class),any(Handler.class),any(FrameBytes.class));
+ assertThat("Handler.failed has been called twice",failedCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+
+ }
+
private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynInfo synInfo = new SynInfo(headers,false,(byte)0);
- return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS);
+ return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(50,TimeUnit.SECONDS);
}
private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException
@@ -365,21 +433,6 @@ public class StandardSessionTest
return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
}
- private static class TestController implements Controller<StandardSession.FrameBytes>
- {
- @Override
- public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
- {
- handler.completed(context);
- return buffer.remaining();
- }
-
- @Override
- public void close(boolean onlyOutput)
- {
- }
- }
-
private void assertThatStreamIsClosed(IStream stream)
{
assertThat("stream is closed",stream.isClosed(),is(true));
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
index 68a6a95..ba5f84d 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
@@ -18,19 +18,26 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.junit.Test;
@@ -101,7 +108,7 @@ public class StandardStreamTest
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
- public void failed(Throwable x)
+ public void failed(Stream stream, Throwable x)
{
failedLatch.countDown();
}
@@ -109,4 +116,15 @@ public class StandardStreamTest
assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L));
}
+ @SuppressWarnings("unchecked")
+ @Test(expected = IllegalStateException.class)
+ public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
+ IStream stream = new StandardStream(synStreamFrame,session,8192,null);
+ stream.updateCloseState(synStreamFrame.isClose(),true);
+ assertThat("stream is half closed",stream.isHalfClosed(),is(true));
+ stream.data(new StringDataInfo("data on half closed stream",true));
+ verify(session,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
+ }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
index 63544a0..8a01d0c 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
@@ -41,6 +41,7 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
super(null, sslContextFactory);
// Override the default connection factory for non-SSL connections
defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this);
+ setFlowControlEnabled(false);
}
@Override
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java
index 40dfecd..faa51a3 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYAsyncConnection.java
@@ -122,7 +122,8 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
catch (Exception x)
{
close(false);
- handler.failed(x);
+ handler.failed(context, x);
+ return -1;
}
finally
{
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
index 438e2e3..6f04c5b 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
@@ -398,7 +398,7 @@ public class SPDYClient
}
catch (RuntimeException x)
{
- sessionPromise.failed(x);
+ sessionPromise.failed(null,x);
throw x;
}
}
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java
index 41412b3..1e431e2 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java
@@ -58,6 +58,7 @@ public class SPDYServerConnector extends SelectChannelConnector
private final ServerSessionFrameListener listener;
private final SslContextFactory sslContextFactory;
private AsyncConnectionFactory defaultConnectionFactory;
+ private volatile boolean flowControlEnabled = true;
public SPDYServerConnector(ServerSessionFrameListener listener)
{
@@ -287,4 +288,14 @@ public class SPDYServerConnector extends SelectChannelConnector
{
return Collections.unmodifiableCollection(sessions);
}
+
+ public boolean isFlowControlEnabled()
+ {
+ return flowControlEnabled;
+ }
+
+ public void setFlowControlEnabled(boolean flowControl)
+ {
+ this.flowControlEnabled = flowControl;
+ }
}
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
index 79ac28d..4b187d7 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
@@ -67,6 +67,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator);
+ session.setFlowControlEnabled(connector.isFlowControlEnabled());
parser.addListener(session);
connection.setSession(session);
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java
index b30db27..262ee2d 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java
@@ -53,9 +53,15 @@ public abstract class AbstractTest
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
+ return startServer(listener,true);
+ }
+
+ protected InetSocketAddress startServer(ServerSessionFrameListener listener, boolean flowControl) throws Exception
+ {
if (connector == null)
connector = newSPDYServerConnector(listener);
connector.setPort(0);
+ connector.setFlowControlEnabled(flowControl);
server = new Server();
server.addConnector(connector);
server.start();
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
index 7b399f3..e58209c 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
@@ -38,7 +38,6 @@ import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.frames.ControlFrame;
-import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
@@ -145,14 +144,12 @@ public class ClosedStreamTest extends AbstractTest
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyReceivedLatch.countDown();
- super.onReply(stream,replyInfo);
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
clientReceivedDataLatch.countDown();
- super.onData(stream,dataInfo);
}
}).get();
assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
@@ -204,7 +201,6 @@ public class ClosedStreamTest extends AbstractTest
public void onData(Stream stream, DataInfo dataInfo)
{
serverDataReceivedLatch.countDown();
- super.onData(stream,dataInfo);
}
};
}
@@ -250,13 +246,6 @@ public class ClosedStreamTest extends AbstractTest
{
clientResetReceivedLatch.countDown();
}
- super.onControlFrame(frame);
- }
-
- @Override
- public void onDataFrame(DataFrame frame, ByteBuffer data)
- {
- super.onDataFrame(frame,data);
}
});
ByteBuffer response = ByteBuffer.allocate(28);
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
index db2303c..89792d3 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
@@ -15,6 +15,8 @@
*/
package org.eclipse.jetty.spdy;
+import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
@@ -25,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
@@ -183,43 +186,22 @@ public class FlowControlTest extends AbstractTest
});
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@@ -312,43 +294,22 @@ public class FlowControlTest extends AbstractTest
stream.data(new BytesDataInfo(new byte[length], true));
DataInfo dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(windowSize, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.asByteBuffer(true);
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(0, dataInfo.available());
Assert.assertEquals(0, dataInfo.consumed());
dataInfo.consume(dataInfo.length());
dataInfo = exchanger.exchange(null, 5, TimeUnit.SECONDS);
- // Check that we are flow control stalled
- expectException(TimeoutException.class, new Callable<DataInfo>()
- {
- @Override
- public DataInfo call() throws Exception
- {
- return exchanger.exchange(null, 1, TimeUnit.SECONDS);
- }
- });
+ checkThatWeAreFlowControlStalled(exchanger);
+
Assert.assertEquals(dataInfo.length() / 2, dataInfo.consumed());
dataInfo.asByteBuffer(true);
@@ -451,6 +412,66 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testSendBigFileWithoutFlowControl() throws Exception
+ {
+ boolean flowControlEnabled = false;
+ testSendBigFile(flowControlEnabled);
+ }
+
+ @Test
+ public void testSendBigFileWithFlowControl() throws Exception
+ {
+ boolean flowControlEnabled = true;
+ testSendBigFile(flowControlEnabled);
+ }
+
+ private void testSendBigFile(boolean flowControlEnabled) throws Exception, InterruptedException
+ {
+ final int dataSize = 1024 * 1024;
+ final ByteBufferDataInfo bigByteBufferDataInfo = new ByteBufferDataInfo(ByteBuffer.allocate(dataSize),false);
+ final CountDownLatch allDataReceivedLatch = new CountDownLatch(1);
+
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ stream.data(bigByteBufferDataInfo);
+ return null;
+ }
+ },flowControlEnabled),new SessionFrameListener.Adapter());
+
+ session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ private int dataBytesReceived;
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataBytesReceived = dataBytesReceived + dataInfo.length();
+ dataInfo.consume(dataInfo.length());
+ if (dataBytesReceived == dataSize)
+ allDataReceivedLatch.countDown();
+ }
+ });
+
+ assertThat("all data bytes have been received by the client",allDataReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ private void checkThatWeAreFlowControlStalled(final Exchanger<DataInfo> exchanger)
+ {
+ expectException(TimeoutException.class, new Callable<DataInfo>()
+ {
+ @Override
+ public DataInfo call() throws Exception
+ {
+ return exchanger.exchange(null, 1, TimeUnit.SECONDS);
+ }
+ });
+ }
+
private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
index 8843dc1..265aea0 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
@@ -16,34 +16,55 @@
package org.eclipse.jetty.spdy;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
+import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.frames.ControlFrame;
+import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.GoAwayFrame;
+import org.eclipse.jetty.spdy.frames.RstStreamFrame;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
+import org.eclipse.jetty.spdy.generator.Generator;
+import org.eclipse.jetty.spdy.parser.Parser;
+import org.eclipse.jetty.spdy.parser.Parser.Listener;
+import org.junit.Assert;
import org.junit.Test;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
public class PushStreamTest extends AbstractTest
{
@Test
@@ -66,10 +87,10 @@ public class PushStreamTest extends AbstractTest
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
- assertThat("streamId is even", stream.getId() % 2, is(0));
- assertThat("stream is unidirectional", stream.isUnidirectional(), is(true));
- assertThat("stream is closed", stream.isClosed(), is(true));
- assertThat("stream has associated stream", stream.getAssociatedStream(), notNullValue());
+ assertThat("streamId is even",stream.getId() % 2,is(0));
+ assertThat("stream is unidirectional",stream.isUnidirectional(),is(true));
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ assertThat("stream has associated stream",stream.getAssociatedStream(),notNullValue());
try
{
stream.reply(new ReplyInfo(false));
@@ -85,10 +106,10 @@ public class PushStreamTest extends AbstractTest
}
});
- Stream stream = clientSession.syn(new SynInfo(true), null).get();
- assertThat("onSyn has been called", pushStreamLatch.await(5, TimeUnit.SECONDS), is(true));
+ Stream stream = clientSession.syn(new SynInfo(true),null).get();
+ assertThat("onSyn has been called",pushStreamLatch.await(5,TimeUnit.SECONDS),is(true));
Stream pushStream = pushStreamRef.get();
- assertThat("main stream and associated stream are the same", stream, sameInstance(pushStream.getAssociatedStream()));
+ assertThat("main stream and associated stream are the same",stream,sameInstance(pushStream.getAssociatedStream()));
}
@Test
@@ -221,7 +242,7 @@ public class PushStreamTest extends AbstractTest
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
{
@Override
- public void failed(Throwable x)
+ public void failed(Stream stream, Throwable x)
{
pushStreamFailedLatch.countDown();
}
@@ -321,6 +342,170 @@ public class PushStreamTest extends AbstractTest
return bytes;
}
+
+ @Test
+ public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithFlowControl() throws Exception
+ {
+ final boolean flowControl = true;
+ testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl);
+ }
+
+ @Test
+ public void testClientResetsStreamAfterPushSynDoesPreventSendingDataFramesWithoutFlowControl() throws Exception
+ {
+ final boolean flowControl = false;
+ testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(flowControl);
+ }
+
+ private volatile boolean read = true;
+ private void testNoMoreFramesAreSentOnPushStreamAfterClientResetsThePushStream(final boolean flowControl) throws Exception, IOException, InterruptedException
+ {
+ final short version = SPDY.V3;
+ final AtomicBoolean unexpectedExceptionOccured = new AtomicBoolean(false);
+ final CountDownLatch resetReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch allDataFramesReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch goAwayReceivedLatch = new CountDownLatch(1);
+ final int dataSizeInBytes = 1024 * 256;
+ final byte[] transferBytes = createHugeByteArray(dataSizeInBytes);
+
+ InetSocketAddress serverAddress = startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(final Stream stream, SynInfo synInfo)
+ {
+ new Thread(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ Stream pushStream=null;
+ try
+ {
+ stream.reply(new ReplyInfo(false));
+ pushStream = stream.syn(new SynInfo(false)).get();
+ resetReceivedLatch.await(5,TimeUnit.SECONDS);
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ e.printStackTrace();
+ unexpectedExceptionOccured.set(true);
+ }
+ pushStream.data(new BytesDataInfo(transferBytes,true));
+ stream.data(new StringDataInfo("close",true));
+ }
+ }).start();
+ return null;
+ }
+
+ @Override
+ public void onRst(Session session, RstInfo rstInfo)
+ {
+ resetReceivedLatch.countDown();
+ }
+
+ @Override
+ public void onGoAway(Session session, GoAwayInfo goAwayInfo)
+ {
+ goAwayReceivedLatch.countDown();
+ }
+ }, flowControl);
+
+ final SocketChannel channel = SocketChannel.open(serverAddress);
+ final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
+ int streamId = 1;
+ ByteBuffer writeBuffer = generator.control(new SynStreamFrame(version,(byte)0,streamId,0,(byte)0,new Headers()));
+ channel.write(writeBuffer);
+ assertThat("writeBuffer is fully written",writeBuffer.hasRemaining(), is(false));
+
+ final Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
+ parser.addListener(new Listener.Adapter()
+ {
+ int bytesRead = 0;
+
+ @Override
+ public void onControlFrame(ControlFrame frame)
+ {
+ if(frame instanceof SynStreamFrame){
+ int pushStreamId = ((SynStreamFrame)frame).getStreamId();
+ ByteBuffer writeBuffer = generator.control(new RstStreamFrame(version,pushStreamId,StreamStatus.CANCEL_STREAM.getCode(version)));
+ try
+ {
+ channel.write(writeBuffer);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ unexpectedExceptionOccured.set(true);
+ }
+ }
+ }
+
+ @Override
+ public void onDataFrame(DataFrame frame, ByteBuffer data)
+ {
+ if(frame.getStreamId() == 2)
+ bytesRead = bytesRead + frame.getLength();
+ if(bytesRead == dataSizeInBytes){
+ allDataFramesReceivedLatch.countDown();
+ return;
+ }
+ if (flowControl)
+ {
+ ByteBuffer writeBuffer = generator.control(new WindowUpdateFrame(version,frame.getStreamId(),frame.getLength()));
+ try
+ {
+ channel.write(writeBuffer);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ unexpectedExceptionOccured.set(true);
+ }
+ }
+ }
+ });
+
+ Thread reader = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ ByteBuffer readBuffer = ByteBuffer.allocate(dataSizeInBytes*2);
+ while (read)
+ {
+ try
+ {
+ channel.read(readBuffer);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ unexpectedExceptionOccured.set(true);
+ }
+ readBuffer.flip();
+ parser.parse(readBuffer);
+ readBuffer.clear();
+ }
+
+ }
+ });
+ reader.start();
+ read = false;
+
+ assertThat("no unexpected exceptions occured", unexpectedExceptionOccured.get(), is(false));
+ assertThat("not all dataframes have been received as the pushstream has been reset by the client.",allDataFramesReceivedLatch.await(streamId,TimeUnit.SECONDS),is(false));
+
+
+ ByteBuffer buffer = generator.control(new GoAwayFrame(version, streamId, SessionStatus.OK.getCode()));
+ channel.write(buffer);
+ Assert.assertThat(buffer.hasRemaining(), is(false));
+
+ assertThat("GoAway frame is received by server", goAwayReceivedLatch.await(5,TimeUnit.SECONDS), is(true));
+ channel.shutdownOutput();
+ channel.close();
+ }
+
@Test
public void testOddEvenStreamIds() throws Exception
{
@@ -334,7 +519,7 @@ public class PushStreamTest extends AbstractTest
stream.syn(new SynInfo(false));
return null;
}
- }),new SessionFrameListener.Adapter()
+ }, true),new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
@@ -367,6 +552,6 @@ public class PushStreamTest extends AbstractTest
private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException
{
- assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
+ assertThat("No exception occured",exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
}
}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
index 654bb5f..7a9fe58 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
@@ -28,7 +28,7 @@ public class ResetStreamTest extends AbstractTest
@Test
public void testResetStreamIsRemoved() throws Exception
{
- Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null);
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter(), true),null);
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS);
@@ -169,7 +169,7 @@ public class ResetStreamTest extends AbstractTest
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter<Void>()
{
@Override
- public void failed(Throwable x)
+ public void failed(Void context, Throwable x)
{
failLatch.countDown();
}