diff options
author | Thomas Becker | 2013-08-22 09:31:38 +0000 |
---|---|---|
committer | Thomas Becker | 2013-08-22 12:00:27 +0000 |
commit | 525b268d41cc73cb76526a0353903032e98dbb9b (patch) | |
tree | 7abb56ad4eac9a79417c7abbe6e196d8dd5cb85a /jetty-spdy/spdy-core/src | |
parent | c0ed8375d3726ecf5ec1c7fa0fb40a50a2b3f57f (diff) | |
download | org.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.tar.gz org.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.tar.xz org.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.zip |
415656 SPDY - add IdleTimeout per Stream functionality
Diffstat (limited to 'jetty-spdy/spdy-core/src')
7 files changed, 136 insertions, 21 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index e50f98e1ab..5d1d5b954f 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -541,7 +541,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise) { IStream associatedStream = streams.get(frame.getAssociatedStreamId()); - IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise); + IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, + scheduler, promise); + stream.setIdleTimeout(endPoint.getIdleTimeout()); flowControlStrategy.onNewStream(this, stream); stream.updateCloseState(frame.isClose(), local); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index e0b65c7b79..71457904f3 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.PushInfo; @@ -43,8 +44,9 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; -public class StandardStream implements IStream +public class StandardStream extends IdleTimeout implements IStream { private static final Logger LOG = Log.getLogger(Stream.class); private final Map<String, Object> attributes = new ConcurrentHashMap<>(); @@ -60,8 +62,9 @@ public class StandardStream implements IStream private volatile CloseState closeState = CloseState.OPENED; private volatile boolean reset = false; - public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise) + public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise) { + super(scheduler); this.id = id; this.priority = priority; this.session = session; @@ -106,6 +109,18 @@ public class StandardStream implements IStream } @Override + protected void onIdleExpired(TimeoutException timeout) + { + listener.onFailure(timeout); + } + + @Override + public boolean isOpen() + { + return !isClosed(); + } + + @Override public int getWindowSize() { return windowSize.get(); @@ -194,6 +209,7 @@ public class StandardStream implements IStream @Override public void process(ControlFrame frame) { + notIdle(); switch (frame.getType()) { case SYN_STREAM: @@ -234,6 +250,7 @@ public class StandardStream implements IStream @Override public void process(DataInfo dataInfo) { + notIdle(); // TODO: in v3 we need to send a rst instead of just ignoring // ignore data frame if this stream is remotelyClosed already if (isRemotelyClosed()) @@ -349,6 +366,7 @@ public class StandardStream implements IStream @Override public void push(PushInfo pushInfo, Promise<Stream> promise) { + notIdle(); if (isClosed() || isReset()) { promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED, @@ -373,6 +391,7 @@ public class StandardStream implements IStream @Override public void reply(ReplyInfo replyInfo, Callback callback) { + notIdle(); if (isUnidirectional()) throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams"); openState = OpenState.REPLY_SENT; @@ -395,6 +414,7 @@ public class StandardStream implements IStream @Override public void data(DataInfo dataInfo, Callback callback) { + notIdle(); if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); @@ -425,6 +445,7 @@ public class StandardStream implements IStream @Override public void headers(HeadersInfo headersInfo, Callback callback) { + notIdle(); if (!canSend()) { session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter()); diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java index abc1b88847..4b511314c9 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java @@ -225,4 +225,16 @@ public interface Stream */ public Set<Stream> getPushedStreams(); + /** + * Get the idle timeout set for this particular stream + * @return the idle timeout + */ + public long getIdleTimeout(); + + /** + * Set an idle timeout for this stream + * @param timeout + */ + public void setIdleTimeout(long timeout); + } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java index 0a4248a1f9..c8139e340b 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java @@ -70,6 +70,12 @@ public interface StreamFrameListener extends EventListener public void onData(Stream stream, DataInfo dataInfo); /** + * <p>Callback invoked on errors.</p> + * @param x + */ + public void onFailure(Throwable x); + + /** * <p>Empty implementation of {@link StreamFrameListener}</p> */ public static class Adapter implements StreamFrameListener @@ -94,5 +100,10 @@ public interface StreamFrameListener extends EventListener public void onData(Stream stream, DataInfo dataInfo) { } + + @Override + public void onFailure(Throwable x) + { + } } } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java index 1d198a802a..dc8c650bf4 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java @@ -24,7 +24,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.SPDYException; @@ -47,6 +49,8 @@ import org.junit.runner.RunWith; @RunWith(AdvancedRunner.class) public class AsyncTimeoutTest { + EndPoint endPoint = new ByteArrayEndPoint(); + @Slow @Test public void testAsyncTimeoutInControlFrames() throws Exception @@ -60,7 +64,7 @@ public class AsyncTimeoutTest scheduler.start(); // TODO need to use jetty lifecycles better here Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), - null, null, 1, null, generator, new FlowControlStrategy.None()) + endPoint, null, 1, null, generator, new FlowControlStrategy.None()) { @Override public void flush() @@ -103,7 +107,7 @@ public class AsyncTimeoutTest scheduler.start(); Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), - null, null, 1, null, generator, new FlowControlStrategy.None()) + endPoint, null, 1, null, generator, new FlowControlStrategy.None()) { @Override protected void write(ByteBuffer buffer, Callback callback) diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index 813fb0ceb5..45e000aa27 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.DataInfo; @@ -75,6 +76,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class StandardSessionTest @@ -84,6 +86,10 @@ public class StandardSessionTest @Mock private Controller controller; + + @Mock + private EndPoint endPoint; + private ExecutorService threadPool; private StandardSession session; private Scheduler scheduler; @@ -97,8 +103,9 @@ public class StandardSessionTest threadPool = Executors.newCachedThreadPool(); scheduler = new TimerScheduler(); scheduler.start(); - session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, null, null, 1, null, + session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, endPoint, null, 1, null, generator, new FlowControlStrategy.None()); + when(endPoint.getIdleTimeout()).thenReturn(30000L); headers = new Fields(); } @@ -428,7 +435,7 @@ public class StandardSessionTest final CountDownLatch failedCalledLatch = new CountDownLatch(2); SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); stream.updateWindowSize(8192); Callback.Adapter callback = new Callback.Adapter() { @@ -502,7 +509,7 @@ public class StandardSessionTest private void testHeaderFramesAreSentInOrder(final byte priority0, final byte priority1, final byte priority2) throws InterruptedException, ExecutionException { final StandardSession testLocalSession = new StandardSession(VERSION, bufferPool, threadPool, scheduler, - new ControllerMock(), null, null, 1, null, generator, new FlowControlStrategy.None()); + new ControllerMock(), endPoint, null, 1, null, generator, new FlowControlStrategy.None()); HashSet<Future> tasks = new HashSet<>(); int numberOfTasksToRun = 128; diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java index 57bdd98805..aadb2ddf53 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.spdy; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -43,23 +33,44 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.StringDataInfo; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.frames.SynStreamFrame; +import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class StandardStreamTest { + private final ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler(); @Mock private ISession session; @Mock private SynStreamFrame synStreamFrame; + @Before + public void setUp() throws Exception + { + scheduler.start(); + } + /** * Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}. */ @@ -67,7 +78,7 @@ public class StandardStreamTest @Test public void testSyn() { - Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null); Set<Stream> streams = new HashSet<>(); streams.add(stream); when(synStreamFrame.isClose()).thenReturn(false); @@ -100,7 +111,8 @@ public class StandardStreamTest @Test public void testSynOnClosedStream() { - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, + null, null , null); stream.updateCloseState(true, true); stream.updateCloseState(true, false); assertThat("stream expected to be closed", stream.isClosed(), is(true)); @@ -121,11 +133,57 @@ public class StandardStreamTest public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException { SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null); - IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null); + IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, + null, scheduler, null); stream.updateWindowSize(8192); stream.updateCloseState(synStreamFrame.isClose(), true); assertThat("stream is half closed", stream.isHalfClosed(), is(true)); stream.data(new StringDataInfo("data on half closed stream", true)); verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class)); } + + @Test + @Slow + public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException + { + final CountDownLatch onFailCalledLatch = new CountDownLatch(1); + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + stream.setIdleTimeout(500); + stream.setStreamFrameListener(new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); + onFailCalledLatch.countDown(); + } + }); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(1000); + assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true)); + } + + @Test + @Slow + public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException, + TimeoutException + { + final CountDownLatch onFailCalledLatch = new CountDownLatch(1); + IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null); + stream.setStreamFrameListener(new StreamFrameListener.Adapter() + { + @Override + public void onFailure(Throwable x) + { + assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class))); + onFailCalledLatch.countDown(); + } + }); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(500); + stream.process(new StringDataInfo("string", false)); + Thread.sleep(500); + assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false)); + } + } |