diff options
author | Simone Bordet | 2015-02-05 12:17:03 +0000 |
---|---|---|
committer | Simone Bordet | 2015-02-09 08:11:56 +0000 |
commit | 02b5732720607c2703d8c0e1a9d62901f40aea67 (patch) | |
tree | e4a9eaaab8b75d5f853dd5651444f83d1255edb9 /jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java | |
parent | 96132dbe459d0491c1523462d8e9af3580117808 (diff) | |
download | org.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.tar.gz org.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.tar.xz org.eclipse.jetty.project-02b5732720607c2703d8c0e1a9d62901f40aea67.zip |
First take at implementing the HttpClientTransport for HTTP2.
Diffstat (limited to 'jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java')
-rw-r--r-- | jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java | 74 |
1 files changed, 56 insertions, 18 deletions
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 58bf192333..719ba80882 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.http2; +import java.io.EOFException; +import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; @@ -48,7 +50,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream private final ISession session; private final int streamId; private volatile Listener listener; - private volatile boolean reset; + private volatile boolean localReset; + private volatile boolean remoteReset; public HTTP2Stream(Scheduler scheduler, ISession session, int streamId) { @@ -94,6 +97,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream public void reset(ResetFrame frame, Callback callback) { notIdle(); + if (isReset()) + return; + localReset = true; session.control(this, callback, frame, Frame.EMPTY_ARRAY); } @@ -118,7 +124,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream @Override public boolean isReset() { - return reset; + return localReset || remoteReset; } @Override @@ -127,6 +133,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream return closeState.get() == CloseState.CLOSED; } + private boolean isRemotelyClosed() + { + return closeState.get() == CloseState.REMOTELY_CLOSED; + } + @Override public boolean isOpen() { @@ -144,10 +155,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream close(); // Tell the other peer that we timed out. - reset(new ResetFrame(getId(), ErrorCodes.CANCEL_STREAM_ERROR), Callback.Adapter.INSTANCE); + reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.Adapter.INSTANCE); // Notify the application. - notifyFailure(this, timeout); + notifyTimeout(this, timeout); } private ConcurrentMap<String, Object> attributes() @@ -207,40 +218,52 @@ public class HTTP2Stream extends IdleTimeout implements IStream private boolean onHeaders(HeadersFrame frame, Callback callback) { - // TODO: handle case where HEADERS after DATA. + updateClose(frame.isEndStream(), false); callback.succeeded(); return false; } private boolean onData(DataFrame frame, Callback callback) { - // TODO: handle cases where: - // TODO: A) stream already remotely close. - // TODO: B) DATA before HEADERS. - if (getRecvWindow() < 0) { // It's a bad client, it does not deserve to be // treated gently by just resetting the stream. - session.close(ErrorCodes.FLOW_CONTROL_ERROR, "stream_window_exceeded", callback); + session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback); return true; } - else + + // SPEC: remotely closed streams must be replied with a reset. + if (isRemotelyClosed()) { - notifyData(this, frame, callback); - return false; + reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE); + callback.failed(new EOFException("stream_closed")); + return true; } + + if (isReset()) + { + // Just drop the frame. + callback.failed(new IOException("stream_reset")); + return true; + } + + updateClose(frame.isEndStream(), false); + notifyData(this, frame, callback); + return false; } private boolean onReset(ResetFrame frame, Callback callback) { - reset = true; + remoteReset = true; callback.succeeded(); + notifyReset(this, frame); return false; } private boolean onPush(PushPromiseFrame frame, Callback callback) { + updateClose(true, true); callback.succeeded(); return false; } @@ -316,7 +339,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream onClose(); } - protected void notifyData(Stream stream, DataFrame frame, Callback callback) + private void notifyData(Stream stream, DataFrame frame, Callback callback) { final Listener listener = this.listener; if (listener == null) @@ -331,14 +354,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream } } - private void notifyFailure(Stream stream, Throwable failure) + private void notifyReset(Stream stream, ResetFrame frame) + { + final Listener listener = this.listener; + if (listener == null) + return; + try + { + listener.onReset(stream, frame); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + + private void notifyTimeout(Stream stream, Throwable failure) { Listener listener = this.listener; if (listener == null) return; try { - listener.onFailure(stream, failure); + listener.onTimeout(stream, failure); } catch (Throwable x) { @@ -350,6 +388,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream public String toString() { return String.format("%s@%x{id=%d,sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(), - hashCode(), getId(), sendWindow, recvWindow, reset, closeState); + hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState); } } |