| author | Thomas Becker | 2012-04-27 11:44:22 (EDT) |
|---|---|---|
| committer | Thomas Becker | 2012-04-27 11:44:22 (EDT) |
| commit | f362fb0b48eb9eb695c31bb21920d651fb91adee (patch) (side-by-side diff) | |
| tree | a0b81c30f8db895b007c950e9891cf74f4dcd712 | |
| parent | 011ae1f447073aa8dae51028b1368decfa0446fa (diff) | |
| download | org.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.zip org.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.tar.gz org.eclipse.jetty.project-f362fb0b48eb9eb695c31bb21920d651fb91adee.tar.bz2 | |
spdy push implementationrefs/changes/46/5746/1
Change-Id: Ibca8ce444588785f13c9890370422a2dc4b149ac
25 files changed, 1746 insertions, 272 deletions
diff --git a/jetty-spdy/spdy-core/pom.xml b/jetty-spdy/spdy-core/pom.xml index d6d4c0f..a933132 100644 --- a/jetty-spdy/spdy-core/pom.xml +++ b/jetty-spdy/spdy-core/pom.xml @@ -6,9 +6,9 @@ <version>7.6.4-SNAPSHOT</version> </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>spdy-core</artifactId> - <name>Jetty :: SPDY :: Core</name> + <modelVersion>4.0.0</modelVersion> + <artifactId>spdy-core</artifactId> + <name>Jetty :: SPDY :: Core</name> <dependencies> <dependency> @@ -16,10 +16,21 @@ <artifactId>jetty-util</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java index 2291cf7..07934c1 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java @@ -82,7 +82,7 @@ public interface IStream extends Stream public void process(ControlFrame frame); /** - * <p>Processes the give data frame along with the given byte buffer, + * <p>Processes the given data frame along with the given byte buffer, * for example by updating the stream's state or by calling listeners.</p> * * @param frame the data frame to process @@ -90,4 +90,26 @@ public interface IStream extends Stream * @see #process(ControlFrame) */ public void process(DataFrame frame, ByteBuffer data); + + /** + * <p>Associate the given {@link IStream} to this {@link IStream}.</p> + * + * @param stream the stream to associate with this stream + */ + public void associate(IStream stream); + + /** + * <p>remove the given associated {@link IStream} from this stream</p> + * + * @param stream the stream to be removed + */ + public void disassociate(IStream stream); + + /** + * <p>Overrides Stream.getAssociatedStream() to return an instance of IStream instead of Stream + * + * @see Stream#getAssociatedStream() + */ + @Override + public IStream getAssociatedStream(); } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java new file mode 100644 index 0000000..a460d54 --- a/dev/null +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/PushSynInfo.java @@ -0,0 +1,41 @@ +package org.eclipse.jetty.spdy; + +import org.eclipse.jetty.spdy.api.SynInfo; + +/* ------------------------------------------------------------ */ +/** + * <p>A subclass container of {@link SynInfo} for unidirectional streams</p> + */ +public class PushSynInfo extends SynInfo +{ + public static final byte FLAG_UNIDIRECTIONAL = 2; + + private int associatedStreamId; + + public PushSynInfo(int associatedStreamId, SynInfo synInfo){ + super(synInfo.getHeaders(), synInfo.isClose(), synInfo.getPriority()); + this.associatedStreamId = associatedStreamId; + } + + /** + * @return the close and unidirectional flags as integer + * @see #FLAG_CLOSE + * @see #FLAG_UNIDIRECTIONAL + */ + @Override + public byte getFlags() + { + byte flags = super.getFlags(); + flags += FLAG_UNIDIRECTIONAL; + return flags; + } + + /** + * @return the id of the associated stream + */ + public int getAssociatedStreamId() + { + return associatedStreamId; + } + +} diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java index fa9a55e..3e0c195 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/SessionException.java @@ -20,6 +20,7 @@ import org.eclipse.jetty.spdy.api.SessionStatus; public class SessionException extends RuntimeException { + private final SessionStatus sessionStatus; public SessionException(SessionStatus sessionStatus) diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index 8b1a53d..fa89c6e 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -95,7 +95,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private boolean flushing; private volatile int windowSize = 65536; - public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator) + public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler, + Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener, Generator generator) { this.version = version; this.bufferPool = bufferPool; @@ -109,6 +110,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand this.generator = generator; } + @Override public short getVersion() { return version; @@ -130,7 +132,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener) { Promise<Stream> result = new Promise<>(); - syn(synInfo, listener, 0, TimeUnit.MILLISECONDS, result); + syn(synInfo,listener,0,TimeUnit.MILLISECONDS,result); return result; } @@ -143,20 +145,18 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand // have stream3 hit the network before stream1, not only to comply with the spec // but also because the compression context for the headers would be wrong, as the // frame with a compression history will come before the first compressed frame. + int associatedStreamId = 0; + if (synInfo instanceof PushSynInfo) + { + associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId(); + } + synchronized (this) { - if (synInfo.isUnidirectional()) - { - // TODO: unidirectional functionality - throw new UnsupportedOperationException(); - } - else - { - int streamId = streamIds.getAndAdd(2); - SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders()); - IStream stream = createStream(synStream, listener); - control(stream, synStream, timeout, unit, handler, stream); - } + int streamId = streamIds.getAndAdd(2); + SynStreamFrame synStream = new SynStreamFrame(version,synInfo.getFlags(),streamId,associatedStreamId,synInfo.getPriority(),synInfo.getHeaders()); + IStream stream = createStream(synStream,listener); + control(stream,synStream,timeout,unit,handler,stream); } } @@ -164,7 +164,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand public Future<Void> rst(RstInfo rstInfo) { Promise<Void> result = new Promise<>(); - rst(rstInfo, 0, TimeUnit.MILLISECONDS, result); + rst(rstInfo,0,TimeUnit.MILLISECONDS,result); return result; } @@ -174,16 +174,19 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand // SPEC v3, 2.2.2 if (goAwaySent.get()) { - complete(handler, null); + complete(handler,null); } else { int streamId = rstInfo.getStreamId(); IStream stream = streams.get(streamId); + RstStreamFrame frame = new RstStreamFrame(version,streamId,rstInfo.getStreamStatus().getCode(version)); + control(stream,frame,timeout,unit,handler,null); if (stream != null) + { + stream.process(frame); removeStream(stream); - RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version)); - control(null, frame, timeout, unit, handler, null); + } } } @@ -191,22 +194,22 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand public Future<Void> settings(SettingsInfo settingsInfo) { Promise<Void> result = new Promise<>(); - settings(settingsInfo, 0, TimeUnit.MILLISECONDS, result); + settings(settingsInfo,0,TimeUnit.MILLISECONDS,result); return result; } @Override public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler) { - SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings()); - control(null, frame, timeout, unit, handler, null); + SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings()); + control(null,frame,timeout,unit,handler,null); } @Override public Future<PingInfo> ping() { Promise<PingInfo> result = new Promise<>(); - ping(0, TimeUnit.MILLISECONDS, result); + ping(0,TimeUnit.MILLISECONDS,result); return result; } @@ -215,8 +218,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { int pingId = pingIds.getAndAdd(2); PingInfo pingInfo = new PingInfo(pingId); - PingFrame frame = new PingFrame(version, pingId); - control(null, frame, timeout, unit, handler, pingInfo); + PingFrame frame = new PingFrame(version,pingId); + control(null,frame,timeout,unit,handler,pingInfo); } @Override @@ -228,28 +231,28 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private Future<Void> goAway(SessionStatus sessionStatus) { Promise<Void> result = new Promise<>(); - goAway(sessionStatus, 0, TimeUnit.MILLISECONDS, result); + goAway(sessionStatus,0,TimeUnit.MILLISECONDS,result); return result; } @Override public void goAway(long timeout, TimeUnit unit, Handler<Void> handler) { - goAway(SessionStatus.OK, timeout, unit, handler); + goAway(SessionStatus.OK,timeout,unit,handler); } private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler) { - if (goAwaySent.compareAndSet(false, true)) + if (goAwaySent.compareAndSet(false,true)) { if (!goAwayReceived.get()) { - GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode()); - control(null, frame, timeout, unit, handler, null); + GoAwayFrame frame = new GoAwayFrame(version,lastStreamId.get(),sessionStatus.getCode()); + control(null,frame,timeout,unit,handler,null); return; } } - complete(handler, null); + complete(handler,null); } @Override @@ -263,14 +266,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand @Override public void onControlFrame(ControlFrame frame) { - notifyIdle(idleListener, false); + notifyIdle(idleListener,false); try { - logger.debug("Processing {}", frame); + logger.debug("Processing {}",frame); if (goAwaySent.get()) { - logger.debug("Skipped processing of {}", frame); + logger.debug("Skipped processing of {}",frame); return; } @@ -329,21 +332,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } finally { - notifyIdle(idleListener, true); + notifyIdle(idleListener,true); } } @Override public void onDataFrame(DataFrame frame, ByteBuffer data) { - notifyIdle(idleListener, false); + notifyIdle(idleListener,false); try { - logger.debug("Processing {}, {} data bytes", frame, data.remaining()); + logger.debug("Processing {}, {} data bytes",frame,data.remaining()); if (goAwaySent.get()) { - logger.debug("Skipped processing of {}", frame); + logger.debug("Skipped processing of {}",frame); return; } @@ -351,18 +354,18 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand IStream stream = streams.get(streamId); if (stream == null) { - RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM); - logger.debug("Unknown stream {}", rstInfo); + RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM); + logger.debug("Unknown stream {}",rstInfo); rst(rstInfo); } else { - processData(stream, frame, data); + processData(stream,frame,data); } } finally { - notifyIdle(idleListener, true); + notifyIdle(idleListener,true); } } @@ -374,7 +377,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private void processData(IStream stream, DataFrame frame, ByteBuffer data) { - stream.process(frame, data); + stream.process(frame,data); updateLastStreamId(stream); if (stream.isClosed()) removeStream(stream); @@ -383,43 +386,42 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand @Override public void onStreamException(StreamException x) { - notifyOnException(listener, x); - rst(new RstInfo(x.getStreamId(), x.getStreamStatus())); + notifyOnException(listener,x); + rst(new RstInfo(x.getStreamId(),x.getStreamStatus())); } @Override public void onSessionException(SessionException x) { Throwable cause = x.getCause(); - notifyOnException(listener, cause == null ? x : cause); + notifyOnException(listener,cause == null?x:cause); goAway(x.getSessionStatus()); } private void onSyn(SynStreamFrame frame) { - IStream stream = newStream(frame); - stream.updateCloseState(frame.isClose(), false); - logger.debug("Opening {}", stream); - int streamId = frame.getStreamId(); - IStream existing = streams.putIfAbsent(streamId, stream); + IStream stream = newStream(frame,null); + stream.updateCloseState(frame.isClose(),false); + logger.debug("Opening {}",stream); + int streamId = stream.getId(); + IStream existing = streams.putIfAbsent(streamId,stream); if (existing != null) { - RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR); - logger.debug("Duplicate stream, {}", rstInfo); + RstInfo rstInfo = new RstInfo(streamId,StreamStatus.PROTOCOL_ERROR); + logger.debug("Duplicate stream, {}",rstInfo); rst(rstInfo); } else { - processSyn(listener, stream, frame); + processSyn(listener,stream,frame); } } private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame) { stream.process(frame); - SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), - frame.isUnidirectional(), frame.getAssociatedStreamId(), frame.getPriority()); - StreamFrameListener streamListener = notifyOnSyn(listener, stream, synInfo); + SynInfo synInfo = new SynInfo(frame.getHeaders(),frame.isClose(),frame.getPriority()); + StreamFrameListener streamListener = notifyOnSyn(listener,stream,synInfo); stream.setStreamFrameListener(streamListener); flush(); // The onSyn() listener may have sent a frame that closed the stream @@ -429,25 +431,36 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private IStream createStream(SynStreamFrame synStream, StreamFrameListener listener) { - IStream stream = newStream(synStream); - stream.updateCloseState(synStream.isClose(), true); + IStream parentStream = streams.get(synStream.getAssociatedStreamId()); + + IStream stream = newStream(synStream,parentStream); + stream.updateCloseState(synStream.isClose(),true); stream.setStreamFrameListener(listener); - if (streams.putIfAbsent(synStream.getStreamId(), stream) != null) + + if (synStream.isUnidirectional()) + { + // unidirectional streams are implicitly half closed for the client + stream.updateCloseState(true,false); + if (!stream.isClosed()) + parentStream.associate(stream); + } + + if (streams.putIfAbsent(synStream.getStreamId(),stream) != null) { // If this happens we have a bug since we did not check that the peer's streamId was valid // (if we're on server, then the client sent an odd streamId and we did not check that) - throw new IllegalStateException(); + throw new IllegalStateException("StreamId: " + synStream.getStreamId() + " invalid."); } - logger.debug("Created {}", stream); + logger.debug("Created {}",stream); notifyStreamCreated(stream); return stream; } - private IStream newStream(SynStreamFrame frame) + private IStream newStream(SynStreamFrame frame, IStream parentStream) { - return new StandardStream(frame, this, windowSize); + return new StandardStream(frame,this,windowSize,parentStream); } private void notifyStreamCreated(IStream stream) @@ -462,7 +475,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } } @@ -470,13 +483,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private void removeStream(IStream stream) { + if (stream.isUnidirectional()) + { + stream.getAssociatedStream().disassociate(stream); + } + IStream removed = streams.remove(stream.getId()); if (removed != null) - { assert removed == stream; - logger.debug("Removed {}", stream); - notifyStreamClosed(stream); - } + + logger.debug("Removed {}",stream); + notifyStreamClosed(stream); } private void notifyStreamClosed(IStream stream) @@ -491,7 +508,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } } @@ -503,13 +520,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand IStream stream = streams.get(streamId); if (stream == null) { - RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM); - logger.debug("Unknown stream {}", rstInfo); + RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM); + logger.debug("Unknown stream {}",rstInfo); rst(rstInfo); } else { - processReply(stream, frame); + processReply(stream,frame); } } @@ -522,15 +539,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private void onRst(RstStreamFrame frame) { - // TODO: implement logic to clean up unidirectional streams associated with this stream - IStream stream = streams.get(frame.getStreamId()); if (stream != null) stream.process(frame); - RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode())); - notifyOnRst(listener, rstInfo); + RstInfo rstInfo = new RstInfo(frame.getStreamId(),StreamStatus.from(frame.getVersion(),frame.getStatusCode())); + notifyOnRst(listener,rstInfo); flush(); if (stream != null) @@ -546,11 +561,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand windowSize = windowSizeSetting.value(); for (IStream stream : streams.values()) stream.updateWindowSize(windowSize - prevWindowSize); - logger.debug("Updated window size to {}", windowSize); + logger.debug("Updated window size to {}",windowSize); } - SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted()); - notifyOnSettings(listener, settingsInfo); + SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(),frame.isClearPersisted()); + notifyOnSettings(listener,settingsInfo); flush(); } @@ -560,21 +575,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand if (pingId % 2 == pingIds.get() % 2) { PingInfo pingInfo = new PingInfo(frame.getPingId()); - notifyOnPing(listener, pingInfo); + notifyOnPing(listener,pingInfo); flush(); } else { - control(null, frame, 0, TimeUnit.MILLISECONDS, null, null); + control(null,frame,0,TimeUnit.MILLISECONDS,null,null); } } private void onGoAway(GoAwayFrame frame) { - if (goAwayReceived.compareAndSet(false, true)) + if (goAwayReceived.compareAndSet(false,true)) { - GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode())); - notifyOnGoAway(listener, goAwayInfo); + GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(),SessionStatus.from(frame.getStatusCode())); + notifyOnGoAway(listener,goAwayInfo); flush(); // SPDY does not require to send back a response to a GO_AWAY. // We notified the application of the last good stream id, @@ -589,13 +604,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand IStream stream = streams.get(streamId); if (stream == null) { - RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM); - logger.debug("Unknown stream, {}", rstInfo); + RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM); + logger.debug("Unknown stream, {}",rstInfo); rst(rstInfo); } else { - processHeaders(stream, frame); + processHeaders(stream,frame); } } @@ -627,13 +642,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", x, listener); + logger.debug("Invoking callback with {} on listener {}",x,listener); listener.onException(x); } } catch (Exception xx) { - logger.info("Exception while notifying listener " + listener, xx); + logger.info("Exception while notifying listener " + listener,xx); } } @@ -643,13 +658,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", synInfo, listener); - return listener.onSyn(stream, synInfo); + logger.debug("Invoking callback with {} on listener {}",synInfo,listener); + return listener.onSyn(stream,synInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } return null; } @@ -660,13 +675,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", rstInfo, listener); - listener.onRst(this, rstInfo); + logger.debug("Invoking callback with {} on listener {}",rstInfo,listener); + listener.onRst(this,rstInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -676,13 +691,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", settingsInfo, listener); - listener.onSettings(this, settingsInfo); + logger.debug("Invoking callback with {} on listener {}",settingsInfo,listener); + listener.onSettings(this,settingsInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -692,13 +707,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", pingInfo, listener); - listener.onPing(this, pingInfo); + logger.debug("Invoking callback with {} on listener {}",pingInfo,listener); + listener.onPing(this,pingInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -708,13 +723,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (listener != null) { - logger.debug("Invoking callback with {} on listener {}", goAwayInfo, listener); - listener.onGoAway(this, goAwayInfo); + logger.debug("Invoking callback with {} on listener {}",goAwayInfo,listener); + listener.onGoAway(this,goAwayInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -724,7 +739,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand try { if (stream != null) + { updateLastStreamId(stream); + if (stream.isClosed()) + removeStream(stream); + } // Synchronization is necessary, since we may have concurrent replies // and those needs to be generated and enqueued atomically in order @@ -732,10 +751,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand synchronized (this) { ByteBuffer buffer = generator.control(frame); - logger.debug("Queuing {} on {}", frame, stream); - ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer); + logger.debug("Queuing {} on {}",frame,stream); + ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream,handler,context,frame,buffer); if (timeout > 0) - frameBytes.task = scheduler.schedule(frameBytes, timeout, unit); + frameBytes.task = scheduler.schedule(frameBytes,timeout,unit); // Special handling for PING frames, they must be sent as soon as possible if (ControlFrameType.PING == frame.getType()) @@ -748,7 +767,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } catch (Throwable x) { - notifyHandlerFailed(handler, x); + notifyHandlerFailed(handler,x); } } @@ -761,7 +780,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand int oldValue = lastStreamId.get(); while (streamId > oldValue) { - if (lastStreamId.compareAndSet(oldValue, streamId)) + if (lastStreamId.compareAndSet(oldValue,streamId)) break; oldValue = lastStreamId.get(); } @@ -771,10 +790,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand @Override public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context) { - logger.debug("Queuing {} on {}", dataInfo, stream); - DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream, handler, context, dataInfo); + logger.debug("Queuing {} on {}",dataInfo,stream); + DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo); if (timeout > 0) - frameBytes.task = scheduler.schedule(frameBytes, timeout, unit); + { + frameBytes.task = scheduler.schedule(frameBytes,timeout,unit); + } append(frameBytes); flush(); } @@ -799,30 +820,35 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { frameBytes = queue.get(i); - if (stalledStreams != null && stalledStreams.contains(frameBytes.getStream())) + IStream stream = frameBytes.getStream(); + if (stream != null && stalledStreams != null && stalledStreams.contains(stream)) continue; buffer = frameBytes.getByteBuffer(); if (buffer != null) { queue.remove(i); + // TODO: stream.isUniDirectional() check here is only needed for pushStreams which send a syn with close=true --> find a better solution + if (stream != null && !streams.containsValue(stream) && !stream.isUnidirectional()) + frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM)); break; } if (stalledStreams == null) stalledStreams = new HashSet<>(); - stalledStreams.add(frameBytes.getStream()); + if (stream != null) + stalledStreams.add(stream); - logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size()); + logger.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size()); } if (buffer == null) return; flushing = true; - logger.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size()); + logger.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size()); } - write(buffer, this, frameBytes); + write(buffer,this,frameBytes); } private void append(FrameBytes frameBytes) @@ -837,7 +863,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand break; --index; } - queue.add(index, frameBytes); + queue.add(index,frameBytes); } } @@ -853,7 +879,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand break; ++index; } - queue.add(index, frameBytes); + queue.add(index,frameBytes); } } @@ -862,7 +888,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { synchronized (queue) { - logger.debug("Completed write of {}, {} frame(s) in queue", frameBytes, queue.size()); + logger.debug("Completed write of {}, {} frame(s) in queue",frameBytes,queue.size()); flushing = false; } frameBytes.complete(); @@ -878,8 +904,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand { if (controller != null) { - logger.debug("Writing {} frame bytes of {}", buffer.remaining(), frameBytes); - controller.write(buffer, handler, frameBytes); + logger.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes); + controller.write(buffer,handler,frameBytes); } } @@ -898,7 +924,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand public void run() { if (handler != null) - notifyHandlerCompleted(handler, context); + notifyHandlerCompleted(handler,context); flush(); } }); @@ -909,7 +935,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand try { if (handler != null) - notifyHandlerCompleted(handler, context); + notifyHandlerCompleted(handler,context); flush(); } finally @@ -927,12 +953,11 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } catch (Exception x) { - logger.info("Exception while notifying handler " + handler, x); + logger.info("Exception while notifying handler " + handler,x); } } - - private void notifyHandlerFailed(Handler handler, Throwable x) + private <C> void notifyHandlerFailed(Handler<C> handler, Throwable x) { try { @@ -941,7 +966,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand } catch (Exception xx) { - logger.info("Exception while notifying handler " + handler, xx); + logger.info("Exception while notifying handler " + handler,xx); } } @@ -952,6 +977,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand public abstract ByteBuffer getByteBuffer(); public abstract void complete(); + + public abstract void fail(Throwable throwable); } private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable @@ -984,15 +1011,22 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand @Override public void complete() { - ScheduledFuture<?> task = this.task; - if (task != null) - task.cancel(false); - StandardSession.this.complete(handler, context); + cancelTask(); + StandardSession.this.complete(handler,context); + } + + @Override + public void fail(Throwable x) + { + cancelTask(); + notifyHandlerFailed(handler,x); } - protected void fail(Throwable x) + private void cancelTask() { - notifyHandlerFailed(handler, x); + ScheduledFuture<?> task = this.task; + if (task != null) + task.cancel(false); } @Override @@ -1010,7 +1044,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer) { - super(stream, handler, context); + super(stream,handler,context); this.frame = frame; this.buffer = buffer; } @@ -1051,7 +1085,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo) { - super(stream, handler, context); + super(stream,handler,context); this.dataInfo = dataInfo; } @@ -1069,7 +1103,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand if (size > windowSize) size = windowSize; - buffer = generator.data(stream.getId(), size, dataInfo); + buffer = generator.data(stream.getId(),size,dataInfo); return buffer; } catch (Throwable x) @@ -1096,7 +1130,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand else { super.complete(); - stream.updateCloseState(dataInfo.isClose(), true); + stream.updateCloseState(dataInfo.isClose(),true); if (stream.isClosed()) removeStream(stream); } @@ -1105,7 +1139,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand @Override public String toString() { - return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream()); + return String.format("DATA bytes @%x available=%d consumed=%d on %s",dataInfo.hashCode(),dataInfo.available(),dataInfo.consumed(),getStream()); } } } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 1de9e1b..098b543 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -17,7 +17,9 @@ package org.eclipse.jetty.spdy; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StreamStatus; +import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.frames.ControlFrame; import org.eclipse.jetty.spdy.frames.DataFrame; import org.eclipse.jetty.spdy.frames.HeadersFrame; @@ -46,18 +49,22 @@ public class StandardStream implements IStream { private static final Logger logger = Log.getLogger(Stream.class); private final Map<String, Object> attributes = new ConcurrentHashMap<>(); + private final IStream associatedStream; private final SynStreamFrame frame; private final ISession session; private final AtomicInteger windowSize; + private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>()); private volatile StreamFrameListener listener; private volatile OpenState openState = OpenState.SYN_SENT; private volatile CloseState closeState = CloseState.OPENED; + private volatile boolean reset = false; - public StandardStream(SynStreamFrame frame, ISession session, int windowSize) + public StandardStream(SynStreamFrame frame, ISession session, int windowSize, IStream associatedStream) { this.frame = frame; this.session = session; this.windowSize = new AtomicInteger(windowSize); + this.associatedStream = associatedStream; } @Override @@ -67,6 +74,30 @@ public class StandardStream implements IStream } @Override + public IStream getAssociatedStream() + { + return associatedStream; + } + + @Override + public Set<Stream> getPushedStreams() + { + return pushedStreams; + } + + @Override + public void associate(IStream stream) + { + pushedStreams.add(stream); + } + + @Override + public void disassociate(IStream stream) + { + pushedStreams.remove(stream); + } + + @Override public byte getPriority() { return frame.getPriority(); @@ -82,7 +113,7 @@ public class StandardStream implements IStream public void updateWindowSize(int delta) { int size = windowSize.addAndGet(delta); - logger.debug("Updated window size by {}, new window size {}", delta, size); + logger.debug("Updated window size by {}, new window size {}",delta,size); } @Override @@ -91,14 +122,6 @@ public class StandardStream implements IStream return session; } - public boolean isHalfClosed() - { - CloseState closeState = this.closeState; - return closeState == CloseState.LOCALLY_CLOSED || - closeState == CloseState.REMOTELY_CLOSED || - closeState == CloseState.CLOSED; - } - @Override public Object getAttribute(String key) { @@ -108,7 +131,7 @@ public class StandardStream implements IStream @Override public void setAttribute(String key, Object value) { - attributes.put(key, value); + attributes.put(key,value); } @Override @@ -132,7 +155,7 @@ public class StandardStream implements IStream { case OPENED: { - closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED; + closeState = local?CloseState.LOCALLY_CLOSED:CloseState.REMOTELY_CLOSED; break; } case LOCALLY_CLOSED: @@ -173,16 +196,16 @@ public class StandardStream implements IStream { openState = OpenState.REPLY_RECV; SynReplyFrame synReply = (SynReplyFrame)frame; - updateCloseState(synReply.isClose(), false); - ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose()); + updateCloseState(synReply.isClose(),false); + ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(),synReply.isClose()); notifyOnReply(replyInfo); break; } case HEADERS: { HeadersFrame headers = (HeadersFrame)frame; - updateCloseState(headers.isClose(), false); - HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression()); + updateCloseState(headers.isClose(),false); + HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(),headers.isClose(),headers.isResetCompression()); notifyOnHeaders(headersInfo); break; } @@ -194,7 +217,7 @@ public class StandardStream implements IStream } case RST_STREAM: { - // TODO: + reset = true; break; } default: @@ -208,15 +231,24 @@ public class StandardStream implements IStream @Override public void process(DataFrame frame, ByteBuffer data) { + // TODO: in v3 we need to send a rst instead of just ignoring + // ignore data frame if this stream is remotelyClosed already + if (isHalfClosed() && !isLocallyClosed()) + { + logger.debug("Ignoring received dataFrame as this stream is remotely closed: " + frame); + return; + } + if (!canReceive()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + logger.debug("Can't receive. Sending rst: " + frame); + session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR)); return; } - updateCloseState(frame.isClose(), false); + updateCloseState(frame.isClose(),false); - ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress()) + ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data,frame.isClose(),frame.isCompress()) { @Override public void consume(int delta) @@ -243,8 +275,8 @@ public class StandardStream implements IStream { if (delta > 0) { - WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta); - session.control(this, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, null, null); + WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(),getId(),delta); + session.control(this,windowUpdateFrame,0,TimeUnit.MILLISECONDS,null,null); } } @@ -255,13 +287,13 @@ public class StandardStream implements IStream { if (listener != null) { - logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener); - listener.onReply(this, replyInfo); + logger.debug("Invoking reply callback with {} on listener {}",replyInfo,listener); + listener.onReply(this,replyInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -272,13 +304,13 @@ public class StandardStream implements IStream { if (listener != null) { - logger.debug("Invoking headers callback with {} on listener {}", frame, listener); - listener.onHeaders(this, headersInfo); + logger.debug("Invoking headers callback with {} on listener {}",frame,listener); + listener.onHeaders(this,headersInfo); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @@ -289,22 +321,42 @@ public class StandardStream implements IStream { if (listener != null) { - logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener); - listener.onData(this, dataInfo); - logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener); + logger.debug("Invoking data callback with {} on listener {}",dataInfo,listener); + listener.onData(this,dataInfo); + logger.debug("Invoked data callback with {} on listener {}",dataInfo,listener); } } catch (Exception x) { - logger.info("Exception while notifying listener " + listener, x); + logger.info("Exception while notifying listener " + listener,x); } } @Override + public Future<Stream> syn(SynInfo synInfo) + { + Promise<Stream> result = new Promise<>(); + syn(synInfo,0,TimeUnit.MILLISECONDS,result); + return result; + } + + @Override + public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler) + { + if (isClosed() || isReset()) + { + handler.failed(new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED)); + return; + } + PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo); + session.syn(pushSynInfo,null,timeout,unit,handler); + } + + @Override public Future<Void> reply(ReplyInfo replyInfo) { Promise<Void> result = new Promise<>(); - reply(replyInfo, 0, TimeUnit.MILLISECONDS, result); + reply(replyInfo,0,TimeUnit.MILLISECONDS,result); return result; } @@ -312,16 +364,16 @@ public class StandardStream implements IStream public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler) { openState = OpenState.REPLY_SENT; - updateCloseState(replyInfo.isClose(), true); - SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders()); - session.control(this, frame, timeout, unit, handler, null); + updateCloseState(replyInfo.isClose(),true); + SynReplyFrame frame = new SynReplyFrame(session.getVersion(),replyInfo.getFlags(),getId(),replyInfo.getHeaders()); + session.control(this,frame,timeout,unit,handler,null); } @Override public Future<Void> data(DataInfo dataInfo) { Promise<Void> result = new Promise<>(); - data(dataInfo, 0, TimeUnit.MILLISECONDS, result); + data(dataInfo,0,TimeUnit.MILLISECONDS,result); return result; } @@ -330,25 +382,25 @@ public class StandardStream implements IStream { if (!canSend()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR)); throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame"); } if (isLocallyClosed()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR)); throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream"); } // Cannot update the close state here, because the data that we send may // be flow controlled, so we need the stream to update the window size. - session.data(this, dataInfo, timeout, unit, handler, null); + session.data(this,dataInfo,timeout,unit,handler,null); } @Override public Future<Void> headers(HeadersInfo headersInfo) { Promise<Void> result = new Promise<>(); - headers(headersInfo, 0, TimeUnit.MILLISECONDS, result); + headers(headersInfo,0,TimeUnit.MILLISECONDS,result); return result; } @@ -357,18 +409,41 @@ public class StandardStream implements IStream { if (!canSend()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR)); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame"); } if (isLocallyClosed()) { - session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR)); + session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR)); throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream"); } - updateCloseState(headersInfo.isClose(), true); - HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders()); - session.control(this, frame, timeout, unit, handler, null); + updateCloseState(headersInfo.isClose(),true); + HeadersFrame frame = new HeadersFrame(session.getVersion(),headersInfo.getFlags(),getId(),headersInfo.getHeaders()); + session.control(this,frame,timeout,unit,handler,null); + } + + @Override + public boolean isUnidirectional() + { + if (associatedStream != null) + return true; + else + return false; + + } + + @Override + public boolean isReset() + { + return reset; + } + + @Override + public boolean isHalfClosed() + { + CloseState closeState = this.closeState; + return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED; } @Override @@ -386,7 +461,7 @@ public class StandardStream implements IStream @Override public String toString() { - return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState); + return String.format("stream=%d v%d %s",getId(),session.getVersion(),closeState); } private boolean canSend() diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java index db93566..4036401 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/DataInfo.java @@ -162,7 +162,7 @@ public abstract class DataInfo /** * <p>Reads and consumes the content bytes of this {@link DataInfo} into the given {@link ByteBuffer}.</p> * - * @param output the {@link ByteBuffer} to copy to bytes into + * @param output the {@link ByteBuffer} to copy the bytes into * @return the number of bytes copied * @see #consume(int) */ diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java index 75c27d5..5e2e5e2 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java @@ -75,7 +75,7 @@ public interface Session * @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler) */ public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener); - + /** * <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p> * <p>Callers may pass a non-null completion handler to be notified of when the @@ -90,6 +90,7 @@ public interface Session */ public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler); + /** * <p>Sends asynchronously a RST_STREAM to abort a stream.</p> * <p>Callers may use the returned future to wait for the reset to be sent.</p> diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java index c25bd45..6a13829 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy.api; import java.nio.channels.WritePendingException; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -79,13 +80,36 @@ public interface Stream * @return the priority of this stream */ public byte getPriority(); - + /** * @return the session this stream is associated to */ public Session getSession(); /** + * <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p> + * <p>Callers may use the returned future to get the pushstream once it got created</p> + * + * @param synInfo the metadata to send on stream creation + * @return a future containing the stream once it got established + * @see #syn(SynInfo, long, TimeUnit, Handler) + */ + public Future<Stream> syn(SynInfo synInfo); + + /** + * <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p> + * <p>Callers may pass a non-null completion handler to be notified of when the + * pushstream has been established.</p> + * + * @param synInfo the metadata to send on stream creation + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified once the pushstream is established + * @see #syn(SynInfo) + */ + public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler); + + /** * <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p> * <p>Callers may use the returned future to wait for the reply to be actually sent.</p> * @@ -162,6 +186,16 @@ public interface Stream public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler); /** + * @return whether this stream is unidirectional or not + */ + public boolean isUnidirectional(); + + /** + * @return whether this stream has been reset + */ + public boolean isReset(); + + /** * @return whether this stream has been closed by both parties * @see #isHalfClosed() */ @@ -171,7 +205,6 @@ public interface Stream * @return whether this stream has been closed by one party only * @see #isClosed() * @param timeout the timeout for the stream creation * @param unit the timeout's unit - */ public boolean isHalfClosed(); @@ -196,4 +229,15 @@ public interface Stream * @see #setAttribute(String, Object) */ public Object removeAttribute(String key); + + /** + * @return the associated parent stream or null if this is not an associated stream + */ + public Stream getAssociatedStream(); + + /** + * @return associated child streams or an empty set if no associated streams exist + */ + public Set<Stream> getPushedStreams(); + } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java index 08c18e7..c51a001 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/SynInfo.java @@ -28,11 +28,8 @@ public class SynInfo * @see #getFlags() */ public static final byte FLAG_CLOSE = 1; - public static final byte FLAG_UNIDIRECTIONAL = 2; private final boolean close; - private final boolean unidirectional; - private final int associatedStreamId; private final byte priority; private final Headers headers; @@ -56,28 +53,28 @@ public class SynInfo */ public SynInfo(Headers headers, boolean close) { - this(headers, close, false, 0, (byte)0); + this(headers, close, (byte)0); } /** - * <p>Creates a {@link ReplyInfo} instance with the given headers and the given close flag, - * the given unidirectional flag, the given associated stream, and with the given priority.</p> - * - * @param headers the {@link Headers} - * @param close the value of the close flag - * @param unidirectional the value of the unidirectional flag - * @param associatedStreamId the associated stream id - * @param priority the priority + * <p> + * Creates a {@link ReplyInfo} instance with the given headers, the given close flag and with the given priority. + * </p> + * + * @param headers + * the {@link Headers} + * @param close + * the value of the close flag + * @param priority + * the priority */ - public SynInfo(Headers headers, boolean close, boolean unidirectional, int associatedStreamId, byte priority) + public SynInfo(Headers headers, boolean close, byte priority) { this.close = close; - this.unidirectional = unidirectional; - this.associatedStreamId = associatedStreamId; this.priority = priority; this.headers = headers; } - + /** * @return the value of the close flag */ @@ -87,22 +84,6 @@ public class SynInfo } /** - * @return the value of the unidirectional flag - */ - public boolean isUnidirectional() - { - return unidirectional; - } - - /** - * @return the associated stream id - */ - public int getAssociatedStreamId() - { - return associatedStreamId; - } - - /** * @return the priority */ public byte getPriority() @@ -117,17 +98,14 @@ public class SynInfo { return headers; } - + /** - * @return the close and unidirectional flags as integer + * @return the close flag as integer * @see #FLAG_CLOSE - * @see #FLAG_UNIDIRECTIONAL */ public byte getFlags() { - byte flags = isClose() ? FLAG_CLOSE : 0; - flags += isUnidirectional() ? FLAG_UNIDIRECTIONAL : 0; - return flags; + return isClose() ? FLAG_CLOSE : 0; } @Override diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java index 60cee7c..334b816 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/RstStreamFrame.java @@ -29,17 +29,17 @@ public class RstStreamFrame extends ControlFrame this.streamId = streamId; this.statusCode = statusCode; } - + public int getStreamId() { return streamId; } - + public int getStatusCode() { return statusCode; } - + @Override public String toString() { diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java index 427cb58..1b40895 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/frames/SynStreamFrame.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.spdy.frames; +import org.eclipse.jetty.spdy.PushSynInfo; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.SynInfo; @@ -62,7 +63,7 @@ public class SynStreamFrame extends ControlFrame public boolean isUnidirectional() { - return (getFlags() & SynInfo.FLAG_UNIDIRECTIONAL) == SynInfo.FLAG_UNIDIRECTIONAL; + return (getFlags() & PushSynInfo.FLAG_UNIDIRECTIONAL) == PushSynInfo.FLAG_UNIDIRECTIONAL; } @Override diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java index 986b375..f18f0c3 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/parser/SynStreamBodyParser.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.parser; import java.nio.ByteBuffer; import org.eclipse.jetty.spdy.CompressionFactory; +import org.eclipse.jetty.spdy.PushSynInfo; import org.eclipse.jetty.spdy.StreamException; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.SPDY; @@ -131,7 +132,7 @@ public class SynStreamBodyParser extends ControlFrameBodyParser { byte flags = controlFrameParser.getFlags(); // TODO: can it be both FIN and UNIDIRECTIONAL ? - if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != SynInfo.FLAG_UNIDIRECTIONAL) + if (flags != 0 && flags != SynInfo.FLAG_CLOSE && flags != PushSynInfo.FLAG_UNIDIRECTIONAL) throw new IllegalArgumentException("Invalid flag " + flags + " for frame " + ControlFrameType.SYN_STREAM); SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId, priority, new Headers(headers, true)); diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java new file mode 100644 index 0000000..c999c34 --- a/dev/null +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.Handler; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.HeadersInfo; +import org.eclipse.jetty.spdy.api.RstInfo; +import org.eclipse.jetty.spdy.api.SPDY; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StreamStatus; +import org.eclipse.jetty.spdy.api.StringDataInfo; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.frames.DataFrame; +import org.eclipse.jetty.spdy.frames.SynReplyFrame; +import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.spdy.generator.Generator; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StandardSessionTest +{ + @Mock + private ISession sessionMock; + private ByteBufferPool bufferPool; + private Executor threadPool; + private StandardSession session; + private Generator generator; + private ScheduledExecutorService scheduler; + private Headers headers; + + @Before + public void setUp() throws Exception + { + bufferPool = new StandardByteBufferPool(); + threadPool = Executors.newCachedThreadPool(); + scheduler = Executors.newSingleThreadScheduledExecutor(); + generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory.StandardCompressor()); + session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,new TestController(),null,1,null,generator); + headers = new Headers(); + } + + @Test + public void testStreamIsRemovedFromSessionWhenReset() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + assertThatStreamIsInSession(stream); + assertThat("stream is not reset",stream.isReset(),is(false)); + session.rst(new RstInfo(stream.getId(),StreamStatus.STREAM_ALREADY_CLOSED)); + assertThatStreamIsNotInSession(stream); + assertThatStreamIsReset(stream); + } + + @Test + public void testStreamIsAddedAndRemovedFromSession() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + assertThatStreamIsInSession(stream); + stream.updateCloseState(true,true); + session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null)); + assertThatStreamIsClosed(stream); + assertThatStreamIsNotInSession(stream); + } + + @Test + public void testStreamIsRemovedWhenHeadersWithCloseFlagAreSent() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + assertThatStreamIsInSession(stream); + stream.updateCloseState(true,false); + stream.headers(new HeadersInfo(headers,true)); + assertThatStreamIsClosed(stream); + assertThatStreamIsNotInSession(stream); + } + + @Test + public void testStreamIsUnidirectional() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + assertThat("stream is not unidirectional",stream.isUnidirectional(),not(true)); + Stream pushStream = createPushStream(stream); + assertThat("pushStream is unidirectional",pushStream.isUnidirectional(),is(true)); + } + + @Test + public void testPushStreamCreation() throws InterruptedException, ExecutionException, TimeoutException + { + Stream stream = createStream(); + IStream pushStream = createPushStream(stream); + assertThat("Push stream must be associated to the first stream created",pushStream.getAssociatedStream().getId(),is(stream.getId())); + assertThat("streamIds need to be monotonic",pushStream.getId(),greaterThan(stream.getId())); + } + + @Test + public void testPushStreamIsNotClosedWhenAssociatedStreamIsClosed() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + Stream pushStream = createPushStream(stream); + assertThatStreamIsNotHalfClosed(stream); + assertThatStreamIsNotClosed(stream); + assertThatPushStreamIsHalfClosed(pushStream); + assertThatPushStreamIsNotClosed(pushStream); + + stream.updateCloseState(true,true); + assertThatStreamIsHalfClosed(stream); + assertThatStreamIsNotClosed(stream); + assertThatPushStreamIsHalfClosed(pushStream); + assertThatPushStreamIsNotClosed(pushStream); + + session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),null)); + assertThatStreamIsClosed(stream); + assertThatPushStreamIsNotClosed(pushStream); + } + + @Test + public void testCreatePushStreamOnClosedStream() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + stream.updateCloseState(true,true); + assertThatStreamIsHalfClosed(stream); + stream.updateCloseState(true,false); + assertThatStreamIsClosed(stream); + createPushStreamAndMakeSureItFails(stream); + } + + private void createPushStreamAndMakeSureItFails(IStream stream) throws InterruptedException + { + final CountDownLatch failedLatch = new CountDownLatch(1); + SynInfo synInfo = new SynInfo(headers,false,stream.getPriority()); + stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler<Stream>() + { + @Override + public void completed(Stream context) + { + } + + @Override + public void failed(Throwable x) + { + failedLatch.countDown(); + } + }); + assertThat("pushStream creation failed",failedLatch.await(5,TimeUnit.SECONDS),is(true)); + } + + @Test + public void testPushStreamIsAddedAndRemovedFromParentAndSessionWhenClosed() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + IStream pushStream = createPushStream(stream); + assertThatPushStreamIsHalfClosed(pushStream); + assertThatPushStreamIsInSession(pushStream); + assertThatStreamIsAssociatedWithPushStream(stream,pushStream); + session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null); + assertThatPushStreamIsClosed(pushStream); + assertThatPushStreamIsNotInSession(pushStream); + assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream); + } + + @Test + public void testPushStreamIsRemovedWhenReset() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + IStream pushStream = (IStream)stream.syn(new SynInfo(false)).get(); + assertThatPushStreamIsInSession(pushStream); + session.rst(new RstInfo(pushStream.getId(),StreamStatus.INVALID_STREAM)); + assertThatPushStreamIsNotInSession(pushStream); + assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream); + assertThatStreamIsReset(pushStream); + } + + @Test + public void testPushStreamWithSynInfoClosedTrue() throws InterruptedException, ExecutionException, TimeoutException + { + IStream stream = createStream(); + SynInfo synInfo = new SynInfo(headers,true,stream.getPriority()); + IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); + assertThatPushStreamIsHalfClosed(pushStream); + assertThatPushStreamIsClosed(pushStream); + assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream); + assertThatStreamIsNotInSession(pushStream); + } + + @Test + public void testPushStreamSendHeadersWithCloseFlagIsRemovedFromSessionAndDisassociateFromParent() throws InterruptedException, ExecutionException, + TimeoutException + { + IStream stream = createStream(); + SynInfo synInfo = new SynInfo(headers,false,stream.getPriority()); + IStream pushStream = (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); + assertThatStreamIsAssociatedWithPushStream(stream,pushStream); + assertThatPushStreamIsInSession(pushStream); + pushStream.headers(new HeadersInfo(headers,true)); + assertThatPushStreamIsNotInSession(pushStream); + assertThatPushStreamIsHalfClosed(pushStream); + assertThatPushStreamIsClosed(pushStream); + assertThatStreamIsNotAssociatedWithPushStream(stream,pushStream); + } + + @Test + public void testCreatedAndClosedListenersAreCalledForNewStream() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch createdListenerCalledLatch = new CountDownLatch(1); + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); + session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch)); + IStream stream = createStream(); + session.onDataFrame(new DataFrame(stream.getId(),SynInfo.FLAG_CLOSE,128),ByteBuffer.allocate(128)); + stream.data(new StringDataInfo("close",true)); + assertThat("onStreamCreated listener has been called",createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true)); + assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch); + } + + @Test + public void testListenerIsCalledForResetStream() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); + session.addListener(new TestStreamListener(null,closedListenerCalledLatch)); + IStream stream = createStream(); + session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)); + assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch); + } + + @Test + public void testCreatedAndClosedListenersAreCalledForNewPushStream() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch createdListenerCalledLatch = new CountDownLatch(2); + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); + session.addListener(new TestStreamListener(createdListenerCalledLatch,closedListenerCalledLatch)); + IStream stream = createStream(); + IStream pushStream = createPushStream(stream); + session.data(pushStream,new StringDataInfo("close",true),5,TimeUnit.SECONDS,null,null); + assertThat("onStreamCreated listener has been called twice. Once for the stream and once for the pushStream", + createdListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true)); + assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch); + } + + @Test + public void testListenerIsCalledForResetPushStream() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch closedListenerCalledLatch = new CountDownLatch(1); + session.addListener(new TestStreamListener(null,closedListenerCalledLatch)); + IStream stream = createStream(); + IStream pushStream = createPushStream(stream); + session.rst(new RstInfo(pushStream.getId(),StreamStatus.CANCEL_STREAM)); + assertThatOnStreamClosedListenerHasBeenCalled(closedListenerCalledLatch); + } + + private class TestStreamListener extends Session.StreamListener.Adapter + { + private CountDownLatch createdListenerCalledLatch; + private CountDownLatch closedListenerCalledLatch; + + public TestStreamListener(CountDownLatch createdListenerCalledLatch, CountDownLatch closedListenerCalledLatch) + { + this.createdListenerCalledLatch = createdListenerCalledLatch; + this.closedListenerCalledLatch = closedListenerCalledLatch; + } + + @Override + public void onStreamCreated(Stream stream) + { + if (createdListenerCalledLatch != null) + createdListenerCalledLatch.countDown(); + super.onStreamCreated(stream); + } + + @Override + public void onStreamClosed(Stream stream) + { + if (closedListenerCalledLatch != null) + closedListenerCalledLatch.countDown(); + super.onStreamClosed(stream); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = IllegalStateException.class) + public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException + { + SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2,SynInfo.FLAG_CLOSE,1,0,(byte)0,null); + IStream stream = new StandardStream(synStreamFrame,sessionMock,8184,null); + stream.updateCloseState(synStreamFrame.isClose(),true); + assertThat("stream is half closed",stream.isHalfClosed(),is(true)); + stream.data(new StringDataInfo("data on half closed stream",true)); + verify(sessionMock,never()).data(any(IStream.class),any(DataInfo.class),anyInt(),any(TimeUnit.class),any(Handler.class),any(void.class)); + } + + @Test + @Ignore("In V3 we need to rst the stream if we receive data on a remotely half closed stream.") + public void receiveDataOnRemotelyHalfClosedStreamResetsStreamInV3() throws InterruptedException, ExecutionException + { + IStream stream = (IStream)session.syn(new SynInfo(false),new StreamFrameListener.Adapter()).get(); + stream.updateCloseState(true,false); + assertThat("stream is half closed from remote side",stream.isHalfClosed(),is(true)); + stream.process(new DataFrame(stream.getId(),(byte)0,256),ByteBuffer.allocate(256)); + } + + @Test + public void testReceiveDataOnRemotelyClosedStreamIsIgnored() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch onDataCalledLatch = new CountDownLatch(1); + Stream stream = session.syn(new SynInfo(false),new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + onDataCalledLatch.countDown(); + super.onData(stream,dataInfo); + } + }).get(5,TimeUnit.SECONDS); + session.onControlFrame(new SynReplyFrame(SPDY.V2,SynInfo.FLAG_CLOSE,stream.getId(),headers)); + session.onDataFrame(new DataFrame(stream.getId(),(byte)0,0),ByteBuffer.allocate(128)); + assertThat("onData is never called",onDataCalledLatch.await(1,TimeUnit.SECONDS),not(true)); + } + + private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException + { + SynInfo synInfo = new SynInfo(headers,false,(byte)0); + return (IStream)session.syn(synInfo,new StreamFrameListener.Adapter()).get(5,TimeUnit.SECONDS); + } + + private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException + { + SynInfo synInfo = new SynInfo(headers,false,stream.getPriority()); + return (IStream)stream.syn(synInfo).get(5,TimeUnit.SECONDS); + } + + private static class TestController implements Controller<StandardSession.FrameBytes> + { + @Override + public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context) + { + handler.completed(context); + return buffer.remaining(); + } + + @Override + public void close(boolean onlyOutput) + { + } + } + + private void assertThatStreamIsClosed(IStream stream) + { + assertThat("stream is closed",stream.isClosed(),is(true)); + } + + private void assertThatStreamIsReset(IStream stream) + { + assertThat("stream is reset",stream.isReset(),is(true)); + } + + private void assertThatStreamIsNotInSession(IStream stream) + { + assertThat("stream is not in session",session.getStreams().contains(stream),not(true)); + } + + private void assertThatStreamIsInSession(IStream stream) + { + assertThat("stream is in session",session.getStreams().contains(stream),is(true)); + } + + private void assertThatStreamIsNotClosed(IStream stream) + { + assertThat("stream is not closed",stream.isClosed(),not(true)); + } + + private void assertThatStreamIsNotHalfClosed(IStream stream) + { + assertThat("stream is not halfClosed",stream.isHalfClosed(),not(true)); + } + + private void assertThatPushStreamIsNotClosed(Stream pushStream) + { + assertThat("pushStream is not closed",pushStream.isClosed(),not(true)); + } + + private void assertThatStreamIsHalfClosed(IStream stream) + { + assertThat("stream is halfClosed",stream.isHalfClosed(),is(true)); + } + + private void assertThatStreamIsNotAssociatedWithPushStream(IStream stream, IStream pushStream) + { + assertThat("pushStream is removed from parent",stream.getPushedStreams().contains(pushStream),not(true)); + } + + private void assertThatPushStreamIsNotInSession(Stream pushStream) + { + assertThat("pushStream is not in session",session.getStreams().contains(pushStream.getId()),not(true)); + } + + private void assertThatPushStreamIsInSession(Stream pushStream) + { + assertThat("pushStream is in session",session.getStreams().contains(pushStream),is(true)); + } + + private void assertThatStreamIsAssociatedWithPushStream(IStream stream, Stream pushStream) + { + assertThat("stream is associated with pushStream",stream.getPushedStreams().contains(pushStream),is(true)); + } + + private void assertThatPushStreamIsClosed(Stream pushStream) + { + assertThat("pushStream is closed",pushStream.isClosed(),is(true)); + } + + private void assertThatPushStreamIsHalfClosed(Stream pushStream) + { + assertThat("pushStream is half closed ",pushStream.isHalfClosed(),is(true)); + } + + private void assertThatOnStreamClosedListenerHasBeenCalled(final CountDownLatch closedListenerCalledLatch) throws InterruptedException + { + assertThat("onStreamClosed listener has been called",closedListenerCalledLatch.await(5,TimeUnit.SECONDS),is(true)); + } +} diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java new file mode 100644 index 0000000..68a6a95 --- a/dev/null +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java @@ -0,0 +1,112 @@ +// ======================================================================== +// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + + +package org.eclipse.jetty.spdy; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.spdy.api.Handler; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + + +/* ------------------------------------------------------------ */ +/** + */ +@RunWith(MockitoJUnitRunner.class) +public class StandardStreamTest +{ + @Mock private ISession session; + @Mock private SynStreamFrame synStreamFrame; + + /** + * Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}. + */ + @SuppressWarnings("unchecked") + @Test + public void testSyn() + { + Stream stream = new StandardStream(synStreamFrame,session,0,null); + Set<Stream> streams = new HashSet<>(); + streams.add(stream); + when(synStreamFrame.isClose()).thenReturn(false); + SynInfo synInfo = new SynInfo(false); + when(session.getStreams()).thenReturn(streams); + stream.syn(synInfo); + verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(),synInfo)),any(StreamFrameListener.class),anyLong(),any(TimeUnit.class),any(Handler.class)); + } + + private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>{ + int associatedStreamId; + SynInfo synInfo; + + public PushSynInfoMatcher(int associatedStreamId, SynInfo synInfo) + { + this.associatedStreamId = associatedStreamId; + this.synInfo = synInfo; + } + @Override + public boolean matches(Object argument) + { + PushSynInfo pushSynInfo = (PushSynInfo)argument; + if(pushSynInfo.getAssociatedStreamId() != associatedStreamId){ + System.out.println("streamIds do not match!"); + return false; + } + if(pushSynInfo.isClose() != synInfo.isClose()){ + System.out.println("isClose doesn't match"); + return false; + } + return true; + } + } + + @Test + public void testSynOnClosedStream(){ + IStream stream = new StandardStream(synStreamFrame,session,0,null); + stream.updateCloseState(true,true); + stream.updateCloseState(true,false); + assertThat("stream expected to be closed",stream.isClosed(),is(true)); + final CountDownLatch failedLatch = new CountDownLatch(1); + stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>() + { + @Override + public void failed(Throwable x) + { + failedLatch.countDown(); + } + }); + assertThat("PushStream creation failed", failedLatch.getCount(), equalTo(0L)); + } + +} diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java index f7b3c57..a8bff06 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java @@ -99,7 +99,7 @@ public class ServerUsageTest Session session = stream.getSession(); // Since it's unidirectional, no need to pass the listener - session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>() + session.syn(new SynInfo(new Headers(), false, (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>() { @Override public void completed(Stream pushStream) diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java index ea3e6d7..378a3fa 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/RstStreamGenerateParseTest.java @@ -16,6 +16,12 @@ package org.eclipse.jetty.spdy.frames; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + import java.nio.ByteBuffer; import org.eclipse.jetty.spdy.StandardByteBufferPool; @@ -38,7 +44,7 @@ public class RstStreamGenerateParseTest Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor()); ByteBuffer buffer = generator.control(frame1); - Assert.assertNotNull(buffer); + assertThat("buffer is not null", buffer, not(nullValue())); TestSPDYParserListener listener = new TestSPDYParserListener(); Parser parser = new Parser(new StandardCompressionFactory().newDecompressor()); @@ -46,13 +52,13 @@ public class RstStreamGenerateParseTest parser.parse(buffer); ControlFrame frame2 = listener.getControlFrame(); - Assert.assertNotNull(frame2); - Assert.assertEquals(ControlFrameType.RST_STREAM, frame2.getType()); + assertThat("frame2 is not null", frame2, not(nullValue())); + assertThat("frame2 is type RST_STREAM",ControlFrameType.RST_STREAM, equalTo(frame2.getType())); RstStreamFrame rstStream = (RstStreamFrame)frame2; - Assert.assertEquals(SPDY.V2, rstStream.getVersion()); - Assert.assertEquals(streamId, rstStream.getStreamId()); - Assert.assertEquals(0, rstStream.getFlags()); - Assert.assertEquals(streamStatus, rstStream.getStatusCode()); + assertThat("rstStream version is SPDY.V2",SPDY.V2, equalTo(rstStream.getVersion())); + assertThat("rstStream id is equal to streamId",streamId, equalTo(rstStream.getStreamId())); + assertThat("rstStream flags are 0",(byte)0, equalTo(rstStream.getFlags())); + assertThat("stream status is equal to rstStream statuscode",streamStatus, is(rstStream.getStatusCode())); } @Test diff --git a/jetty-spdy/spdy-jetty/pom.xml b/jetty-spdy/spdy-jetty/pom.xml index 08cbaf6..7476bf9 100644 --- a/jetty-spdy/spdy-jetty/pom.xml +++ b/jetty-spdy/spdy-jetty/pom.xml @@ -64,8 +64,14 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> + <scope>test</scope> </dependency> <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j-version}</version> diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java new file mode 100644 index 0000000..7d5d6c7 --- a/dev/null +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ClosedStreamTest.java @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.spdy.api.BytesDataInfo; +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.SPDY; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.SessionStatus; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StringDataInfo; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.eclipse.jetty.spdy.frames.ControlFrame; +import org.eclipse.jetty.spdy.frames.DataFrame; +import org.eclipse.jetty.spdy.frames.GoAwayFrame; +import org.eclipse.jetty.spdy.frames.RstStreamFrame; +import org.eclipse.jetty.spdy.frames.SynReplyFrame; +import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.spdy.generator.Generator; +import org.eclipse.jetty.spdy.parser.Parser; +import org.eclipse.jetty.spdy.parser.Parser.Listener; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class ClosedStreamTest extends AbstractTest +{ + //TODO: Right now it sends a rst as the stream is unknown to the session once it's closed. But according to the spec we probably should just ignore the data?! + @Test + public void testDataSentOnClosedStreamIsIgnored() throws Exception + { + ServerSocketChannel server = ServerSocketChannel.open(); + server.bind(new InetSocketAddress("localhost", 0)); + + Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null); + final CountDownLatch dataLatch = new CountDownLatch(2); + session.syn(new SynInfo(true), new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataLatch.countDown(); + } + }); + + SocketChannel channel = server.accept(); + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + channel.read(readBuffer); + readBuffer.flip(); + int streamId = readBuffer.getInt(8); + + Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory.StandardCompressor()); + + ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Headers())); + channel.write(writeBuffer); + + byte[] bytes = new byte[1]; + writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true)); + channel.write(writeBuffer); + + // Write again to simulate the faulty condition + writeBuffer.flip(); + channel.write(writeBuffer); + + Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); + + writeBuffer = generator.control(new GoAwayFrame(SPDY.V2, 0, SessionStatus.OK.getCode())); + channel.write(writeBuffer); + channel.shutdownOutput(); + channel.close(); + + server.close(); + } + + @Test + public void testSendDataOnHalfClosedStreamCausesExceptionOnServer() throws Exception + { + final CountDownLatch replyReceivedLatch = new CountDownLatch(1); + final CountDownLatch clientReceivedDataLatch = new CountDownLatch(1); + final CountDownLatch exceptionWhenSendingData = new CountDownLatch(1); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(true)); + try + { + replyReceivedLatch.await(5,TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + try + { + stream.data(new StringDataInfo("data send after half closed",false)); + } + catch (RuntimeException e) + { + // we expect an exception here, but we don't want it to be logged + exceptionWhenSendingData.countDown(); + } + + return null; + } + }),null); + + Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + replyReceivedLatch.countDown(); + super.onReply(stream,replyInfo); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + clientReceivedDataLatch.countDown(); + super.onData(stream,dataInfo); + } + }).get(); + assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true)); + assertThat("stream is half closed from server",stream.isHalfClosed(),is(true)); + assertThat("client has not received any data sent after stream was half closed by server",clientReceivedDataLatch.await(1,TimeUnit.SECONDS), + is(false)); + assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS),is(true)); + } + + @Test + public void testV2ReceiveDataOnHalfClosedStream() throws Exception + { + final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V2); + assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true)); + } + + @Test + @Ignore("until v3 is properly implemented") + public void testV3ReceiveDataOnHalfClosedStream() throws Exception + { + final CountDownLatch clientResetReceivedLatch = runReceiveDataOnHalfClosedStream(SPDY.V3); + assertThat("server didn't receive data",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true)); + } + + private CountDownLatch runReceiveDataOnHalfClosedStream(short version) throws Exception, IOException, InterruptedException + { + final CountDownLatch clientResetReceivedLatch = new CountDownLatch(1); + final CountDownLatch serverReplySentLatch = new CountDownLatch(1); + final CountDownLatch clientReplyReceivedLatch = new CountDownLatch(1); + final CountDownLatch serverDataReceivedLatch = new CountDownLatch(1); + + InetSocketAddress startServer = startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false)); + serverReplySentLatch.countDown(); + try + { + clientReplyReceivedLatch.await(5,TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + serverDataReceivedLatch.countDown(); + super.onData(stream,dataInfo); + } + }; + } + }); + + final SocketChannel socketChannel = SocketChannel.open(startServer); + final Generator generator = new Generator(new StandardByteBufferPool(),new StandardCompressionFactory().newCompressor()); + ByteBuffer synData = generator.control(new SynStreamFrame(version,SynInfo.FLAG_CLOSE,1,0,(byte)0,new Headers())); + + socketChannel.write(synData); + + assertThat("server: syn reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true)); + + Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor()); + parser.addListener(new Listener.Adapter() + { + @Override + public void onControlFrame(ControlFrame frame) + { + if (frame instanceof SynReplyFrame) + { + SynReplyFrame synReplyFrame = (SynReplyFrame)frame; + clientReplyReceivedLatch.countDown(); + int streamId = synReplyFrame.getStreamId(); + ByteBuffer data = generator.data(streamId,0,new StringDataInfo("data",false)); + try + { + socketChannel.write(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + else if (frame instanceof RstStreamFrame) + { + clientResetReceivedLatch.countDown(); + } + super.onControlFrame(frame); + } + + @Override + public void onDataFrame(DataFrame frame, ByteBuffer data) + { + super.onDataFrame(frame,data); + } + }); + ByteBuffer response = ByteBuffer.allocate(28); + socketChannel.read(response); + response.flip(); + parser.parse(response); + + assertThat("server didn't receive data",serverDataReceivedLatch.await(1,TimeUnit.SECONDS),not(true)); + return clientResetReceivedLatch; + } + +} diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java index f1908e7..db2303c 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/FlowControlTest.java @@ -451,7 +451,7 @@ public class FlowControlTest extends AbstractTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } - private void expectException(Class<? extends Exception> exception, Callable command) + private void expectException(Class<? extends Exception> exception, Callable<DataInfo> command) { try { diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java index 92e642f..eb75be7 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ProtocolViolationsTest.java @@ -116,19 +116,20 @@ public class ProtocolViolationsTest extends AbstractTest stream.headers(new HeadersInfo(new Headers(), true)); } - @Test - public void testDataSentAfterCloseIsDiscardedByRecipient() throws Exception + @Test //TODO: throws an ISException in StandardStream.updateCloseState(). But instead we should send a rst or something to the server probably?! + public void testServerClosesStreamTwice() throws Exception { ServerSocketChannel server = ServerSocketChannel.open(); server.bind(new InetSocketAddress("localhost", 0)); Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null); final CountDownLatch dataLatch = new CountDownLatch(2); - session.syn(new SynInfo(true), new StreamFrameListener.Adapter() + session.syn(new SynInfo(false), new StreamFrameListener.Adapter() { @Override public void onData(Stream stream, DataInfo dataInfo) { + System.out.println("ondata"); dataLatch.countDown(); } }); diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java new file mode 100644 index 0000000..2dac9cd --- a/dev/null +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PushStreamTest.java @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.spdy.api.BytesDataInfo; +import org.eclipse.jetty.spdy.api.DataInfo; +import org.eclipse.jetty.spdy.api.Handler; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.SessionFrameListener; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StringDataInfo; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.junit.Test; + +public class PushStreamTest extends AbstractTest +{ + + @Test + public void testSynPushStream() throws Exception + { + final CountDownLatch pushStreamSynLatch = new CountDownLatch(1); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false)); + stream.syn(new SynInfo(false)); + return null; + } + }),new SessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + pushStreamSynLatch.countDown(); + stream.reply(new ReplyInfo(false)); + return super.onSyn(stream,synInfo); + } + }); + + clientSession.syn(new SynInfo(false),null).get(); + assertThat("onSyn has been called",pushStreamSynLatch.await(5,TimeUnit.SECONDS),is(true)); + } + + @Test + public void testSendDataOnPushStreamAfterAssociatedStreamIsClosed() throws Exception + { + final Exchanger<Stream> streamExchanger = new Exchanger<>(); + final CountDownLatch pushStreamSynLatch = new CountDownLatch(1); + final CyclicBarrier replyBarrier = new CyclicBarrier(3); + final CyclicBarrier closeBarrier = new CyclicBarrier(3); + final CountDownLatch streamDataSent = new CountDownLatch(2); + final CountDownLatch pushStreamDataReceived = new CountDownLatch(2); + final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false)); + try + { + replyBarrier.await(5,TimeUnit.SECONDS); + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + try + { + if (dataInfo.isClose()) + { + stream.data(new StringDataInfo("close stream",true)); + closeBarrier.await(5,TimeUnit.SECONDS); + } + streamDataSent.countDown(); + if (pushStreamDataReceived.getCount() == 2) + { + Stream pushStream = stream.syn(new SynInfo(false)).get(); + streamExchanger.exchange(pushStream,5,TimeUnit.SECONDS); + } + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + } + } + }; + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + throw new IllegalStateException(e); + } + } + + }),new SessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + pushStreamSynLatch.countDown(); + stream.reply(new ReplyInfo(false)); + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + pushStreamDataReceived.countDown(); + super.onData(stream,dataInfo); + } + }; + } + }); + + Stream stream = clientSession.syn(new SynInfo(false),new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + try + { + replyBarrier.await(5,TimeUnit.SECONDS); + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + } + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + try + { + closeBarrier.await(5,TimeUnit.SECONDS); + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + } + } + }).get(); + + replyBarrier.await(5,TimeUnit.SECONDS); + stream.data(new StringDataInfo("client data",false)); + Stream pushStream = streamExchanger.exchange(null,5,TimeUnit.SECONDS); + pushStream.data(new StringDataInfo("first push data frame",false)); + // nasty, but less complex than using another cyclicBarrier for example + while (pushStreamDataReceived.getCount() != 1) + Thread.sleep(1); + stream.data(new StringDataInfo("client close",true)); + closeBarrier.await(5,TimeUnit.SECONDS); + assertThat("stream is closed",stream.isClosed(),is(true)); + pushStream.data(new StringDataInfo("second push data frame while associated stream has been closed already",false)); + assertThat("2 pushStream data frames have been received.",pushStreamDataReceived.await(5,TimeUnit.SECONDS),is(true)); + assertThat("2 data frames have been sent",streamDataSent.await(5,TimeUnit.SECONDS),is(true)); + assertThatNoExceptionOccured(exceptionCountDownLatch); + } + + @Test + public void testSynPushStreamOnClosedStream() throws Exception + { + final CountDownLatch pushStreamFailedLatch = new CountDownLatch(1); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(true)); + stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>() + { + @Override + public void failed(Throwable x) + { + pushStreamFailedLatch.countDown(); + } + }); + return super.onSyn(stream,synInfo); + } + }),new SessionFrameListener.Adapter()); + + clientSession.syn(new SynInfo(true),null); + assertThat("pushStream syn has failed",pushStreamFailedLatch.await(5,TimeUnit.SECONDS),is(true)); + } + + @Test + public void testSendBigDataOnPushStreamWhenAssociatedStreamIsClosed() throws Exception + { + final CountDownLatch streamClosedLatch = new CountDownLatch(1); + final CountDownLatch allDataReceived = new CountDownLatch(1); + final CountDownLatch exceptionCountDownLatch = new CountDownLatch(1); + final Exchanger<ByteBuffer> exchanger = new Exchanger<>(); + final int dataSizeInBytes = 1024 * 1024 * 1; + final byte[] transferBytes = createHugeByteArray(dataSizeInBytes); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + try + { + Stream pushStream = stream.syn(new SynInfo(false)).get(); + stream.reply(new ReplyInfo(true)); + // wait until stream is closed + streamClosedLatch.await(5,TimeUnit.SECONDS); + pushStream.data(new BytesDataInfo(transferBytes,true)); + return null; + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + throw new IllegalStateException(e); + } + } + }),new SessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + return new StreamFrameListener.Adapter() + { + ByteBuffer receivedBytes = ByteBuffer.allocate(dataSizeInBytes); + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consumeInto(receivedBytes); + if (dataInfo.isClose()) + { + allDataReceived.countDown(); + try + { + receivedBytes.flip(); + exchanger.exchange(receivedBytes.slice(),5,TimeUnit.SECONDS); + } + catch (Exception e) + { + exceptionCountDownLatch.countDown(); + } + } + } + }; + } + }); + + Stream stream = clientSession.syn(new SynInfo(true),new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + streamClosedLatch.countDown(); + super.onReply(stream,replyInfo); + } + }).get(); + + ByteBuffer receivedBytes = exchanger.exchange(null,5,TimeUnit.SECONDS); + + assertThat("received byte array is the same as transferred byte array",Arrays.equals(transferBytes,receivedBytes.array()),is(true)); + assertThat("onReply has been called to close the stream",streamClosedLatch.await(5,TimeUnit.SECONDS),is(true)); + assertThat("stream is closed",stream.isClosed(),is(true)); + assertThat("all data has been received",allDataReceived.await(20,TimeUnit.SECONDS),is(true)); + assertThatNoExceptionOccured(exceptionCountDownLatch); + } + + private byte[] createHugeByteArray(int sizeInBytes) + { + byte[] bytes = new byte[sizeInBytes]; + new Random().nextBytes(bytes); + return bytes; + } + + @Test + public void testOddEvenStreamIds() throws Exception + { + final CountDownLatch pushStreamIdIsEvenLatch = new CountDownLatch(3); + + Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.syn(new SynInfo(false)); + return null; + } + }),new SessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + stream.reply(new ReplyInfo(false)); + assertStreamIdIsEven(stream); + pushStreamIdIsEvenLatch.countDown(); + return super.onSyn(stream,synInfo); + } + }); + + Stream stream = clientSession.syn(new SynInfo(false),null).get(); + Stream stream2 = clientSession.syn(new SynInfo(false),null).get(); + Stream stream3 = clientSession.syn(new SynInfo(false),null).get(); + assertStreamIdIsOdd(stream); + assertStreamIdIsOdd(stream2); + assertStreamIdIsOdd(stream3); + + assertThat("all pushStreams had even ids",pushStreamIdIsEvenLatch.await(5,TimeUnit.SECONDS),is(true)); + } + + private void assertStreamIdIsEven(Stream stream) + { + assertThat("streamId is odd",stream.getId() % 2,is(0)); + } + + private void assertStreamIdIsOdd(Stream stream) + { + assertThat("streamId is odd",stream.getId() % 2,is(1)); + } + + private void assertThatNoExceptionOccured(final CountDownLatch exceptionCountDownLatch) throws InterruptedException + { + assertThat("No exception occured", exceptionCountDownLatch.await(1,TimeUnit.SECONDS),is(false)); + } +} diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java index c6c0a2b..654bb5f 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ResetStreamTest.java @@ -1,5 +1,11 @@ package org.eclipse.jetty.spdy; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -15,7 +21,6 @@ import org.eclipse.jetty.spdy.api.StreamStatus; import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; -import org.junit.Assert; import org.junit.Test; public class ResetStreamTest extends AbstractTest @@ -23,12 +28,12 @@ public class ResetStreamTest extends AbstractTest @Test public void testResetStreamIsRemoved() throws Exception { - Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()), null); + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()),null); - Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS); - session.rst(new RstInfo(stream.getId(), StreamStatus.CANCEL_STREAM)).get(5, TimeUnit.SECONDS); + Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS); + session.rst(new RstInfo(stream.getId(),StreamStatus.CANCEL_STREAM)).get(5,TimeUnit.SECONDS); - Assert.assertEquals(0, session.getStreams().size()); + assertEquals("session expected to contain 0 streams",0,session.getStreams().size()); } @Test @@ -44,11 +49,11 @@ public class ResetStreamTest extends AbstractTest { Session serverSession = stream.getSession(); serverSessionRef.set(serverSession); - serverSession.rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM)); + serverSession.rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM)); synLatch.countDown(); return null; } - }), new SessionFrameListener.Adapter() + }),new SessionFrameListener.Adapter() { @Override public void onRst(Session session, RstInfo rstInfo) @@ -57,16 +62,17 @@ public class ResetStreamTest extends AbstractTest } }); - clientSession.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS); + Stream stream = clientSession.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS); - Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS)); + assertTrue("syncLatch didn't count down",synLatch.await(5,TimeUnit.SECONDS)); Session serverSession = serverSessionRef.get(); - Assert.assertEquals(0, serverSession.getStreams().size()); + assertEquals("serverSession expected to contain 0 streams",0,serverSession.getStreams().size()); - Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS)); + assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS)); // Need to sleep a while to give the chance to the implementation to remove the stream TimeUnit.SECONDS.sleep(1); - Assert.assertEquals(0, clientSession.getStreams().size()); + assertTrue("stream is expected to be reset",stream.isReset()); + assertEquals("clientSession expected to contain 0 streams",0,clientSession.getStreams().size()); } @Test @@ -83,8 +89,8 @@ public class ResetStreamTest extends AbstractTest try { // Refuse the stream, we must ignore data frames - Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS)); - stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM)); + assertTrue(synLatch.await(5,TimeUnit.SECONDS)); + stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM)); return new StreamFrameListener.Adapter() { @Override @@ -100,7 +106,7 @@ public class ResetStreamTest extends AbstractTest return null; } } - }), new SessionFrameListener.Adapter() + }),new SessionFrameListener.Adapter() { @Override public void onRst(Session session, RstInfo rstInfo) @@ -109,8 +115,8 @@ public class ResetStreamTest extends AbstractTest } }); - Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS); - stream.data(new StringDataInfo("data", true), 5, TimeUnit.SECONDS, new Handler.Adapter<Void>() + Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS); + stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Handler.Adapter<Void>() { @Override public void completed(Void context) @@ -119,7 +125,60 @@ public class ResetStreamTest extends AbstractTest } }); - Assert.assertTrue(rstLatch.await(5, TimeUnit.SECONDS)); - Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); + assertTrue("rstLatch didn't count down",rstLatch.await(5,TimeUnit.SECONDS)); + assertTrue("stream is expected to be reset",stream.isReset()); + assertFalse("dataLatch shouln't be count down",dataLatch.await(1,TimeUnit.SECONDS)); } + + @Test + public void testResetAfterServerReceivedFirstDataFrameAndSecondDataFrameFails() throws Exception + { + final CountDownLatch synLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + final CountDownLatch rstLatch = new CountDownLatch(1); + final CountDownLatch failLatch = new CountDownLatch(1); + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + synLatch.countDown(); + return new StreamFrameListener.Adapter() + { + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataLatch.countDown(); + stream.getSession().rst(new RstInfo(stream.getId(),StreamStatus.REFUSED_STREAM)); + } + }; + } + }),new SessionFrameListener.Adapter() + { + @Override + public void onRst(Session session, RstInfo rstInfo) + { + rstLatch.countDown(); + } + }); + + Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS); + assertThat("syn is received by server", synLatch.await(5,TimeUnit.SECONDS),is(true)); + stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null); + assertThat("stream is reset",rstLatch.await(5,TimeUnit.SECONDS),is(true)); + stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter<Void>() + { + @Override + public void failed(Throwable x) + { + failLatch.countDown(); + } + }); + + assertThat("2nd data call failed",failLatch.await(5,TimeUnit.SECONDS),is(true)); + assertThat("stream is reset",stream.isReset(),is(true)); + } + + // TODO: If server already received 2nd dataframe after it rst, it should ignore it. Not easy to do. + } diff --git a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java index cdb9a7e..2e58551 100644 --- a/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java +++ b/jetty-spdy/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLEngineLeakTest.java @@ -36,6 +36,7 @@ public class SSLEngineLeakTest extends AbstractTest Field field = NextProtoNego.class.getDeclaredField("objects"); field.setAccessible(true); + @SuppressWarnings("unchecked") Map<Object, NextProtoNego.Provider> objects = (Map<Object, NextProtoNego.Provider>)field.get(null); int initialSize = objects.size(); @@ -498,6 +498,11 @@ <version>4.8.1</version> </dependency> <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.1</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <version>1.8.5</version> |

