Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-02-18 13:31:25 +0000
committerSimone Bordet2015-02-18 22:00:37 +0000
commitd4809e9b790fcd49298a6b70cedeaff37c23b880 (patch)
tree749a89b9ef95568e34e57f6172b40c70e75a910e /jetty-http2/http2-common
parent4b6d024c859ce7d829cdb181ebcefb437878dca6 (diff)
downloadorg.eclipse.jetty.project-d4809e9b790fcd49298a6b70cedeaff37c23b880.tar.gz
org.eclipse.jetty.project-d4809e9b790fcd49298a6b70cedeaff37c23b880.tar.xz
org.eclipse.jetty.project-d4809e9b790fcd49298a6b70cedeaff37c23b880.zip
Improved handling of the stream close state.
Now the stream close state is updated when the frame has been successfully written, and when it is received. The stream is closed in case of failures. Just after the stream close state update, if the stream is closed then it is removed from the session.
Diffstat (limited to 'jetty-http2/http2-common')
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java5
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java34
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java45
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java19
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java14
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java3
6 files changed, 79 insertions, 41 deletions
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
index 44031c20e8..a26553c6c8 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java
@@ -364,6 +364,11 @@ public class HTTP2Flusher extends IteratingCallback
@Override
public void failed(Throwable x)
{
+ if (stream != null)
+ {
+ stream.close();
+ stream.getSession().removeStream(stream, true);
+ }
callback.failed(x);
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 2e210105c2..dbf0266468 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -181,8 +181,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
- if (stream.isClosed())
- removeStream(stream, false);
}
}
else
@@ -214,9 +212,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
stream.process(frame, Callback.Adapter.INSTANCE);
else
notifyReset(this, frame);
-
- if (stream != null)
- removeStream(stream, false);
}
@Override
@@ -416,7 +411,6 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
final IStream stream = createLocalStream(streamId, promise);
if (stream == null)
return;
- stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
@@ -428,7 +422,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
@Override
- public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame)
+ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
@@ -441,7 +435,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
final IStream pushStream = createLocalStream(streamId, promise);
if (pushStream == null)
return;
- pushStream.updateClose(true, false);
+ pushStream.setListener(listener);
ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
queued = flusher.append(entry);
@@ -647,7 +641,8 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
return new HTTP2Stream(scheduler, this, streamId);
}
- protected void removeStream(IStream stream, boolean local)
+ @Override
+ public void removeStream(IStream stream, boolean local)
{
IStream removed = streams.remove(stream.getId());
if (removed != null)
@@ -845,8 +840,10 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (closed.compareAndSet(current, CloseState.CLOSED))
{
- // Close the flusher and disconnect.
flusher.close();
+ for (IStream stream : streams.values())
+ stream.close();
+ streams.clear();
disconnect();
return;
}
@@ -988,15 +985,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
case HEADERS:
{
HeadersFrame headersFrame = (HeadersFrame)frame;
- stream.updateClose(headersFrame.isEndStream(), true);
- if (stream.isClosed())
+ if (stream.updateClose(headersFrame.isEndStream(), true))
removeStream(stream, true);
break;
}
case RST_STREAM:
{
- if (stream != null)
- removeStream(stream, true);
+ stream.close();
+ removeStream(stream, true);
break;
}
case SETTINGS:
@@ -1007,6 +1003,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
break;
}
+ case PUSH_PROMISE:
+ {
+ // Pushed streams are implicitly remotely closed.
+ // They are closed when sending an end-stream DATA frame.
+ stream.updateClose(true, false);
+ break;
+ }
case GO_AWAY:
{
// We just sent a GO_AWAY, only shutdown the
@@ -1097,8 +1100,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
// Only now we can update the close state
// and eventually remove the stream.
- stream.updateClose(dataFrame.isEndStream(), true);
- if (stream.isClosed())
+ if (stream.updateClose(dataFrame.isEndStream(), true))
removeStream(stream, true);
callback.succeeded();
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index ddfa5b9789..d6c3783aa8 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -80,10 +80,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
}
@Override
- public void push(PushPromiseFrame frame, Promise<Stream> promise)
+ public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
{
notIdle();
- session.push(this, promise, frame);
+ session.push(this, promise, frame, listener);
}
@Override
@@ -227,7 +227,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
private void onHeaders(HeadersFrame frame, Callback callback)
{
- updateClose(frame.isEndStream(), false);
+ if (updateClose(frame.isEndStream(), false))
+ session.removeStream(this, false);
callback.succeeded();
}
@@ -237,7 +238,9 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
// It's a bad client, it does not deserve to be
// treated gently by just resetting the stream.
- session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback);
+ session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.Adapter.INSTANCE);
+ callback.failed(new IOException("stream_window_exceeded"));
+ return;
}
// SPEC: remotely closed streams must be replied with a reset.
@@ -245,39 +248,46 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.Adapter.INSTANCE);
callback.failed(new EOFException("stream_closed"));
+ return;
}
if (isReset())
{
// Just drop the frame.
callback.failed(new IOException("stream_reset"));
+ return;
}
- updateClose(frame.isEndStream(), false);
+ if (updateClose(frame.isEndStream(), false))
+ session.removeStream(this, false);
notifyData(this, frame, callback);
}
private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
+ close();
+ session.removeStream(this, false);
callback.succeeded();
notifyReset(this, frame);
}
private void onPush(PushPromiseFrame frame, Callback callback)
{
+ // Pushed streams are implicitly locally closed.
+ // They are closed when receiving an end-stream DATA frame.
updateClose(true, true);
callback.succeeded();
}
@Override
- public void updateClose(boolean update, boolean local)
+ public boolean updateClose(boolean update, boolean local)
{
if (LOG.isDebugEnabled())
LOG.debug("Update close for {} close={} local={}", this, update, local);
if (!update)
- return;
+ return false;
while (true)
{
@@ -288,24 +298,26 @@ public class HTTP2Stream extends IdleTimeout implements IStream
{
CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
if (closeState.compareAndSet(current, newValue))
- return;
+ return false;
break;
}
case LOCALLY_CLOSED:
{
- if (!local)
- close();
- return;
+ if (local)
+ return false;
+ close();
+ return true;
}
case REMOTELY_CLOSED:
{
- if (local)
- close();
- return;
+ if (!local)
+ return false;
+ close();
+ return true;
}
default:
{
- return;
+ return false;
}
}
}
@@ -334,7 +346,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream
return recvWindow.getAndAdd(delta);
}
- private void close()
+ @Override
+ public void close()
{
closeState.set(CloseState.CLOSED);
onClose();
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
index 737fa165e6..6c39ef6b3d 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
@@ -38,6 +38,14 @@ public interface ISession extends Session
public IStream getStream(int streamId);
/**
+ * <p>Removes the given {@code stream}.</p>
+ *
+ * @param stream the stream to remove
+ * @param local whether the stream is local or remote
+ */
+ public void removeStream(IStream stream, boolean local);
+
+ /**
* <p>Enqueues the given frames to be written to the connection.</p>
*
* @param stream the stream the frames belong to
@@ -52,11 +60,12 @@ public interface ISession extends Session
* <p>Differently from {@link #control(IStream, Callback, Frame, Frame...)}, this method
* generates atomically the stream id for the pushed stream.</p>
*
- * @param stream the stream associated to the pushed stream
- * @param promise the promise that gets notified of the pushed stream creation
- * @param frame the PUSH_PROMISE frame to enqueue
+ * @param stream the stream associated to the pushed stream
+ * @param promise the promise that gets notified of the pushed stream creation
+ * @param frame the PUSH_PROMISE frame to enqueue
+ * @param listener the listener that gets notified of pushed stream events
*/
- public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame);
+ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener);
/**
* <p>Enqueues the given DATA frame to be written to the connection.</p>
@@ -87,7 +96,7 @@ public interface ISession extends Session
* <p>Callback method invoked when the a WINDOW_UPDATE frame has been received.</p>
*
* @param stream the stream the window update belongs to, or null if the window update belongs to the session
- * @param frame the WINDOW_UPDATE frame received
+ * @param frame the WINDOW_UPDATE frame received
*/
public void onWindowUpdate(IStream stream, WindowUpdateFrame frame);
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
index 0e0f5460c2..5d222c76d5 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
@@ -18,6 +18,8 @@
package org.eclipse.jetty.http2;
+import java.io.Closeable;
+
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Callback;
@@ -27,7 +29,7 @@ import org.eclipse.jetty.util.Callback;
* <p>This class extends {@link Stream} by adding the methods required to
* implement the HTTP/2 stream functionalities.</p>
*/
-public interface IStream extends Stream
+public interface IStream extends Stream, Closeable
{
/**
* <p>The constant used as attribute key to store/retrieve the HTTP
@@ -67,9 +69,15 @@ public interface IStream extends Stream
* @param local whether the update comes from a local operation
* (such as sending a frame that ends the stream)
* or a remote operation (such as receiving a frame
- * that ends the stream).
+ * @return whether the stream has been fully closed by this invocation
*/
- public void updateClose(boolean update, boolean local);
+ public boolean updateClose(boolean update, boolean local);
+
+ /**
+ * <p>Forcibly closes this stream.</p>
+ */
+ @Override
+ public void close();
/**
* @return the current value of the stream send window
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
index 5c6a6337a7..ef8d82eb7f 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
@@ -63,8 +63,9 @@ public interface Stream
*
* @param frame the PUSH_PROMISE frame to send
* @param promise the promise that gets notified of the pushed stream creation
+ * @param listener the listener that gets notified of stream events
*/
- public void push(PushPromiseFrame frame, Promise<Stream> promise);
+ public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener);
/**
* <p>Sends the given DATA {@code frame}.</p>

Back to the top