Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2013-08-22 09:31:38 +0000
committerThomas Becker2013-08-22 12:00:27 +0000
commit525b268d41cc73cb76526a0353903032e98dbb9b (patch)
tree7abb56ad4eac9a79417c7abbe6e196d8dd5cb85a /jetty-spdy/spdy-core/src
parentc0ed8375d3726ecf5ec1c7fa0fb40a50a2b3f57f (diff)
downloadorg.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.tar.gz
org.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.tar.xz
org.eclipse.jetty.project-525b268d41cc73cb76526a0353903032e98dbb9b.zip
415656 SPDY - add IdleTimeout per Stream functionality
Diffstat (limited to 'jetty-spdy/spdy-core/src')
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java4
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java25
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java12
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java11
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java8
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java13
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java84
7 files changed, 136 insertions, 21 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
index e50f98e1ab..5d1d5b954f 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
@@ -541,7 +541,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
- IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
+ IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
+ scheduler, promise);
+ stream.setIdleTimeout(endPoint.getIdleTimeout());
flowControlStrategy.onNewStream(this, stream);
stream.updateCloseState(frame.isClose(), local);
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index e0b65c7b79..71457904f3 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
@@ -43,8 +44,9 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
-public class StandardStream implements IStream
+public class StandardStream extends IdleTimeout implements IStream
{
private static final Logger LOG = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@@ -60,8 +62,9 @@ public class StandardStream implements IStream
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
- public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
+ public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
{
+ super(scheduler);
this.id = id;
this.priority = priority;
this.session = session;
@@ -106,6 +109,18 @@ public class StandardStream implements IStream
}
@Override
+ protected void onIdleExpired(TimeoutException timeout)
+ {
+ listener.onFailure(timeout);
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return !isClosed();
+ }
+
+ @Override
public int getWindowSize()
{
return windowSize.get();
@@ -194,6 +209,7 @@ public class StandardStream implements IStream
@Override
public void process(ControlFrame frame)
{
+ notIdle();
switch (frame.getType())
{
case SYN_STREAM:
@@ -234,6 +250,7 @@ public class StandardStream implements IStream
@Override
public void process(DataInfo dataInfo)
{
+ notIdle();
// TODO: in v3 we need to send a rst instead of just ignoring
// ignore data frame if this stream is remotelyClosed already
if (isRemotelyClosed())
@@ -349,6 +366,7 @@ public class StandardStream implements IStream
@Override
public void push(PushInfo pushInfo, Promise<Stream> promise)
{
+ notIdle();
if (isClosed() || isReset())
{
promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
@@ -373,6 +391,7 @@ public class StandardStream implements IStream
@Override
public void reply(ReplyInfo replyInfo, Callback callback)
{
+ notIdle();
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
@@ -395,6 +414,7 @@ public class StandardStream implements IStream
@Override
public void data(DataInfo dataInfo, Callback callback)
{
+ notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
@@ -425,6 +445,7 @@ public class StandardStream implements IStream
@Override
public void headers(HeadersInfo headersInfo, Callback callback)
{
+ notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
index abc1b88847..4b511314c9 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
@@ -225,4 +225,16 @@ public interface Stream
*/
public Set<Stream> getPushedStreams();
+ /**
+ * Get the idle timeout set for this particular stream
+ * @return the idle timeout
+ */
+ public long getIdleTimeout();
+
+ /**
+ * Set an idle timeout for this stream
+ * @param timeout
+ */
+ public void setIdleTimeout(long timeout);
+
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java
index 0a4248a1f9..c8139e340b 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/StreamFrameListener.java
@@ -70,6 +70,12 @@ public interface StreamFrameListener extends EventListener
public void onData(Stream stream, DataInfo dataInfo);
/**
+ * <p>Callback invoked on errors.</p>
+ * @param x
+ */
+ public void onFailure(Throwable x);
+
+ /**
* <p>Empty implementation of {@link StreamFrameListener}</p>
*/
public static class Adapter implements StreamFrameListener
@@ -94,5 +100,10 @@ public interface StreamFrameListener extends EventListener
public void onData(Stream stream, DataInfo dataInfo)
{
}
+
+ @Override
+ public void onFailure(Throwable x)
+ {
+ }
}
}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
index 1d198a802a..dc8c650bf4 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
@@ -24,7 +24,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
@@ -47,6 +49,8 @@ import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class AsyncTimeoutTest
{
+ EndPoint endPoint = new ByteArrayEndPoint();
+
@Slow
@Test
public void testAsyncTimeoutInControlFrames() throws Exception
@@ -60,7 +64,7 @@ public class AsyncTimeoutTest
scheduler.start(); // TODO need to use jetty lifecycles better here
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
- null, null, 1, null, generator, new FlowControlStrategy.None())
+ endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
public void flush()
@@ -103,7 +107,7 @@ public class AsyncTimeoutTest
scheduler.start();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
- null, null, 1, null, generator, new FlowControlStrategy.None())
+ endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
protected void write(ByteBuffer buffer, Callback callback)
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
index 813fb0ceb5..45e000aa27 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
@@ -75,6 +76,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
@@ -84,6 +86,10 @@ public class StandardSessionTest
@Mock
private Controller controller;
+
+ @Mock
+ private EndPoint endPoint;
+
private ExecutorService threadPool;
private StandardSession session;
private Scheduler scheduler;
@@ -97,8 +103,9 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = new TimerScheduler();
scheduler.start();
- session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, null, null, 1, null,
+ session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, endPoint, null, 1, null,
generator, new FlowControlStrategy.None());
+ when(endPoint.getIdleTimeout()).thenReturn(30000L);
headers = new Fields();
}
@@ -428,7 +435,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
- IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
+ IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
stream.updateWindowSize(8192);
Callback.Adapter callback = new Callback.Adapter()
{
@@ -502,7 +509,7 @@ public class StandardSessionTest
private void testHeaderFramesAreSentInOrder(final byte priority0, final byte priority1, final byte priority2) throws InterruptedException, ExecutionException
{
final StandardSession testLocalSession = new StandardSession(VERSION, bufferPool, threadPool, scheduler,
- new ControllerMock(), null, null, 1, null, generator, new FlowControlStrategy.None());
+ new ControllerMock(), endPoint, null, 1, null, generator, new FlowControlStrategy.None());
HashSet<Future> tasks = new HashSet<>();
int numberOfTasksToRun = 128;
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
index 57bdd98805..aadb2ddf53 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardStreamTest.java
@@ -18,16 +18,6 @@
package org.eclipse.jetty.spdy;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -43,23 +33,44 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
+import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
@RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest
{
+ private final ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler();
@Mock
private ISession session;
@Mock
private SynStreamFrame synStreamFrame;
+ @Before
+ public void setUp() throws Exception
+ {
+ scheduler.start();
+ }
+
/**
* Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}.
*/
@@ -67,7 +78,7 @@ public class StandardStreamTest
@Test
public void testSyn()
{
- Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
+ Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
@@ -100,7 +111,8 @@ public class StandardStreamTest
@Test
public void testSynOnClosedStream()
{
- IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
+ IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
+ null, null , null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
@@ -121,11 +133,57 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
- IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
+ IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
+ null, scheduler, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));
stream.data(new StringDataInfo("data on half closed stream", true));
verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class));
}
+
+ @Test
+ @Slow
+ public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
+ IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
+ stream.setIdleTimeout(500);
+ stream.setStreamFrameListener(new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onFailure(Throwable x)
+ {
+ assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
+ onFailCalledLatch.countDown();
+ }
+ });
+ stream.process(new StringDataInfo("string", false));
+ Thread.sleep(1000);
+ assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true));
+ }
+
+ @Test
+ @Slow
+ public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException,
+ TimeoutException
+ {
+ final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
+ IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
+ stream.setStreamFrameListener(new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onFailure(Throwable x)
+ {
+ assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
+ onFailCalledLatch.countDown();
+ }
+ });
+ stream.process(new StringDataInfo("string", false));
+ Thread.sleep(500);
+ stream.process(new StringDataInfo("string", false));
+ Thread.sleep(500);
+ assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false));
+ }
+
}

Back to the top