Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2013-03-04 09:24:58 +0000
committerThomas Becker2013-03-04 09:25:17 +0000
commit1aa8fce78bc0c45f192b18d0cd5e2987fba1c1cd (patch)
tree4ba71278225c1e319c78df7b0aff375c33ae73f7
parentc396622770ef30d0eca6075cf91ec64b7ffc56eb (diff)
downloadorg.eclipse.jetty.project-1aa8fce78bc0c45f192b18d0cd5e2987fba1c1cd.tar.gz
org.eclipse.jetty.project-1aa8fce78bc0c45f192b18d0cd5e2987fba1c1cd.tar.xz
org.eclipse.jetty.project-1aa8fce78bc0c45f192b18d0cd5e2987fba1c1cd.zip
402277 spdy proxy: fix race condition in nested push streams initiated by upstream server. Fix several other small proxy issues
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java1
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java4
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java6
-rw-r--r--jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java350
-rw-r--r--jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java1
-rw-r--r--jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java3
-rw-r--r--jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java134
7 files changed, 348 insertions, 151 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
index dcce655ede..114ffc1ed5 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java
@@ -1350,6 +1350,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
bufferPool.release(buffer);
IStream stream = getStream();
+ dataInfo.consume(size);
flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
if (dataInfo.available() > 0)
{
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index 7e9afac7a0..e0b65c7b79 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -403,7 +403,7 @@ public class StandardStream implements IStream
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
- throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
+ throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
}
// Cannot update the close state here, because the data that we send may
@@ -481,7 +481,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
- return String.format("stream=%d v%d windowSize=%db reset=%s prio=%d %s %s", getId(), session.getVersion(),
+ return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
getWindowSize(), isReset(), priority, openState, closeState);
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
index c8ca8a7bec..abc1b88847 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java
@@ -97,13 +97,13 @@ public interface Stream
/**
* <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p> <p>Callers may pass a
- * non-null completion callback to be notified of when the pushstream has been established.</p>
+ * non-null completion promise to be notified of when the pushstream has been established.</p>
*
* @param pushInfo the metadata to send on stream creation
- * @param callback the completion callback that gets notified once the pushstream is established
+ * @param promise the completion promise that gets notified once the pushstream is established
* @see #push(PushInfo)
*/
- public void push(PushInfo pushInfo, Promise<Stream> callback);
+ public void push(PushInfo pushInfo, Promise<Stream> promise);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p> <p>Callers may use the returned
diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java
index 67db8bff18..5fe79092b4 100644
--- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java
+++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java
@@ -58,7 +58,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
- private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
+ private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
@@ -113,9 +113,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
- StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
- clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
- serverSession.syn(serverSynInfo, listener, handler);
+ StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
+ clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
+ serverSession.syn(serverSynInfo, listener, promise);
return this;
}
@@ -166,8 +166,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
};
- StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
- streamHandler.data(serverDataInfo);
+ StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
+ streamPromise.data(serverDataInfo);
}
private Session produceSession(String host, short version, InetSocketAddress address)
@@ -219,87 +219,101 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
stream.getSession().rst(rstInfo, new Callback.Adapter());
}
- private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
+ private class ProxyPushStreamFrameListener implements StreamFrameListener
{
- private final Stream clientStream;
- private volatile ReplyInfo replyInfo;
+ private PushStreamPromise pushStreamPromise;
- public ProxyStreamFrameListener(Stream clientStream)
+ private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
{
- this.clientStream = clientStream;
+ this.pushStreamPromise = pushStreamPromise;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
- LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
+ LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
+ PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
+ this.pushStreamPromise.push(newPushStreamPromise);
+ return new ProxyPushStreamFrameListener(newPushStreamPromise);
+ }
+
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ // Push streams never send a reply
+ throw new UnsupportedOperationException();
+ }
- Fields headers = new Fields(pushInfo.getHeaders(), false);
+ @Override
+ public void onHeaders(Stream stream, HeadersInfo headersInfo)
+ {
+ throw new UnsupportedOperationException();
+ }
- addResponseProxyHeaders(stream, headers);
- customizeResponseHeaders(stream, headers);
- Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
- (CLIENT_STREAM_ATTRIBUTE);
- convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
- headers);
+ @Override
+ public void onData(Stream serverStream, final DataInfo serverDataInfo)
+ {
+ LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
- StreamHandler handler = new StreamHandler(clientStream, pushInfo);
- stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
- clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
- pushInfo.isClose()),
- handler);
- return new Adapter()
+ ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
+ public void consume(int delta)
{
- // Push streams never send a reply
- throw new UnsupportedOperationException();
+ super.consume(delta);
+ serverDataInfo.consume(delta);
}
+ };
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- throw new UnsupportedOperationException();
- }
+ pushStreamPromise.data(clientDataInfo);
+ }
+ }
- @Override
- public void onData(Stream serverStream, final DataInfo serverDataInfo)
- {
- LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
+ private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
+ {
+ private final Stream receiverStream;
- ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- serverDataInfo.consume(delta);
- }
- };
-
- StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
- handler.data(clientDataInfo);
- }
- };
+ public ProxyStreamFrameListener(Stream receiverStream)
+ {
+ this.receiverStream = receiverStream;
+ }
+
+ @Override
+ public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
+ {
+ LOG.debug("S -> P {} on {}");
+ PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
+ PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
+ receiverStream.push(newPushInfo, pushStreamPromise);
+ return new ProxyPushStreamFrameListener(pushStreamPromise);
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
LOG.debug("S -> P {} on {}", replyInfo, stream);
+ final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
+ replyInfo.isClose());
+ reply(stream, clientReplyInfo);
+ }
- short serverVersion = stream.getSession().getVersion();
- Fields headers = new Fields(replyInfo.getHeaders(), false);
-
- addResponseProxyHeaders(stream, headers);
- customizeResponseHeaders(stream, headers);
- short clientVersion = this.clientStream.getSession().getVersion();
- convert(serverVersion, clientVersion, headers);
+ private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
+ {
+ receiverStream.reply(clientReplyInfo, new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
+ }
- this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
- if (replyInfo.isClose())
- reply(stream);
+ @Override
+ public void failed(Throwable x)
+ {
+ LOG.debug(x);
+ rst(receiverStream);
+ }
+ });
}
@Override
@@ -313,101 +327,82 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void onData(final Stream stream, final DataInfo dataInfo)
{
LOG.debug("S -> P {} on {}", dataInfo, stream);
-
- if (replyInfo != null)
- {
- if (dataInfo.isClose())
- replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
- reply(stream);
- }
data(stream, dataInfo);
}
- private void reply(final Stream stream)
+ private void data(final Stream stream, final DataInfo serverDataInfo)
{
- final ReplyInfo replyInfo = this.replyInfo;
- this.replyInfo = null;
- clientStream.reply(replyInfo, new Callback()
+ final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
- public void succeeded()
+ public void consume(int delta)
{
- LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
+ super.consume(delta);
+ serverDataInfo.consume(delta);
}
+ };
- @Override
- public void failed(Throwable x)
- {
- LOG.debug(x);
- rst(clientStream);
- }
- });
- }
-
- private void data(final Stream stream, final DataInfo dataInfo)
- {
- clientStream.data(dataInfo, new Callback() //TODO: timeout???
+ receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
{
@Override
public void succeeded()
{
- dataInfo.consume(dataInfo.length());
- LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
+ LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
- rst(clientStream);
+ rst(receiverStream);
}
});
}
}
/**
- * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
- * of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
- * send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
- * fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
- * client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
- * flow control).</p>
+ * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
+ * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
+ * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
+ * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
+ * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
+ * a SPDY v3 server (and hence with flow control).</p>
*/
- private class StreamHandler implements Promise<Stream>
+ private class StreamPromise implements Promise<Stream>
{
- private final Queue<DataInfoHandler> queue = new LinkedList<>();
- private final Stream clientStream;
+ private final Queue<DataInfoCallback> queue = new LinkedList<>();
+ private final Stream senderStream;
private final Info info;
- private Stream serverStream;
+ private Stream receiverStream;
- private StreamHandler(Stream clientStream, Info info)
+ private StreamPromise(Stream senderStream, Info info)
{
- this.clientStream = clientStream;
+ this.senderStream = senderStream;
this.info = info;
}
@Override
- public void succeeded(Stream serverStream)
+ public void succeeded(Stream stream)
{
- LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
+ LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
- serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
+ stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
- DataInfoHandler dataInfoHandler;
+ DataInfoCallback dataInfoCallback;
synchronized (queue)
{
- this.serverStream = serverStream;
- dataInfoHandler = queue.peek();
- if (dataInfoHandler != null)
+ this.receiverStream = stream;
+ dataInfoCallback = queue.peek();
+ if (dataInfoCallback != null)
{
- if (dataInfoHandler.flushing)
+ if (dataInfoCallback.flushing)
{
- LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
- dataInfoHandler = null;
+ LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
+ dataInfoCallback = null;
}
else
{
- dataInfoHandler.flushing = true;
+ dataInfoCallback.flushing = true;
LOG.debug("SYN completed, queue size {}", queue.size());
}
}
@@ -416,37 +411,37 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("SYN completed, queue empty");
}
}
- if (dataInfoHandler != null)
- flush(serverStream, dataInfoHandler);
+ if (dataInfoCallback != null)
+ flush(stream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
- rst(clientStream);
+ rst(senderStream);
}
public void data(DataInfo dataInfo)
{
- Stream serverStream;
- DataInfoHandler dataInfoHandler = null;
- DataInfoHandler item = new DataInfoHandler(dataInfo);
+ Stream receiverStream;
+ DataInfoCallback dataInfoCallbackToFlush = null;
+ DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
synchronized (queue)
{
- queue.offer(item);
- serverStream = this.serverStream;
- if (serverStream != null)
+ queue.offer(dataInfoCallBackToQueue);
+ receiverStream = this.receiverStream;
+ if (receiverStream != null)
{
- dataInfoHandler = queue.peek();
- if (dataInfoHandler.flushing)
+ dataInfoCallbackToFlush = queue.peek();
+ if (dataInfoCallbackToFlush.flushing)
{
- LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
- serverStream = null;
+ LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
+ receiverStream = null;
}
else
{
- dataInfoHandler.flushing = true;
+ dataInfoCallbackToFlush.flushing = true;
LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
}
}
@@ -455,22 +450,22 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
}
}
- if (serverStream != null)
- flush(serverStream, dataInfoHandler);
+ if (receiverStream != null)
+ flush(receiverStream, dataInfoCallbackToFlush);
}
- private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
+ private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
{
- LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
- serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout???
+ LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
+ receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
}
- private class DataInfoHandler implements Callback
+ private class DataInfoCallback implements Callback
{
private final DataInfo dataInfo;
private boolean flushing;
- private DataInfoHandler(DataInfo dataInfo)
+ private DataInfoCallback(DataInfo dataInfo)
{
this.dataInfo = dataInfo;
}
@@ -479,18 +474,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void succeeded()
{
Stream serverStream;
- DataInfoHandler dataInfoHandler;
+ DataInfoCallback dataInfoCallback;
synchronized (queue)
{
- serverStream = StreamHandler.this.serverStream;
+ serverStream = StreamPromise.this.receiverStream;
assert serverStream != null;
- dataInfoHandler = queue.poll();
- assert dataInfoHandler == this;
- dataInfoHandler = queue.peek();
- if (dataInfoHandler != null)
+ dataInfoCallback = queue.poll();
+ assert dataInfoCallback == this;
+ dataInfoCallback = queue.peek();
+ if (dataInfoCallback != null)
{
- assert !dataInfoHandler.flushing;
- dataInfoHandler.flushing = true;
+ assert !dataInfoCallback.flushing;
+ dataInfoCallback.flushing = true;
LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
}
else
@@ -498,23 +493,72 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Completed {}, queue empty", dataInfo);
}
}
- if (dataInfoHandler != null)
- flush(serverStream, dataInfoHandler);
+ if (dataInfoCallback != null)
+ flush(serverStream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
- rst(clientStream);
+ rst(senderStream);
+ }
+ }
+
+ public Stream getSenderStream()
+ {
+ return senderStream;
+ }
+
+ public Info getInfo()
+ {
+ return info;
+ }
+
+ public Stream getReceiverStream()
+ {
+ synchronized (queue)
+ {
+ return receiverStream;
}
}
}
- private class ProxySessionFrameListener extends SessionFrameListener.Adapter
+ private class PushStreamPromise extends StreamPromise
{
+ private volatile PushStreamPromise pushStreamPromise;
+
+ private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
+ {
+ super(senderStream, pushInfo);
+ }
@Override
+ public void succeeded(Stream receiverStream)
+ {
+ super.succeeded(receiverStream);
+
+ LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
+
+ PushStreamPromise promise = pushStreamPromise;
+ if (promise != null)
+ receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
+ }
+
+ public void push(PushStreamPromise pushStreamPromise)
+ {
+ Stream receiverStream = getReceiverStream();
+
+ if (receiverStream != null)
+ receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
+ else
+ this.pushStreamPromise = pushStreamPromise;
+ }
+ }
+
+ private class ProxySessionFrameListener extends SessionFrameListener.Adapter
+ {
+ @Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
{
Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
@@ -536,4 +580,20 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
serverSessions.values().remove(serverSession);
}
}
+
+ private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
+ {
+ Fields headersToConvert = pushInfo.getHeaders();
+ Fields headers = convertHeaders(from, to, headersToConvert);
+ return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
+ }
+
+ private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
+ {
+ Fields headers = new Fields(headersToConvert, false);
+ addResponseProxyHeaders(from, headers);
+ customizeResponseHeaders(from, headers);
+ convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
+ return headers;
+ }
}
diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
index 7d65a6f5c1..1bf46ac039 100644
--- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
+++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/http/ReferrerPushStrategyTest.java
@@ -566,7 +566,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, null);
- LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override
diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java
index 11f44dc2a3..bcb46a9b6c 100644
--- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java
+++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPToSPDYTest.java
@@ -311,6 +311,8 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
+ responseHeaders.put("content-length", String.valueOf(data.length));
+
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
@@ -437,6 +439,7 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
+ responseHeaders.put("content-length", String.valueOf(data.length));
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
diff --git a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java
index 506c5ed571..2739d1e0c5 100644
--- a/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java
+++ b/jetty-spdy/spdy-http-server/src/test/java/org/eclipse/jetty/spdy/server/proxy/ProxySPDYToSPDYTest.java
@@ -329,6 +329,140 @@ public class ProxySPDYToSPDYTest
}
@Test
+ public void testSYNThenSPDYNestedPushIsReceived() throws Exception
+ {
+ final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+ InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+ {
+ @Override
+ public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+ {
+ Fields responseHeaders = new Fields();
+ responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
+ responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
+ stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
+
+ final Fields pushHeaders = new Fields();
+ pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
+ stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
+ {
+ @Override
+ public void succeeded(Stream pushStream)
+ {
+ pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/nestedpush");
+ pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
+ {
+ @Override
+ public void succeeded(Stream pushStream)
+ {
+ pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/anothernestedpush");
+ pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
+ {
+ @Override
+ public void succeeded(Stream pushStream)
+ {
+ pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
+ }
+ });
+ pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
+ }
+ });
+ pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
+ }
+ });
+
+ stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
+
+ return null;
+ }
+ }));
+ proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
+
+ final CountDownLatch pushSynLatch = new CountDownLatch(3);
+ final CountDownLatch pushDataLatch = new CountDownLatch(3);
+ Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
+
+ Fields headers = new Fields();
+ headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
+ final CountDownLatch replyLatch = new CountDownLatch(1);
+ final CountDownLatch dataLatch = new CountDownLatch(1);
+ client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
+ {
+ // onPush for 1st push stream
+ @Override
+ public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
+ {
+ pushSynLatch.countDown();
+ return new StreamFrameListener.Adapter()
+ {
+ // onPush for 2nd nested push stream
+ @Override
+ public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
+ {
+ pushSynLatch.countDown();
+ return new Adapter()
+ {
+ // onPush for 3rd nested push stream
+ @Override
+ public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
+ {
+ pushSynLatch.countDown();
+ return new Adapter()
+ {
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consume(dataInfo.length());
+ if (dataInfo.isClose())
+ pushDataLatch.countDown();
+ }
+ };
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consume(dataInfo.length());
+ if (dataInfo.isClose())
+ pushDataLatch.countDown();
+ }
+ };
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consume(dataInfo.length());
+ if (dataInfo.isClose())
+ pushDataLatch.countDown();
+ }
+ };
+ }
+
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ replyLatch.countDown();
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consume(dataInfo.length());
+ if (dataInfo.isClose())
+ dataLatch.countDown();
+ }
+ });
+
+ Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
+
+ client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
+ }
+
+ @Test
public void testPING() throws Exception
{
// PING is per hop, and it does not carry the information to which server to ping to

Back to the top