aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2012-04-27 11:44:22 (EDT)
committerThomas Becker2012-04-27 11:44:22 (EDT)
commitf362fb0b48eb9eb695c31bb21920d651fb91adee (patch)
treea0b81c30f8db895b007c950e9891cf74f4dcd712
parent011ae1f447073aa8dae51028b1368decfa0446fa (diff)
downloadorg.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.zip
org.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.tar.gz
org.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.tar.bz2
spdy push implementationrefs/changes/46/5746/1
-rw-r--r--jetty-spdy/spdy-core/pom.xml25
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java24
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java41
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java1
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java312
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java169
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java2
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java3
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java48
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java54
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java6
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java3
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java3
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java457
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java112
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java2
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java20
-rw-r--r--jetty-spdy/spdy-jetty/pom.xml6
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java263
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java2
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java7
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java355
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java97
-rw-r--r--jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java1
-rw-r--r--pom.xml5
25 files changed, 1746 insertions, 272 deletions
diff --git a/jetty-spdy/spdy-core/pom.xml b/jetty-spdy/spdy-core/pom.xml
index d6d4c0f..a933132 100644
--- a/jetty-spdy/spdy-core/pom.xml
+++ b/jetty-spdy/spdy-core/pom.xml
@@ -6,9 +6,9 @@
<version>7.6.4-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>spdy-core</artifactId>
- <name>Jetty :: SPDY :: Core</name>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>spdy-core</artifactId>
+ <name>Jetty :: SPDY :: Core</name>
<dependencies>
<dependency>
@@ -16,10 +16,21 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java
index 2291cf7..07934c1 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java
@@ -82,7 +82,7 @@ public interface IStream extends Stream
public void process(ControlFrame frame);
/**
- * <p>Processes the give data frame along with the given byte buffer,
+ * <p>Processes the given data frame along with the given byte buffer,
* for example by updating the stream's state or by calling listeners.</p>
*
* @param frame the data frame to process
@@ -90,4 +90,26 @@ public interface IStream extends Stream
* @see #process(ControlFrame)
*/
public void process(DataFrame frame, ByteBuffer data);
+
+ /**
+ * <p>Associate the given {@link IStream} to this {@link IStream}.</p>
+ *
+ * @param stream the stream to associate with this stream
+ */
+ public void associate(IStream stream);
+
+ /**
+ * <p>remove the given associated {@link IStream} from this stream</p>
+ *
+ * @param stream the stream to be removed
+ */
+ public void disassociate(IStream stream);
+
+ /**
+ * <p>Overrides Stream.getAssociatedStream() to return an instance of IStream instead of Stream
+ *
+ * @see Stream#getAssociatedStream()
+ */
+ @Override
+ public IStream getAssociatedStream();
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java
new file mode 100644
index 0000000..a460d54
--- /dev/null
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java
@@ -0,0 +1,41 @@
+package org.eclipse.jetty.spdy;
+
+import org.eclipse.jetty.spdy.api.SynInfo;
+
+/* ------------------------------------------------------------ */
+/**
+ * <p>A subclass container of {@link SynInfo} for unidirectional streams</p>
+ */
+public class PushSynInfo extends SynInfo
+{
+ public static final byte FLAG_UNIDIRECTIONAL = 2;
+
+ private int associatedStreamId;
+
+ public PushSynInfo(int associatedStreamId, SynInfo synInfo){
+ super(synInfo.getHeaders(), synInfo.isClose(), synInfo.getPriority());
+ this.associatedStreamId = associatedStreamId;
+ }
+
+ /**
+ * @return the close and unidirectional flags as integer
+ * @see #FLAG_CLOSE
+ * @see #FLAG_UNIDIRECTIONAL
+ */
+ @Override
+ public byte getFlags()
+ {
+ byte flags = super.getFlags();
+ flags += FLAG_UNIDIRECTIONAL;
+ return flags;
+ }
+
+ /**
+ * @return the id of the associated stream
+ */
+ public int getAssociatedStreamId()
+ {
+ return associatedStreamId;
+ }
+
+}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java
index fa9a55e..3e0c195 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java
@@ -20,6 +20,7 @@ import org.eclipse.jetty.spdy.api.SessionStatus;
public class SessionException extends RuntimeException
{
+
private final SessionStatus sessionStatus;
public SessionException(SessionStatus sessionStatus)
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
index 8b1a53d..fa89c6e 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
@@ -95,7 +95,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private boolean flushing;
private volatile int windowSize = 65536;
- public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
+ public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
+ Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator)
{
this.version = version;
this.bufferPool = bufferPool;
@@ -109,6 +110,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
this.generator = generator;
}
+ @Override
public short getVersion()
{
return version;
@@ -130,7 +132,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener)
{
Promise<Stream> result = new Promise<>();
- syn(synInfo, listener, 0, TimeUnit.MILLISECONDS, result);
+ syn(synInfo,listener,0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -143,20 +145,18 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// have stream3 hit the network before stream1, not only to comply with the spec
// but also because the compression context for the headers would be wrong, as the
// frame with a compression history will come before the first compressed frame.
+ int associatedStreamId = 0;
+ if (synInfo instanceof PushSynInfo)
+ {
+ associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
+ }
+
synchronized (this)
{
- if (synInfo.isUnidirectional())
- {
- // TODO: unidirectional functionality
- throw new UnsupportedOperationException();
- }
- else
- {
- int streamId = streamIds.getAndAdd(2);
- SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders());
- IStream stream = createStream(synStream, listener);
- control(stream, synStream, timeout, unit, handler, stream);
- }
+ int streamId = streamIds.getAndAdd(2);
+ SynStreamFrame synStream = new SynStreamFrame(version,synInfo.getFlags(),streamId,associatedStreamId,synInfo.getPriority(),synInfo.getHeaders());
+ IStream stream = createStream(synStream,listener);
+ control(stream,synStream,timeout,unit,handler,stream);
}
}
@@ -164,7 +164,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Void> rst(RstInfo rstInfo)
{
Promise<Void> result = new Promise<>();
- rst(rstInfo, 0, TimeUnit.MILLISECONDS, result);
+ rst(rstInfo,0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -174,16 +174,19 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// SPEC v3, 2.2.2
if (goAwaySent.get())
{
- complete(handler, null);
+ complete(handler,null);
}
else
{
int streamId = rstInfo.getStreamId();
IStream stream = streams.get(streamId);
+ RstStreamFrame frame = new RstStreamFrame(version,streamId,rstInfo.getStreamStatus().getCode(version));
+ control(stream,frame,timeout,unit,handler,null);
if (stream != null)
+ {
+ stream.process(frame);
removeStream(stream);
- RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
- control(null, frame, timeout, unit, handler, null);
+ }
}
}
@@ -191,22 +194,22 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Void> settings(SettingsInfo settingsInfo)
{
Promise<Void> result = new Promise<>();
- settings(settingsInfo, 0, TimeUnit.MILLISECONDS, result);
+ settings(settingsInfo,0,TimeUnit.MILLISECONDS,result);
return result;
}
@Override
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
- SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
- control(null, frame, timeout, unit, handler, null);
+ SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
+ control(null,frame,timeout,unit,handler,null);
}
@Override
public Future<PingInfo> ping()
{
Promise<PingInfo> result = new Promise<>();
- ping(0, TimeUnit.MILLISECONDS, result);
+ ping(0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -215,8 +218,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
int pingId = pingIds.getAndAdd(2);
PingInfo pingInfo = new PingInfo(pingId);
- PingFrame frame = new PingFrame(version, pingId);
- control(null, frame, timeout, unit, handler, pingInfo);
+ PingFrame frame = new PingFrame(version,pingId);
+ control(null,frame,timeout,unit,handler,pingInfo);
}
@Override
@@ -228,28 +231,28 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private Future<Void> goAway(SessionStatus sessionStatus)
{
Promise<Void> result = new Promise<>();
- goAway(sessionStatus, 0, TimeUnit.MILLISECONDS, result);
+ goAway(sessionStatus,0,TimeUnit.MILLISECONDS,result);
return result;
}
@Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{
- goAway(SessionStatus.OK, timeout, unit, handler);
+ goAway(SessionStatus.OK,timeout,unit,handler);
}
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
{
- if (goAwaySent.compareAndSet(false, true))
+ if (goAwaySent.compareAndSet(false,true))
{
if (!goAwayReceived.get())
{
- GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
- control(null, frame, timeout, unit, handler, null);
+ GoAwayFrame frame = new GoAwayFrame(version,lastStreamId.get(),sessionStatus.getCode());
+ control(null,frame,timeout,unit,handler,null);
return;
}
}
- complete(handler, null);
+ complete(handler,null);
}
@Override
@@ -263,14 +266,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void onControlFrame(ControlFrame frame)
{
- notifyIdle(idleListener, false);
+ notifyIdle(idleListener,false);
try
{
- logger.debug("Processing {}", frame);
+ logger.debug("Processing {}",frame);
if (goAwaySent.get())
{
- logger.debug("Skipped processing of {}", frame);
+ logger.debug("Skipped processing of {}",frame);
return;
}
@@ -329,21 +332,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
finally
{
- notifyIdle(idleListener, true);
+ notifyIdle(idleListener,true);
}
}
@Override
public void onDataFrame(DataFrame frame, ByteBuffer data)
{
- notifyIdle(idleListener, false);
+ notifyIdle(idleListener,false);
try
{
- logger.debug("Processing {}, {} data bytes", frame, data.remaining());
+ logger.debug("Processing {}, {} data bytes",frame,data.remaining());
if (goAwaySent.get())
{
- logger.debug("Skipped processing of {}", frame);
+ logger.debug("Skipped processing of {}",frame);
return;
}
@@ -351,18 +354,18 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
IStream stream = streams.get(streamId);
if (stream == null)
{
- RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
- logger.debug("Unknown stream {}", rstInfo);
+ RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
+ logger.debug("Unknown stream {}",rstInfo);
rst(rstInfo);
}
else
{
- processData(stream, frame, data);
+ processData(stream,frame,data);
}
}
finally
{
- notifyIdle(idleListener, true);
+ notifyIdle(idleListener,true);
}
}
@@ -374,7 +377,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void processData(IStream stream, DataFrame frame, ByteBuffer data)
{
- stream.process(frame, data);
+ stream.process(frame,data);
updateLastStreamId(stream);
if (stream.isClosed())
removeStream(stream);
@@ -383,43 +386,42 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void onStreamException(StreamException x)
{
- notifyOnException(listener, x);
- rst(new RstInfo(x.getStreamId(), x.getStreamStatus()));
+ notifyOnException(listener,x);
+ rst(new RstInfo(x.getStreamId(),x.getStreamStatus()));
}
@Override
public void onSessionException(SessionException x)
{
Throwable cause = x.getCause();
- notifyOnException(listener, cause == null ? x : cause);
+ notifyOnException(listener,cause == null?x:cause);
goAway(x.getSessionStatus());
}
private void onSyn(SynStreamFrame frame)
{
- IStream stream = newStream(frame);
- stream.updateCloseState(frame.isClose(), false);
- logger.debug("Opening {}", stream);
- int streamId = frame.getStreamId();
- IStream existing = streams.putIfAbsent(streamId, stream);
+ IStream stream = newStream(frame,null);
+ stream.updateCloseState(frame.isClose(),false);
+ logger.debug("Opening {}",stream);
+ int streamId = stream.getId();
+ IStream existing = streams.putIfAbsent(streamId,stream);
if (existing != null)
{
- RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
- logger.debug("Duplicate stream, {}", rstInfo);
+ RstInfo rstInfo = new RstInfo(streamId,StreamStatus.PROTOCOL_ERROR);
+ logger.debug("Duplicate stream, {}",rstInfo);
rst(rstInfo);
}
else
{
- processSyn(listener, stream, frame);
+ processSyn(listener,stream,frame);
}
}
private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
{
stream.process(frame);
- SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(),
- frame.isUnidirectional(), frame.getAssociatedStreamId(), frame.getPriority());
- StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo);
+ SynInfo synInfo = new SynInfo(frame.getHeaders(),frame.isClose(),frame.getPriority());
+ StreamFrameListener streamListener = notifyOnSyn(listener,stream,synInfo);
stream.setStreamFrameListener(streamListener);
flush();
// The onSyn() listener may have sent a frame that closed the stream
@@ -429,25 +431,36 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream createStream(SynStreamFrame synStream, StreamFrameListener listener)
{
- IStream stream = newStream(synStream);
- stream.updateCloseState(synStream.isClose(), true);
+ IStream parentStream = streams.get(synStream.getAssociatedStreamId());
+
+ IStream stream = newStream(synStream,parentStream);
+ stream.updateCloseState(synStream.isClose(),true);
stream.setStreamFrameListener(listener);
- if (streams.putIfAbsent(synStream.getStreamId(), stream) != null)
+
+ if (synStream.isUnidirectional())
+ {
+ // unidirectional streams are implicitly half closed for the client
+ stream.updateCloseState(true,false);
+ if (!stream.isClosed())
+ parentStream.associate(stream);
+ }
+
+ if (streams.putIfAbsent(synStream.getStreamId(),stream) != null)
{
// If this happens we have a bug since we did not check that the peer's streamId was valid
// (if we're on server, then the client sent an odd streamId and we did not check that)
- throw new IllegalStateException();
+ throw new IllegalStateException("StreamId: " + synStream.getStreamId() + " invalid.");
}
- logger.debug("Created {}", stream);
+ logger.debug("Created {}",stream);
notifyStreamCreated(stream);
return stream;
}
- private IStream newStream(SynStreamFrame frame)
+ private IStream newStream(SynStreamFrame frame, IStream parentStream)
{
- return new StandardStream(frame, this, windowSize);
+ return new StandardStream(frame,this,windowSize,parentStream);
}
private void notifyStreamCreated(IStream stream)
@@ -462,7 +475,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
}
@@ -470,13 +483,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void removeStream(IStream stream)
{
+ if (stream.isUnidirectional())
+ {
+ stream.getAssociatedStream().disassociate(stream);
+ }
+
IStream removed = streams.remove(stream.getId());
if (removed != null)
- {
assert removed == stream;
- logger.debug("Removed {}", stream);
- notifyStreamClosed(stream);
- }
+
+ logger.debug("Removed {}",stream);
+ notifyStreamClosed(stream);
}
private void notifyStreamClosed(IStream stream)
@@ -491,7 +508,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
}
@@ -503,13 +520,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
IStream stream = streams.get(streamId);
if (stream == null)
{
- RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
- logger.debug("Unknown stream {}", rstInfo);
+ RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
+ logger.debug("Unknown stream {}",rstInfo);
rst(rstInfo);
}
else
{
- processReply(stream, frame);
+ processReply(stream,frame);
}
}
@@ -522,15 +539,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void onRst(RstStreamFrame frame)
{
- // TODO: implement logic to clean up unidirectional streams associated with this stream
-
IStream stream = streams.get(frame.getStreamId());
if (stream != null)
stream.process(frame);
- RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
- notifyOnRst(listener, rstInfo);
+ RstInfo rstInfo = new RstInfo(frame.getStreamId(),StreamStatus.from(frame.getVersion(),frame.getStatusCode()));
+ notifyOnRst(listener,rstInfo);
flush();
if (stream != null)
@@ -546,11 +561,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
windowSize = windowSizeSetting.value();
for (IStream stream : streams.values())
stream.updateWindowSize(windowSize - prevWindowSize);
- logger.debug("Updated window size to {}", windowSize);
+ logger.debug("Updated window size to {}",windowSize);
}
- SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
- notifyOnSettings(listener, settingsInfo);
+ SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(),frame.isClearPersisted());
+ notifyOnSettings(listener,settingsInfo);
flush();
}
@@ -560,21 +575,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (pingId % 2 == pingIds.get() % 2)
{
PingInfo pingInfo = new PingInfo(frame.getPingId());
- notifyOnPing(listener, pingInfo);
+ notifyOnPing(listener,pingInfo);
flush();
}
else
{
- control(null, frame, 0, TimeUnit.MILLISECONDS, null, null);
+ control(null,frame,0,TimeUnit.MILLISECONDS,null,null);
}
}
private void onGoAway(GoAwayFrame frame)
{
- if (goAwayReceived.compareAndSet(false, true))
+ if (goAwayReceived.compareAndSet(false,true))
{
- GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
- notifyOnGoAway(listener, goAwayInfo);
+ GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(),SessionStatus.from(frame.getStatusCode()));
+ notifyOnGoAway(listener,goAwayInfo);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id,
@@ -589,13 +604,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
IStream stream = streams.get(streamId);
if (stream == null)
{
- RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
- logger.debug("Unknown stream, {}", rstInfo);
+ RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
+ logger.debug("Unknown stream, {}",rstInfo);
rst(rstInfo);
}
else
{
- processHeaders(stream, frame);
+ processHeaders(stream,frame);
}
}
@@ -627,13 +642,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", x, listener);
+ logger.debug("Invoking callback with {} on listener {}",x,listener);
listener.onException(x);
}
}
catch (Exception xx)
{
- logger.info("Exception while notifying listener " + listener, xx);
+ logger.info("Exception while notifying listener " + listener,xx);
}
}
@@ -643,13 +658,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", synInfo, listener);
- return listener.onSyn(stream, synInfo);
+ logger.debug("Invoking callback with {} on listener {}",synInfo,listener);
+ return listener.onSyn(stream,synInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
return null;
}
@@ -660,13 +675,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", rstInfo, listener);
- listener.onRst(this, rstInfo);
+ logger.debug("Invoking callback with {} on listener {}",rstInfo,listener);
+ listener.onRst(this,rstInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -676,13 +691,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
- listener.onSettings(this, settingsInfo);
+ logger.debug("Invoking callback with {} on listener {}",settingsInfo,listener);
+ listener.onSettings(this,settingsInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -692,13 +707,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", pingInfo, listener);
- listener.onPing(this, pingInfo);
+ logger.debug("Invoking callback with {} on listener {}",pingInfo,listener);
+ listener.onPing(this,pingInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -708,13 +723,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (listener != null)
{
- logger.debug("Invoking callback with {} on listener {}", goAwayInfo, listener);
- listener.onGoAway(this, goAwayInfo);
+ logger.debug("Invoking callback with {} on listener {}",goAwayInfo,listener);
+ listener.onGoAway(this,goAwayInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -724,7 +739,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
if (stream != null)
+ {
updateLastStreamId(stream);
+ if (stream.isClosed())
+ removeStream(stream);
+ }
// Synchronization is necessary, since we may have concurrent replies
// and those needs to be generated and enqueued atomically in order
@@ -732,10 +751,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
synchronized (this)
{
ByteBuffer buffer = generator.control(frame);
- logger.debug("Queuing {} on {}", frame, stream);
- ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
+ logger.debug("Queuing {} on {}",frame,stream);
+ ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream,handler,context,frame,buffer);
if (timeout > 0)
- frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
+ frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
// Special handling for PING frames, they must be sent as soon as possible
if (ControlFrameType.PING == frame.getType())
@@ -748,7 +767,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Throwable x)
{
- notifyHandlerFailed(handler, x);
+ notifyHandlerFailed(handler,x);
}
}
@@ -761,7 +780,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
int oldValue = lastStreamId.get();
while (streamId > oldValue)
{
- if (lastStreamId.compareAndSet(oldValue, streamId))
+ if (lastStreamId.compareAndSet(oldValue,streamId))
break;
oldValue = lastStreamId.get();
}
@@ -771,10 +790,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
{
- logger.debug("Queuing {} on {}", dataInfo, stream);
- DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream, handler, context, dataInfo);
+ logger.debug("Queuing {} on {}",dataInfo,stream);
+ DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
if (timeout > 0)
- frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
+ {
+ frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
+ }
append(frameBytes);
flush();
}
@@ -799,30 +820,35 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
frameBytes = queue.get(i);
- if (stalledStreams != null && stalledStreams.contains(frameBytes.getStream()))
+ IStream stream = frameBytes.getStream();
+ if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
continue;
buffer = frameBytes.getByteBuffer();
if (buffer != null)
{
queue.remove(i);
+ // TODO: stream.isUniDirectional() check here is only needed for pushStreams which send a syn with close=true --> find a better solution
+ if (stream != null && !streams.containsValue(stream) && !stream.isUnidirectional())
+ frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM));
break;
}
if (stalledStreams == null)
stalledStreams = new HashSet<>();
- stalledStreams.add(frameBytes.getStream());
+ if (stream != null)
+ stalledStreams.add(stream);
- logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
+ logger.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
}
if (buffer == null)
return;
flushing = true;
- logger.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
+ logger.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
- write(buffer, this, frameBytes);
+ write(buffer,this,frameBytes);
}
private void append(FrameBytes frameBytes)
@@ -837,7 +863,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
break;
--index;
}
- queue.add(index, frameBytes);
+ queue.add(index,frameBytes);
}
}
@@ -853,7 +879,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
break;
++index;
}
- queue.add(index, frameBytes);
+ queue.add(index,frameBytes);
}
}
@@ -862,7 +888,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
synchronized (queue)
{
- logger.debug("Completed write of {}, {} frame(s) in queue", frameBytes, queue.size());
+ logger.debug("Completed write of {}, {} frame(s) in queue",frameBytes,queue.size());
flushing = false;
}
frameBytes.complete();
@@ -878,8 +904,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
if (controller != null)
{
- logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes);
- controller.write(buffer, handler, frameBytes);
+ logger.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
+ controller.write(buffer,handler,frameBytes);
}
}
@@ -898,7 +924,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void run()
{
if (handler != null)
- notifyHandlerCompleted(handler, context);
+ notifyHandlerCompleted(handler,context);
flush();
}
});
@@ -909,7 +935,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
if (handler != null)
- notifyHandlerCompleted(handler, context);
+ notifyHandlerCompleted(handler,context);
flush();
}
finally
@@ -927,12 +953,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Exception x)
{
- logger.info("Exception while notifying handler " + handler, x);
+ logger.info("Exception while notifying handler " + handler,x);
}
}
-
- private void notifyHandlerFailed(Handler handler, Throwable x)
+ private <C> void notifyHandlerFailed(Handler<C> handler, Throwable x)
{
try
{
@@ -941,7 +966,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Exception xx)
{
- logger.info("Exception while notifying handler " + handler, xx);
+ logger.info("Exception while notifying handler " + handler,xx);
}
}
@@ -952,6 +977,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public abstract ByteBuffer getByteBuffer();
public abstract void complete();
+
+ public abstract void fail(Throwable throwable);
}
private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable
@@ -984,15 +1011,22 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void complete()
{
- ScheduledFuture<?> task = this.task;
- if (task != null)
- task.cancel(false);
- StandardSession.this.complete(handler, context);
+ cancelTask();
+ StandardSession.this.complete(handler,context);
+ }
+
+ @Override
+ public void fail(Throwable x)
+ {
+ cancelTask();
+ notifyHandlerFailed(handler,x);
}
- protected void fail(Throwable x)
+ private void cancelTask()
{
- notifyHandlerFailed(handler, x);
+ ScheduledFuture<?> task = this.task;
+ if (task != null)
+ task.cancel(false);
}
@Override
@@ -1010,7 +1044,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
{
- super(stream, handler, context);
+ super(stream,handler,context);
this.frame = frame;
this.buffer = buffer;
}
@@ -1051,7 +1085,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo)
{
- super(stream, handler, context);
+ super(stream,handler,context);
this.dataInfo = dataInfo;
}
@@ -1069,7 +1103,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (size > windowSize)
size = windowSize;
- buffer = generator.data(stream.getId(), size, dataInfo);
+ buffer = generator.data(stream.getId(),size,dataInfo);
return buffer;
}
catch (Throwable x)
@@ -1096,7 +1130,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
else
{
super.complete();
- stream.updateCloseState(dataInfo.isClose(), true);
+ stream.updateCloseState(dataInfo.isClose(),true);
if (stream.isClosed())
removeStream(stream);
}
@@ -1105,7 +1139,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public String toString()
{
- return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
+ return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream());
}
}
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index 1de9e1b..098b543 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -17,7 +17,9 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@ import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
+import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
@@ -46,18 +49,22 @@ public class StandardStream implements IStream
{
private static final Logger logger = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
+ private final IStream associatedStream;
private final SynStreamFrame frame;
private final ISession session;
private final AtomicInteger windowSize;
+ private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
private volatile StreamFrameListener listener;
private volatile OpenState openState = OpenState.SYN_SENT;
private volatile CloseState closeState = CloseState.OPENED;
+ private volatile boolean reset = false;
- public StandardStream(SynStreamFrame frame, ISession session, int windowSize)
+ public StandardStream(SynStreamFrame frame, ISession session, int windowSize, IStream associatedStream)
{
this.frame = frame;
this.session = session;
this.windowSize = new AtomicInteger(windowSize);
+ this.associatedStream = associatedStream;
}
@Override
@@ -67,6 +74,30 @@ public class StandardStream implements IStream
}
@Override
+ public IStream getAssociatedStream()
+ {
+ return associatedStream;
+ }
+
+ @Override
+ public Set<Stream> getPushedStreams()
+ {
+ return pushedStreams;
+ }
+
+ @Override
+ public void associate(IStream stream)
+ {
+ pushedStreams.add(stream);
+ }
+
+ @Override
+ public void disassociate(IStream stream)
+ {
+ pushedStreams.remove(stream);
+ }
+
+ @Override
public byte getPriority()
{
return frame.getPriority();
@@ -82,7 +113,7 @@ public class StandardStream implements IStream
public void updateWindowSize(int delta)
{
int size = windowSize.addAndGet(delta);
- logger.debug("Updated window size by {}, new window size {}", delta, size);
+ logger.debug("Updated window size by {}, new window size {}",delta,size);
}
@Override
@@ -91,14 +122,6 @@ public class StandardStream implements IStream
return session;
}
- public boolean isHalfClosed()
- {
- CloseState closeState = this.closeState;
- return closeState == CloseState.LOCALLY_CLOSED ||
- closeState == CloseState.REMOTELY_CLOSED ||
- closeState == CloseState.CLOSED;
- }
-
@Override
public Object getAttribute(String key)
{
@@ -108,7 +131,7 @@ public class StandardStream implements IStream
@Override
public void setAttribute(String key, Object value)
{
- attributes.put(key, value);
+ attributes.put(key,value);
}
@Override
@@ -132,7 +155,7 @@ public class StandardStream implements IStream
{
case OPENED:
{
- closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
+ closeState = local?CloseState.LOCALLY_CLOSED:CloseState.REMOTELY_CLOSED;
break;
}
case LOCALLY_CLOSED:
@@ -173,16 +196,16 @@ public class StandardStream implements IStream
{
openState = OpenState.REPLY_RECV;
SynReplyFrame synReply = (SynReplyFrame)frame;
- updateCloseState(synReply.isClose(), false);
- ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
+ updateCloseState(synReply.isClose(),false);
+ ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(),synReply.isClose());
notifyOnReply(replyInfo);
break;
}
case HEADERS:
{
HeadersFrame headers = (HeadersFrame)frame;
- updateCloseState(headers.isClose(), false);
- HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
+ updateCloseState(headers.isClose(),false);
+ HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(),headers.isClose(),headers.isResetCompression());
notifyOnHeaders(headersInfo);
break;
}
@@ -194,7 +217,7 @@ public class StandardStream implements IStream
}
case RST_STREAM:
{
- // TODO:
+ reset = true;
break;
}
default:
@@ -208,15 +231,24 @@ public class StandardStream implements IStream
@Override
public void process(DataFrame frame, ByteBuffer data)
{
+ // TODO: in v3 we need to send a rst instead of just ignoring
+ // ignore data frame if this stream is remotelyClosed already
+ if (isHalfClosed() && !isLocallyClosed())
+ {
+ logger.debug("Ignoring received dataFrame as this stream is remotely closed: " + frame);
+ return;
+ }
+
if (!canReceive())
{
- session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
+ logger.debug("Can't receive. Sending rst: " + frame);
+ session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
return;
}
- updateCloseState(frame.isClose(), false);
+ updateCloseState(frame.isClose(),false);
- ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress())
+ ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data,frame.isClose(),frame.isCompress())
{
@Override
public void consume(int delta)
@@ -243,8 +275,8 @@ public class StandardStream implements IStream
{
if (delta > 0)
{
- WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
- session.control(this, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, null, null);
+ WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(),getId(),delta);
+ session.control(this,windowUpdateFrame,0,TimeUnit.MILLISECONDS,null,null);
}
}
@@ -255,13 +287,13 @@ public class StandardStream implements IStream
{
if (listener != null)
{
- logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
- listener.onReply(this, replyInfo);
+ logger.debug("Invoking reply callback with {} on listener {}",replyInfo,listener);
+ listener.onReply(this,replyInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -272,13 +304,13 @@ public class StandardStream implements IStream
{
if (listener != null)
{
- logger.debug("Invoking headers callback with {} on listener {}", frame, listener);
- listener.onHeaders(this, headersInfo);
+ logger.debug("Invoking headers callback with {} on listener {}",frame,listener);
+ listener.onHeaders(this,headersInfo);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@@ -289,22 +321,42 @@ public class StandardStream implements IStream
{
if (listener != null)
{
- logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
- listener.onData(this, dataInfo);
- logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
+ logger.debug("Invoking data callback with {} on listener {}",dataInfo,listener);
+ listener.onData(this,dataInfo);
+ logger.debug("Invoked data callback with {} on listener {}",dataInfo,listener);
}
}
catch (Exception x)
{
- logger.info("Exception while notifying listener " + listener, x);
+ logger.info("Exception while notifying listener " + listener,x);
}
}
@Override
+ public Future<Stream> syn(SynInfo synInfo)
+ {
+ Promise<Stream> result = new Promise<>();
+ syn(synInfo,0,TimeUnit.MILLISECONDS,result);
+ return result;
+ }
+
+ @Override
+ public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
+ {
+ if (isClosed() || isReset())
+ {
+ handler.failed(new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
+ return;
+ }
+ PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);
+ session.syn(pushSynInfo,null,timeout,unit,handler);
+ }
+
+ @Override
public Future<Void> reply(ReplyInfo replyInfo)
{
Promise<Void> result = new Promise<>();
- reply(replyInfo, 0, TimeUnit.MILLISECONDS, result);
+ reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -312,16 +364,16 @@ public class StandardStream implements IStream
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
openState = OpenState.REPLY_SENT;
- updateCloseState(replyInfo.isClose(), true);
- SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
- session.control(this, frame, timeout, unit, handler, null);
+ updateCloseState(replyInfo.isClose(),true);
+ SynReplyFrame frame = new SynReplyFrame(session.getVersion(),replyInfo.getFlags(),getId(),replyInfo.getHeaders());
+ session.control(this,frame,timeout,unit,handler,null);
}
@Override
public Future<Void> data(DataInfo dataInfo)
{
Promise<Void> result = new Promise<>();
- data(dataInfo, 0, TimeUnit.MILLISECONDS, result);
+ data(dataInfo,0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -330,25 +382,25 @@ public class StandardStream implements IStream
{
if (!canSend())
{
- session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
+ session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
- session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
+ session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
}
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
- session.data(this, dataInfo, timeout, unit, handler, null);
+ session.data(this,dataInfo,timeout,unit,handler,null);
}
@Override
public Future<Void> headers(HeadersInfo headersInfo)
{
Promise<Void> result = new Promise<>();
- headers(headersInfo, 0, TimeUnit.MILLISECONDS, result);
+ headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
return result;
}
@@ -357,18 +409,41 @@ public class StandardStream implements IStream
{
if (!canSend())
{
- session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
+ session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
- session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
+ session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
}
- updateCloseState(headersInfo.isClose(), true);
- HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
- session.control(this, frame, timeout, unit, handler, null);
+ updateCloseState(headersInfo.isClose(),true);
+ HeadersFrame frame = new HeadersFrame(session.getVersion(),headersInfo.getFlags(),getId(),headersInfo.getHeaders());
+ session.control(this,frame,timeout,unit,handler,null);
+ }
+
+ @Override
+ public boolean isUnidirectional()
+ {
+ if (associatedStream != null)
+ return true;
+ else
+ return false;
+
+ }
+
+ @Override
+ public boolean isReset()
+ {
+ return reset;
+ }
+
+ @Override
+ public boolean isHalfClosed()
+ {
+ CloseState closeState = this.closeState;
+ return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
}
@Override
@@ -386,7 +461,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
- return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState);
+ return String.format("stream=%d v%d %s",getId(),session.getVersion(),closeState);
}
private boolean canSend()
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java
index db93566..4036401 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java
@@ -162,7 +162,7 @@ public abstract class DataInfo
/**
* <p>Reads and consumes the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.</p>
*
- * @param output the {@link ByteBuffer} to copy to bytes into
+ * @param output the {@link ByteBuffer} to copy the bytes into
* @return the number of bytes copied
* @see #consume(int)
*/
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
index 75c27d5..5e2e5e2 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java
@@ -75,7 +75,7 @@ public interface Session
* @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler)
*/
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
-
+
/**
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
@@ -90,6 +90,7 @@ public interface Session
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler);
+
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may use the returned future to wait for the reset to be sent.</p>
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
index c25bd45..6a13829 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
@@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.api;
import java.nio.channels.WritePendingException;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -79,13 +80,36 @@ public interface Stream
* @return the priority of this stream
*/
public byte getPriority();
-
+
/**
* @return the session this stream is associated to
*/
public Session getSession();
/**
+ * <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p>
+ * <p>Callers may use the returned future to get the pushstream once it got created</p>
+ *
+ * @param synInfo the metadata to send on stream creation
+ * @return a future containing the stream once it got established
+ * @see #syn(SynInfo, long, TimeUnit, Handler)
+ */
+ public Future<Stream> syn(SynInfo synInfo);
+
+ /**
+ * <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p>
+ * <p>Callers may pass a non-null completion handler to be notified of when the
+ * pushstream has been established.</p>
+ *
+ * @param synInfo the metadata to send on stream creation
+ * @param timeout the operation's timeout
+ * @param unit the timeout's unit
+ * @param handler the completion handler that gets notified once the pushstream is established
+ * @see #syn(SynInfo)
+ */
+ public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler);
+
+ /**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
* <p>Callers may use the returned future to wait for the reply to be actually sent.</p>
*
@@ -162,6 +186,16 @@ public interface Stream
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/**
+ * @return whether this stream is unidirectional or not
+ */
+ public boolean isUnidirectional();
+
+ /**
+ * @return whether this stream has been reset
+ */
+ public boolean isReset();
+
+ /**
* @return whether this stream has been closed by both parties
* @see #isHalfClosed()
*/
@@ -171,7 +205,6 @@ public interface Stream
* @return whether this stream has been closed by one party only
* @see #isClosed() * @param timeout the timeout for the stream creation
* @param unit the timeout's unit
-
*/
public boolean isHalfClosed();
@@ -196,4 +229,15 @@ public interface Stream
* @see #setAttribute(String, Object)
*/
public Object removeAttribute(String key);
+
+ /**
+ * @return the associated parent stream or null if this is not an associated stream
+ */
+ public Stream getAssociatedStream();
+
+ /**
+ * @return associated child streams or an empty set if no associated streams exist
+ */
+ public Set<Stream> getPushedStreams();
+
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
index 08c18e7..c51a001 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java
@@ -28,11 +28,8 @@ public class SynInfo
* @see #getFlags()
*/
public static final byte FLAG_CLOSE = 1;
- public static final byte FLAG_UNIDIRECTIONAL = 2;
private final boolean close;
- private final boolean unidirectional;
- private final int associatedStreamId;
private final byte priority;
private final Headers headers;
@@ -56,28 +53,28 @@ public class SynInfo
*/
public SynInfo(Headers headers, boolean close)
{
- this(headers, close, false, 0, (byte)0);
+ this(headers, close, (byte)0);
}
/**
- * <p>Creates a {@link ReplyInfo} instance with the given headers and the given close flag,
- * the given unidirectional flag, the given associated stream, and with the given priority.</p>
- *
- * @param headers the {@link Headers}
- * @param close the value of the close flag
- * @param unidirectional the value of the unidirectional flag
- * @param associatedStreamId the associated stream id
- * @param priority the priority
+ * <p>
+ * Creates a {@link ReplyInfo} instance with the given headers, the given close flag and with the given priority.
+ * </p>
+ *
+ * @param headers
+ * the {@link Headers}
+ * @param close
+ * the value of the close flag
+ * @param priority
+ * the priority
*/
- public SynInfo(Headers headers, boolean close, boolean unidirectional, int associatedStreamId, byte priority)
+ public SynInfo(Headers headers, boolean close, byte priority)
{
this.close = close;
- this.unidirectional = unidirectional;
- this.associatedStreamId = associatedStreamId;
this.priority = priority;
this.headers = headers;
}
-
+
/**
* @return the value of the close flag
*/
@@ -87,22 +84,6 @@ public class SynInfo
}
/**
- * @return the value of the unidirectional flag
- */
- public boolean isUnidirectional()
- {
- return unidirectional;
- }
-
- /**
- * @return the associated stream id
- */
- public int getAssociatedStreamId()
- {
- return associatedStreamId;
- }
-
- /**
* @return the priority
*/
public byte getPriority()
@@ -117,17 +98,14 @@ public class SynInfo
{
return headers;
}
-
+
/**
- * @return the close and unidirectional flags as integer
+ * @return the close flag as integer
* @see #FLAG_CLOSE
- * @see #FLAG_UNIDIRECTIONAL
*/
public byte getFlags()
{
- byte flags = isClose() ? FLAG_CLOSE : 0;
- flags += isUnidirectional() ? FLAG_UNIDIRECTIONAL : 0;
- return flags;
+ return isClose() ? FLAG_CLOSE : 0;
}
@Override
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
index 60cee7c..334b816 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java
@@ -29,17 +29,17 @@ public class RstStreamFrame extends ControlFrame
this.streamId = streamId;
this.statusCode = statusCode;
}
-
+
public int getStreamId()
{
return streamId;
}
-
+
public int getStatusCode()
{
return statusCode;
}
-
+
@Override
public String toString()
{
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
index 427cb58..1b40895 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java
@@ -16,6 +16,7 @@
package org.eclipse.jetty.spdy.frames;
+import org.eclipse.jetty.spdy.PushSynInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SynInfo;
@@ -62,7 +63,7 @@ public class SynStreamFrame extends ControlFrame
public boolean isUnidirectional()
{
- return (getFlags() & SynInfo.FLAG_UNIDIRECTIONAL) == SynInfo.FLAG_UNIDIRECTIONAL;
+ return (getFlags() & PushSynInfo.FLAG_UNIDIRECTIONAL) == PushSynInfo.FLAG_UNIDIRECTIONAL;
}
@Override
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
index 986b375..f18f0c3 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java
@@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.CompressionFactory;
+import org.eclipse.jetty.spdy.PushSynInfo;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;
@@ -131,7 +132,7 @@ public class SynStreamBodyParser extends ControlFrameBodyParser
{
byte flags = controlFrameParser.getFlags();
// TODO: can it be both FIN and UNIDIRECTIONAL ?
- if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != SynInfo.FLAG_UNIDIRECTIONAL)
+ if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != PushSynInfo.FLAG_UNIDIRECTIONAL)
throw new IllegalArgumentException("Invalid flag " + flags + " for frame " + ControlFrameType.SYN_STREAM);
SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId, priority, new Headers(headers, true));
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
new file mode 100644
index 0000000..c999c34
--- /dev/null
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -0,0 +1,457 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.HeadersInfo;
+import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StreamStatus;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.SynReplyFrame;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.spdy.generator.Generator;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StandardSessionTest
+{
+ @Mock
+ private ISession sessionMock;
+ private ByteBufferPool bufferPool;
+ private Executor threadPool;
+ private StandardSession session;
+ private Generator generator;
+ private ScheduledExecutorService scheduler;
+ private Headers headers;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ bufferPool = new StandardByteBufferPool();
+ threadPool = Executors.newCachedThreadPool();
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor());
+ session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator);
+ headers = new Headers();
+ }
+
+ @Test
+ public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ assertThat("stream is not reset",stream.isReset(),is(false));
+ session.rst(new RstInfo(stream.getId(),StreamStatus.STREAM_ALREADY_CLOSED));
+ assertThatStreamIsNotInSession(stream);
+ assertThatStreamIsReset(stream);
+ }
+
+ @Test
+ public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ stream.updateCloseState(true,true);
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null));
+ assertThatStreamIsClosed(stream);
+ assertThatStreamIsNotInSession(stream);
+ }
+
+ @Test
+ public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThatStreamIsInSession(stream);
+ stream.updateCloseState(true,false);
+ stream.headers(new HeadersInfo(headers,true));
+ assertThatStreamIsClosed(stream);
+ assertThatStreamIsNotInSession(stream);
+ }
+
+ @Test
+ public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true));
+ Stream pushStream = createPushStream(stream);
+ assertThat("pushStream is unidirectional",pushStream.isUnidirectional(),is(true));
+ }
+
+ @Test
+ public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ Stream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId()));
+ assertThat("streamIds need to be monotonic",pushStream.getId(),greaterThan(stream.getId()));
+ }
+
+ @Test
+ public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ Stream pushStream = createPushStream(stream);
+ assertThatStreamIsNotHalfClosed(stream);
+ assertThatStreamIsNotClosed(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsNotClosed(pushStream);
+
+ stream.updateCloseState(true,true);
+ assertThatStreamIsHalfClosed(stream);
+ assertThatStreamIsNotClosed(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsNotClosed(pushStream);
+
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null));
+ assertThatStreamIsClosed(stream);
+ assertThatPushStreamIsNotClosed(pushStream);
+ }
+
+ @Test
+ public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ stream.updateCloseState(true,true);
+ assertThatStreamIsHalfClosed(stream);
+ stream.updateCloseState(true,false);
+ assertThatStreamIsClosed(stream);
+ createPushStreamAndMakeSureItFails(stream);
+ }
+
+ private void createPushStreamAndMakeSureItFails(IStream stream) throws InterruptedException
+ {
+ final CountDownLatch failedLatch = new CountDownLatch(1);
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler<Stream>()
+ {
+ @Override
+ public void completed(Stream context)
+ {
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ failedLatch.countDown();
+ }
+ });
+ assertThat("pushStream creation failed",failedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsInSession(pushStream);
+ assertThatStreamIsAssociatedWithPushStream(stream,pushStream);
+ session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ }
+
+ @Test
+ public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get();
+ assertThatPushStreamIsInSession(pushStream);
+ session.rst(new RstInfo(pushStream.getId(),StreamStatus.INVALID_STREAM));
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ assertThatStreamIsReset(pushStream);
+ }
+
+ @Test
+ public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ IStream stream = createStream();
+ SynInfo synInfo = new SynInfo(headers,true,stream.getPriority());
+ IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ assertThatStreamIsNotInSession(pushStream);
+ }
+
+ @Test
+ public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException,
+ TimeoutException
+ {
+ IStream stream = createStream();
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ assertThatStreamIsAssociatedWithPushStream(stream,pushStream);
+ assertThatPushStreamIsInSession(pushStream);
+ pushStream.headers(new HeadersInfo(headers,true));
+ assertThatPushStreamIsNotInSession(pushStream);
+ assertThatPushStreamIsHalfClosed(pushStream);
+ assertThatPushStreamIsClosed(pushStream);
+ assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream);
+ }
+
+ @Test
+ public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1);
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
+ IStream stream = createStream();
+ session.onDataFrame(new DataFrame(stream.getId(),SynInfo.FLAG_CLOSE,128),ByteBuffer.allocate(128));
+ stream.data(new StringDataInfo("close",true));
+ assertThat("onStreamCreated listener has been called",createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
+ IStream stream = createStream();
+ session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2);
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch));
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null);
+ assertThat("onStreamCreated listener has been called twice. Once for the stream and once for the pushStream",
+ createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ @Test
+ public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1);
+ session.addListener(new TestStreamListener(null,closedListenerCalledLatch));
+ IStream stream = createStream();
+ IStream pushStream = createPushStream(stream);
+ session.rst(new RstInfo(pushStream.getId(),StreamStatus.CANCEL_STREAM));
+ assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch);
+ }
+
+ private class TestStreamListener extends Session.StreamListener.Adapter
+ {
+ private CountDownLatch createdListenerCalledLatch;
+ private CountDownLatch closedListenerCalledLatch;
+
+ public TestStreamListener(CountDownLatch createdListenerCalledLatch, CountDownLatch closedListenerCalledLatch)
+ {
+ this.createdListenerCalledLatch = createdListenerCalledLatch;
+ this.closedListenerCalledLatch = closedListenerCalledLatch;
+ }
+
+ @Override
+ public void onStreamCreated(Stream stream)
+ {
+ if (createdListenerCalledLatch != null)
+ createdListenerCalledLatch.countDown();
+ super.onStreamCreated(stream);
+ }
+
+ @Override
+ public void onStreamClosed(Stream stream)
+ {
+ if (closedListenerCalledLatch != null)
+ closedListenerCalledLatch.countDown();
+ super.onStreamClosed(stream);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(expected = IllegalStateException.class)
+ public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null);
+ IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null);
+ stream.updateCloseState(synStreamFrame.isClose(),true);
+ assertThat("stream is half closed",stream.isHalfClosed(),is(true));
+ stream.data(new StringDataInfo("data on half closed stream",true));
+ verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class));
+ }
+
+ @Test
+ @Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.")
+ public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException
+ {
+ IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get();
+ stream.updateCloseState(true,false);
+ assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true));
+ stream.process(new DataFrame(stream.getId(),(byte)0,256),ByteBuffer.allocate(256));
+ }
+
+ @Test
+ public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch onDataCalledLatch = new CountDownLatch(1);
+ Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ onDataCalledLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ }).get(5,TimeUnit.SECONDS);
+ session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),headers));
+ session.onDataFrame(new DataFrame(stream.getId(),(byte)0,0),ByteBuffer.allocate(128));
+ assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynInfo synInfo = new SynInfo(headers,false,(byte)0);
+ return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS);
+ }
+
+ private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
+ return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS);
+ }
+
+ private static class TestController implements Controller<StandardSession.FrameBytes>
+ {
+ @Override
+ public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
+ {
+ handler.completed(context);
+ return buffer.remaining();
+ }
+
+ @Override
+ public void close(boolean onlyOutput)
+ {
+ }
+ }
+
+ private void assertThatStreamIsClosed(IStream stream)
+ {
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ }
+
+ private void assertThatStreamIsReset(IStream stream)
+ {
+ assertThat("stream is reset",stream.isReset(),is(true));
+ }
+
+ private void assertThatStreamIsNotInSession(IStream stream)
+ {
+ assertThat("stream is not in session",session.getStreams().contains(stream),not(true));
+ }
+
+ private void assertThatStreamIsInSession(IStream stream)
+ {
+ assertThat("stream is in session",session.getStreams().contains(stream),is(true));
+ }
+
+ private void assertThatStreamIsNotClosed(IStream stream)
+ {
+ assertThat("stream is not closed",stream.isClosed(),not(true));
+ }
+
+ private void assertThatStreamIsNotHalfClosed(IStream stream)
+ {
+ assertThat("stream is not halfClosed",stream.isHalfClosed(),not(true));
+ }
+
+ private void assertThatPushStreamIsNotClosed(Stream pushStream)
+ {
+ assertThat("pushStream is not closed",pushStream.isClosed(),not(true));
+ }
+
+ private void assertThatStreamIsHalfClosed(IStream stream)
+ {
+ assertThat("stream is halfClosed",stream.isHalfClosed(),is(true));
+ }
+
+ private void assertThatStreamIsNotAssociatedWithPushStream(IStream stream, IStream pushStream)
+ {
+ assertThat("pushStream is removed from parent",stream.getPushedStreams().contains(pushStream),not(true));
+ }
+
+ private void assertThatPushStreamIsNotInSession(Stream pushStream)
+ {
+ assertThat("pushStream is not in session",session.getStreams().contains(pushStream.getId()),not(true));
+ }
+
+ private void assertThatPushStreamIsInSession(Stream pushStream)
+ {
+ assertThat("pushStream is in session",session.getStreams().contains(pushStream),is(true));
+ }
+
+ private void assertThatStreamIsAssociatedWithPushStream(IStream stream, Stream pushStream)
+ {
+ assertThat("stream is associated with pushStream",stream.getPushedStreams().contains(pushStream),is(true));
+ }
+
+ private void assertThatPushStreamIsClosed(Stream pushStream)
+ {
+ assertThat("pushStream is closed",pushStream.isClosed(),is(true));
+ }
+
+ private void assertThatPushStreamIsHalfClosed(Stream pushStream)
+ {
+ assertThat("pushStream is half closed ",pushStream.isHalfClosed(),is(true));
+ }
+
+ private void assertThatOnStreamClosedListenerHasBeenCalled(final CountDownLatch closedListenerCalledLatch) throws InterruptedException
+ {
+ assertThat("onStreamClosed listener has been called",closedListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
new file mode 100644
index 0000000..68a6a95
--- /dev/null
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
@@ -0,0 +1,112 @@
+// ========================================================================
+// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+
+/* ------------------------------------------------------------ */
+/**
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class StandardStreamTest
+{
+ @Mock private ISession session;
+ @Mock private SynStreamFrame synStreamFrame;
+
+ /**
+ * Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSyn()
+ {
+ Stream stream = new StandardStream(synStreamFrame,session,0,null);
+ Set<Stream> streams = new HashSet<>();
+ streams.add(stream);
+ when(synStreamFrame.isClose()).thenReturn(false);
+ SynInfo synInfo = new SynInfo(false);
+ when(session.getStreams()).thenReturn(streams);
+ stream.syn(synInfo);
+ verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(),synInfo)),any(StreamFrameListener.class),anyLong(),any(TimeUnit.class),any(Handler.class));
+ }
+
+ private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>{
+ int associatedStreamId;
+ SynInfo synInfo;
+
+ public PushSynInfoMatcher(int associatedStreamId, SynInfo synInfo)
+ {
+ this.associatedStreamId = associatedStreamId;
+ this.synInfo = synInfo;
+ }
+ @Override
+ public boolean matches(Object argument)
+ {
+ PushSynInfo pushSynInfo = (PushSynInfo)argument;
+ if(pushSynInfo.getAssociatedStreamId() != associatedStreamId){
+ System.out.println("streamIds do not match!");
+ return false;
+ }
+ if(pushSynInfo.isClose() != synInfo.isClose()){
+ System.out.println("isClose doesn't match");
+ return false;
+ }
+ return true;
+ }
+ }
+
+ @Test
+ public void testSynOnClosedStream(){
+ IStream stream = new StandardStream(synStreamFrame,session,0,null);
+ stream.updateCloseState(true,true);
+ stream.updateCloseState(true,false);
+ assertThat("stream expected to be closed",stream.isClosed(),is(true));
+ final CountDownLatch failedLatch = new CountDownLatch(1);
+ stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ failedLatch.countDown();
+ }
+ });
+ assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L));
+ }
+
+}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
index f7b3c57..a8bff06 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java
@@ -99,7 +99,7 @@ public class ServerUsageTest
Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener
- session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
+ session.syn(new SynInfo(new Headers(), false, (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
index ea3e6d7..378a3fa 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java
@@ -16,6 +16,12 @@
package org.eclipse.jetty.spdy.frames;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
@@ -38,7 +44,7 @@ public class RstStreamGenerateParseTest
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
- Assert.assertNotNull(buffer);
+ assertThat("buffer is not null", buffer, not(nullValue()));
TestSPDYParserListener listener = new TestSPDYParserListener();
Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
@@ -46,13 +52,13 @@ public class RstStreamGenerateParseTest
parser.parse(buffer);
ControlFrame frame2 = listener.getControlFrame();
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.RST_STREAM, frame2.getType());
+ assertThat("frame2 is not null", frame2, not(nullValue()));
+ assertThat("frame2 is type RST_STREAM",ControlFrameType.RST_STREAM, equalTo(frame2.getType()));
RstStreamFrame rstStream = (RstStreamFrame)frame2;
- Assert.assertEquals(SPDY.V2, rstStream.getVersion());
- Assert.assertEquals(streamId, rstStream.getStreamId());
- Assert.assertEquals(0, rstStream.getFlags());
- Assert.assertEquals(streamStatus, rstStream.getStatusCode());
+ assertThat("rstStream version is SPDY.V2",SPDY.V2, equalTo(rstStream.getVersion()));
+ assertThat("rstStream id is equal to streamId",streamId, equalTo(rstStream.getStreamId()));
+ assertThat("rstStream flags are 0",(byte)0, equalTo(rstStream.getFlags()));
+ assertThat("stream status is equal to rstStream statuscode",streamStatus, is(rstStream.getStatusCode()));
}
@Test
diff --git a/jetty-spdy/spdy-jetty/pom.xml b/jetty-spdy/spdy-jetty/pom.xml
index 08cbaf6..7476bf9 100644
--- a/jetty-spdy/spdy-jetty/pom.xml
+++ b/jetty-spdy/spdy-jetty/pom.xml
@@ -64,8 +64,14 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-version}</version>
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
new file mode 100644
index 0000000..7d5d6c7
--- /dev/null
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.BytesDataInfo;
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.SessionStatus;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.frames.ControlFrame;
+import org.eclipse.jetty.spdy.frames.DataFrame;
+import org.eclipse.jetty.spdy.frames.GoAwayFrame;
+import org.eclipse.jetty.spdy.frames.RstStreamFrame;
+import org.eclipse.jetty.spdy.frames.SynReplyFrame;
+import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.spdy.generator.Generator;
+import org.eclipse.jetty.spdy.parser.Parser;
+import org.eclipse.jetty.spdy.parser.Parser.Listener;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ClosedStreamTest extends AbstractTest
+{
+ //TODO: Right now it sends a rst as the stream is unknown to the session once it's closed. But according to the spec we probably should just ignore the data?!
+ @Test
+ public void testDataSentOnClosedStreamIsIgnored() throws Exception
+ {
+ ServerSocketChannel server = ServerSocketChannel.open();
+ server.bind(new InetSocketAddress("localhost", 0));
+
+ Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);
+ final CountDownLatch dataLatch = new CountDownLatch(2);
+ session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataLatch.countDown();
+ }
+ });
+
+ SocketChannel channel = server.accept();
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024);
+ channel.read(readBuffer);
+ readBuffer.flip();
+ int streamId = readBuffer.getInt(8);
+
+ Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor());
+
+ ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Headers()));
+ channel.write(writeBuffer);
+
+ byte[] bytes = new byte[1];
+ writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true));
+ channel.write(writeBuffer);
+
+ // Write again to simulate the faulty condition
+ writeBuffer.flip();
+ channel.write(writeBuffer);
+
+ Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
+
+ writeBuffer = generator.control(new GoAwayFrame(SPDY.V2, 0, SessionStatus.OK.getCode()));
+ channel.write(writeBuffer);
+ channel.shutdownOutput();
+ channel.close();
+
+ server.close();
+ }
+
+ @Test
+ public void testSendDataOnHalfClosedStreamCausesExceptionOnServer() throws Exception
+ {
+ final CountDownLatch replyReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch clientReceivedDataLatch = new CountDownLatch(1);
+ final CountDownLatch exceptionWhenSendingData = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(true));
+ try
+ {
+ replyReceivedLatch.await(5,TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ stream.data(new StringDataInfo("data send after half closed",false));
+ }
+ catch (RuntimeException e)
+ {
+ // we expect an exception here, but we don't want it to be logged
+ exceptionWhenSendingData.countDown();
+ }
+
+ return null;
+ }
+ }),null);
+
+ Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ replyReceivedLatch.countDown();
+ super.onReply(stream,replyInfo);
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ clientReceivedDataLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ }).get();
+ assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is half closed from server",stream.isHalfClosed(),is(true));
+ assertThat("client has not received any data sent after stream was half closed by server",clientReceivedDataLatch.await(1,TimeUnit.SECONDS),
+ is(false));
+ assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testV2ReceiveDataOnHalfClosedStream() throws Exception
+ {
+ final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V2);
+ assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ @Test
+ @Ignore("until v3 is properly implemented")
+ public void testV3ReceiveDataOnHalfClosedStream() throws Exception
+ {
+ final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V3);
+ assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ }
+
+ private CountDownLatch runReceiveDataOnHalfClosedStream(short version) throws Exception, IOException, InterruptedException
+ {
+ final CountDownLatch clientResetReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch serverReplySentLatch = new CountDownLatch(1);
+ final CountDownLatch clientReplyReceivedLatch = new CountDownLatch(1);
+ final CountDownLatch serverDataReceivedLatch = new CountDownLatch(1);
+
+ InetSocketAddress startServer = startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ serverReplySentLatch.countDown();
+ try
+ {
+ clientReplyReceivedLatch.await(5,TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ serverDataReceivedLatch.countDown();
+ super.onData(stream,dataInfo);
+ }
+ };
+ }
+ });
+
+ final SocketChannel socketChannel = SocketChannel.open(startServer);
+ final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory().newCompressor());
+ ByteBuffer synData = generator.control(new SynStreamFrame(version,SynInfo.FLAG_CLOSE,1,0,(byte)0,new Headers()));
+
+ socketChannel.write(synData);
+
+ assertThat("server: syn reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true));
+
+ Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
+ parser.addListener(new Listener.Adapter()
+ {
+ @Override
+ public void onControlFrame(ControlFrame frame)
+ {
+ if (frame instanceof SynReplyFrame)
+ {
+ SynReplyFrame synReplyFrame = (SynReplyFrame)frame;
+ clientReplyReceivedLatch.countDown();
+ int streamId = synReplyFrame.getStreamId();
+ ByteBuffer data = generator.data(streamId,0,new StringDataInfo("data",false));
+ try
+ {
+ socketChannel.write(data);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ else if (frame instanceof RstStreamFrame)
+ {
+ clientResetReceivedLatch.countDown();
+ }
+ super.onControlFrame(frame);
+ }
+
+ @Override
+ public void onDataFrame(DataFrame frame, ByteBuffer data)
+ {
+ super.onDataFrame(frame,data);
+ }
+ });
+ ByteBuffer response = ByteBuffer.allocate(28);
+ socketChannel.read(response);
+ response.flip();
+ parser.parse(response);
+
+ assertThat("server didn't receive data",serverDataReceivedLatch.await(1,TimeUnit.SECONDS),not(true));
+ return clientResetReceivedLatch;
+ }
+
+}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
index f1908e7..db2303c 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java
@@ -451,7 +451,7 @@ public class FlowControlTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
- private void expectException(Class<? extends Exception> exception, Callable command)
+ private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command)
{
try
{
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
index 92e642f..eb75be7 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java
@@ -116,19 +116,20 @@ public class ProtocolViolationsTest extends AbstractTest
stream.headers(new HeadersInfo(new Headers(), true));
}
- @Test
- public void testDataSentAfterCloseIsDiscardedByRecipient() throws Exception
+ @Test //TODO: throws an ISException in StandardStream.updateCloseState(). But instead we should send a rst or something to the server probably?!
+ public void testServerClosesStreamTwice() throws Exception
{
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 0));
Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);
final CountDownLatch dataLatch = new CountDownLatch(2);
- session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
+ session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
+ System.out.println("ondata");
dataLatch.countDown();
}
});
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
new file mode 100644
index 0000000..2dac9cd
--- /dev/null
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright (c) 2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.eclipse.jetty.spdy;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.spdy.api.BytesDataInfo;
+import org.eclipse.jetty.spdy.api.DataInfo;
+import org.eclipse.jetty.spdy.api.Handler;
+import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.SessionFrameListener;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StringDataInfo;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.junit.Test;
+
+public class PushStreamTest extends AbstractTest
+{
+
+ @Test
+ public void testSynPushStream() throws Exception
+ {
+ final CountDownLatch pushStreamSynLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ stream.syn(new SynInfo(false));
+ return null;
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ pushStreamSynLatch.countDown();
+ stream.reply(new ReplyInfo(false));
+ return super.onSyn(stream,synInfo);
+ }
+ });
+
+ clientSession.syn(new SynInfo(false),null).get();
+ assertThat("onSyn has been called",pushStreamSynLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testSendDataOnPushStreamAfterAssociatedStreamIsClosed() throws Exception
+ {
+ final Exchanger<Stream> streamExchanger = new Exchanger<>();
+ final CountDownLatch pushStreamSynLatch = new CountDownLatch(1);
+ final CyclicBarrier replyBarrier = new CyclicBarrier(3);
+ final CyclicBarrier closeBarrier = new CyclicBarrier(3);
+ final CountDownLatch streamDataSent = new CountDownLatch(2);
+ final CountDownLatch pushStreamDataReceived = new CountDownLatch(2);
+ final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ try
+ {
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ try
+ {
+ if (dataInfo.isClose())
+ {
+ stream.data(new StringDataInfo("close stream",true));
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ }
+ streamDataSent.countDown();
+ if (pushStreamDataReceived.getCount() == 2)
+ {
+ Stream pushStream = stream.syn(new SynInfo(false)).get();
+ streamExchanger.exchange(pushStream,5,TimeUnit.SECONDS);
+ }
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ };
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ throw new IllegalStateException(e);
+ }
+ }
+
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ pushStreamSynLatch.countDown();
+ stream.reply(new ReplyInfo(false));
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ pushStreamDataReceived.countDown();
+ super.onData(stream,dataInfo);
+ }
+ };
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ try
+ {
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ try
+ {
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ }).get();
+
+ replyBarrier.await(5,TimeUnit.SECONDS);
+ stream.data(new StringDataInfo("client data",false));
+ Stream pushStream = streamExchanger.exchange(null,5,TimeUnit.SECONDS);
+ pushStream.data(new StringDataInfo("first push data frame",false));
+ // nasty, but less complex than using another cyclicBarrier for example
+ while (pushStreamDataReceived.getCount() != 1)
+ Thread.sleep(1);
+ stream.data(new StringDataInfo("client close",true));
+ closeBarrier.await(5,TimeUnit.SECONDS);
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ pushStream.data(new StringDataInfo("second push data frame while associated stream has been closed already",false));
+ assertThat("2 pushStream data frames have been received.",pushStreamDataReceived.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("2 data frames have been sent",streamDataSent.await(5,TimeUnit.SECONDS),is(true));
+ assertThatNoExceptionOccured(exceptionCountDownLatch);
+ }
+
+ @Test
+ public void testSynPushStreamOnClosedStream() throws Exception
+ {
+ final CountDownLatch pushStreamFailedLatch = new CountDownLatch(1);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(true));
+ stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ pushStreamFailedLatch.countDown();
+ }
+ });
+ return super.onSyn(stream,synInfo);
+ }
+ }),new SessionFrameListener.Adapter());
+
+ clientSession.syn(new SynInfo(true),null);
+ assertThat("pushStream syn has failed",pushStreamFailedLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ @Test
+ public void testSendBigDataOnPushStreamWhenAssociatedStreamIsClosed() throws Exception
+ {
+ final CountDownLatch streamClosedLatch = new CountDownLatch(1);
+ final CountDownLatch allDataReceived = new CountDownLatch(1);
+ final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1);
+ final Exchanger<ByteBuffer> exchanger = new Exchanger<>();
+ final int dataSizeInBytes = 1024 * 1024 * 1;
+ final byte[] transferBytes = createHugeByteArray(dataSizeInBytes);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ try
+ {
+ Stream pushStream = stream.syn(new SynInfo(false)).get();
+ stream.reply(new ReplyInfo(true));
+ // wait until stream is closed
+ streamClosedLatch.await(5,TimeUnit.SECONDS);
+ pushStream.data(new BytesDataInfo(transferBytes,true));
+ return null;
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ throw new IllegalStateException(e);
+ }
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ return new StreamFrameListener.Adapter()
+ {
+ ByteBuffer receivedBytes = ByteBuffer.allocate(dataSizeInBytes);
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consumeInto(receivedBytes);
+ if (dataInfo.isClose())
+ {
+ allDataReceived.countDown();
+ try
+ {
+ receivedBytes.flip();
+ exchanger.exchange(receivedBytes.slice(),5,TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ exceptionCountDownLatch.countDown();
+ }
+ }
+ }
+ };
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(true),new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ streamClosedLatch.countDown();
+ super.onReply(stream,replyInfo);
+ }
+ }).get();
+
+ ByteBuffer receivedBytes = exchanger.exchange(null,5,TimeUnit.SECONDS);
+
+ assertThat("received byte array is the same as transferred byte array",Arrays.equals(transferBytes,receivedBytes.array()),is(true));
+ assertThat("onReply has been called to close the stream",streamClosedLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is closed",stream.isClosed(),is(true));
+ assertThat("all data has been received",allDataReceived.await(20,TimeUnit.SECONDS),is(true));
+ assertThatNoExceptionOccured(exceptionCountDownLatch);
+ }
+
+ private byte[] createHugeByteArray(int sizeInBytes)
+ {
+ byte[] bytes = new byte[sizeInBytes];
+ new Random().nextBytes(bytes);
+ return bytes;
+ }
+
+ @Test
+ public void testOddEvenStreamIds() throws Exception
+ {
+ final CountDownLatch pushStreamIdIsEvenLatch = new CountDownLatch(3);
+
+ Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.syn(new SynInfo(false));
+ return null;
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ stream.reply(new ReplyInfo(false));
+ assertStreamIdIsEven(stream);
+ pushStreamIdIsEvenLatch.countDown();
+ return super.onSyn(stream,synInfo);
+ }
+ });
+
+ Stream stream = clientSession.syn(new SynInfo(false),null).get();
+ Stream stream2 = clientSession.syn(new SynInfo(false),null).get();
+ Stream stream3 = clientSession.syn(new SynInfo(false),null).get();
+ assertStreamIdIsOdd(stream);
+ assertStreamIdIsOdd(stream2);
+ assertStreamIdIsOdd(stream3);
+
+ assertThat("all pushStreams had even ids",pushStreamIdIsEvenLatch.await(5,TimeUnit.SECONDS),is(true));
+ }
+
+ private void assertStreamIdIsEven(Stream stream)
+ {
+ assertThat("streamId is odd",stream.getId() % 2,is(0));
+ }
+
+ private void assertStreamIdIsOdd(Stream stream)
+ {
+ assertThat("streamId is odd",stream.getId() % 2,is(1));
+ }
+
+ private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException
+ {
+ assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false));
+ }
+}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
index c6c0a2b..654bb5f 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java
@@ -1,5 +1,11 @@
package org.eclipse.jetty.spdy;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -15,7 +21,6 @@ import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
-import org.junit.Assert;
import org.junit.Test;
public class ResetStreamTest extends AbstractTest
@@ -23,12 +28,12 @@ public class ResetStreamTest extends AbstractTest
@Test
public void testResetStreamIsRemoved() throws Exception
{
- Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()), null);
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null);
- Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
- session.rst(new RstInfo(stream.getId(), StreamStatus.CANCEL_STREAM)).get(5, TimeUnit.SECONDS);
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS);
- Assert.assertEquals(0, session.getStreams().size());
+ assertEquals("session expected to contain 0 streams",0,session.getStreams().size());
}
@Test
@@ -44,11 +49,11 @@ public class ResetStreamTest extends AbstractTest
{
Session serverSession = stream.getSession();
serverSessionRef.set(serverSession);
- serverSession.rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+ serverSession.rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
synLatch.countDown();
return null;
}
- }), new SessionFrameListener.Adapter()
+ }),new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
@@ -57,16 +62,17 @@ public class ResetStreamTest extends AbstractTest
}
});
- clientSession.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
+ Stream stream = clientSession.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
- Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("syncLatch didn't count down",synLatch.await(5,TimeUnit.SECONDS));
Session serverSession = serverSessionRef.get();
- Assert.assertEquals(0, serverSession.getStreams().size());
+ assertEquals("serverSession expected to contain 0 streams",0,serverSession.getStreams().size());
- Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
+ assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS));
// Need to sleep a while to give the chance to the implementation to remove the stream
TimeUnit.SECONDS.sleep(1);
- Assert.assertEquals(0, clientSession.getStreams().size());
+ assertTrue("stream is expected to be reset",stream.isReset());
+ assertEquals("clientSession expected to contain 0 streams",0,clientSession.getStreams().size());
}
@Test
@@ -83,8 +89,8 @@ public class ResetStreamTest extends AbstractTest
try
{
// Refuse the stream, we must ignore data frames
- Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
- stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+ assertTrue(synLatch.await(5,TimeUnit.SECONDS));
+ stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
return new StreamFrameListener.Adapter()
{
@Override
@@ -100,7 +106,7 @@ public class ResetStreamTest extends AbstractTest
return null;
}
}
- }), new SessionFrameListener.Adapter()
+ }),new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
@@ -109,8 +115,8 @@ public class ResetStreamTest extends AbstractTest
}
});
- Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
- stream.data(new StringDataInfo("data", true), 5, TimeUnit.SECONDS, new Handler.Adapter<Void>()
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Handler.Adapter<Void>()
{
@Override
public void completed(Void context)
@@ -119,7 +125,60 @@ public class ResetStreamTest extends AbstractTest
}
});
- Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS));
- Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
+ assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS));
+ assertTrue("stream is expected to be reset",stream.isReset());
+ assertFalse("dataLatch shouln't be count down",dataLatch.await(1,TimeUnit.SECONDS));
}
+
+ @Test
+ public void testResetAfterServerReceivedFirstDataFrameAndSecondDataFrameFails() throws Exception
+ {
+ final CountDownLatch synLatch = new CountDownLatch(1);
+ final CountDownLatch dataLatch = new CountDownLatch(1);
+ final CountDownLatch rstLatch = new CountDownLatch(1);
+ final CountDownLatch failLatch = new CountDownLatch(1);
+ Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ synLatch.countDown();
+ return new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataLatch.countDown();
+ stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM));
+ }
+ };
+ }
+ }),new SessionFrameListener.Adapter()
+ {
+ @Override
+ public void onRst(Session session, RstInfo rstInfo)
+ {
+ rstLatch.countDown();
+ }
+ });
+
+ Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
+ assertThat("syn is received by server", synLatch.await(5,TimeUnit.SECONDS),is(true));
+ stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null);
+ assertThat("stream is reset",rstLatch.await(5,TimeUnit.SECONDS),is(true));
+ stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter<Void>()
+ {
+ @Override
+ public void failed(Throwable x)
+ {
+ failLatch.countDown();
+ }
+ });
+
+ assertThat("2nd data call failed",failLatch.await(5,TimeUnit.SECONDS),is(true));
+ assertThat("stream is reset",stream.isReset(),is(true));
+ }
+
+ // TODO: If server already received 2nd dataframe after it rst, it should ignore it. Not easy to do.
+
}
diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
index cdb9a7e..2e58551 100644
--- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
+++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java
@@ -36,6 +36,7 @@ public class SSLEngineLeakTest extends AbstractTest
Field field = NextProtoNego.class.getDeclaredField("objects");
field.setAccessible(true);
+ @SuppressWarnings("unchecked")
Map<Object, NextProtoNego.Provider> objects = (Map<Object, NextProtoNego.Provider>)field.get(null);
int initialSize = objects.size();
diff --git a/pom.xml b/pom.xml
index be0f93a..8154388 100644
--- a/pom.xml
+++ b/pom.xml
@@ -498,6 +498,11 @@
<version>4.8.1</version>
</dependency>
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.8.5</version>