Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-02-05 12:17:03 +0000
committerSimone Bordet2015-02-09 08:11:56 +0000
commit02b5732720607c2703d8c0e1a9d62901f40aea67 (patch)
treee4a9eaaab8b75d5f853dd5651444f83d1255edb9 /jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
parent96132dbe459d0491c1523462d8e9af3580117808 (diff)
downloadorg.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.tar.gz
org.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.tar.xz
org.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.zip
First take at implementing the HttpClientTransport for HTTP2.
Diffstat (limited to 'jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java')
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java74
1 files changed, 56 insertions, 18 deletions
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index 58bf192333..719ba80882 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -18,6 +18,8 @@
package org.eclipse.jetty.http2;
+import java.io.EOFException;
+import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
@@ -48,7 +50,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private final ISession session;
private final int streamId;
private volatile Listener listener;
- private volatile boolean reset;
+ private volatile boolean localReset;
+ private volatile boolean remoteReset;
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
{
@@ -94,6 +97,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream
public void reset(ResetFrame frame, Callback callback)
{
notIdle();
+ if (isReset())
+ return;
+ localReset = true;
session.control(this, callback, frame, Frame.EMPTY_ARRAY);
}
@@ -118,7 +124,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public boolean isReset()
{
- return reset;
+ return localReset || remoteReset;
}
@Override
@@ -127,6 +133,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return closeState.get() == CloseState.CLOSED;
}
+ private boolean isRemotelyClosed()
+ {
+ return closeState.get() == CloseState.REMOTELY_CLOSED;
+ }
+
@Override
public boolean isOpen()
{
@@ -144,10 +155,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
close();
// Tell the other peer that we timed out.
- reset(new ResetFrame(getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE);
+ reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE);
// Notify the application.
- notifyFailure(this, timeout);
+ notifyTimeout(this, timeout);
}
private ConcurrentMap<String, Object> attributes()
@@ -207,40 +218,52 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private boolean onHeaders(HeadersFrame frame, Callback callback)
{
- // TODO: handle case where HEADERS after DATA.
+ updateClose(frame.isEndStream(), false);
callback.succeeded();
return false;
}
private boolean onData(DataFrame frame, Callback callback)
{
- // TODO: handle cases where:
- // TODO: A) stream already remotely close.
- // TODO: B) DATA before HEADERS.
-
if (getRecvWindow() < 0)
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
- session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback);
+ session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback);
return true;
}
- else
+
+ // SPEC: remotely closed streams must be replied with a reset.
+ if (isRemotelyClosed())
{
- notifyData(this, frame, callback);
- return false;
+ reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
+ callback.failed(new EOFException("stream_closed"));
+ return true;
}
+
+ if (isReset())
+ {
+ // Just drop the frame.
+ callback.failed(new IOException("stream_reset"));
+ return true;
+ }
+
+ updateClose(frame.isEndStream(), false);
+ notifyData(this, frame, callback);
+ return false;
}
private boolean onReset(ResetFrame frame, Callback callback)
{
- reset = true;
+ remoteReset = true;
callback.succeeded();
+ notifyReset(this, frame);
return false;
}
private boolean onPush(PushPromiseFrame frame, Callback callback)
{
+ updateClose(true, true);
callback.succeeded();
return false;
}
@@ -316,7 +339,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream
onClose();
}
- protected void notifyData(Stream stream, DataFrame frame, Callback callback)
+ private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
final Listener listener = this.listener;
if (listener == null)
@@ -331,14 +354,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
}
- private void notifyFailure(Stream stream, Throwable failure)
+ private void notifyReset(Stream stream, ResetFrame frame)
+ {
+ final Listener listener = this.listener;
+ if (listener == null)
+ return;
+ try
+ {
+ listener.onReset(stream, frame);
+ }
+ catch (Throwable x)
+ {
+ LOG.info("Failure while notifying listener " + listener, x);
+ }
+ }
+
+ private void notifyTimeout(Stream stream, Throwable failure)
{
Listener listener = this.listener;
if (listener == null)
return;
try
{
- listener.onFailure(stream, failure);
+ listener.onTimeout(stream, failure);
}
catch (Throwable x)
{
@@ -350,6 +388,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream
public String toString()
{
return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
- hashCode(), getId(), sendWindow, recvWindow, reset, closeState);
+ hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
}
}

Back to the top