aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2012-08-09 12:54:41 (EDT)
committerThomas Becker2012-08-09 12:54:41 (EDT)
commit731eb31c7e7d089597bd7222bfcedf383ba9a11c (patch)
treefaf080e5ac027f01d6411bc15543e2ec60d4d8ed
parent6c0bb390aea779ee8754f3d5bf79bf045b7c5873 (diff)
downloadorg.eclipse.jetty.project-731eb31c7e7d089597bd7222bfcedf383ba9a11c.zip
org.eclipse.jetty.project-731eb31c7e7d089597bd7222bfcedf383ba9a11c.tar.gz
org.eclipse.jetty.project-731eb31c7e7d089597bd7222bfcedf383ba9a11c.tar.bz2
interims changes
-rw-r--r--jetty-http/src/main/java/org/eclipse/jetty/http/HttpMethod.java14
-rw-r--r--jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java8
-rw-r--r--jetty-http/src/main/java/org/eclipse/jetty/http/HttpVersion.java16
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Request.java2
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/HttpFiveWaysToCommitTest.java194
-rw-r--r--jetty-server/src/test/resources/jetty-logging.properties2
-rw-r--r--jetty-spdy/pom.xml2
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYServerConnector.java70
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java34
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java797
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java433
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java35
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java39
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java602
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java959
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java8
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ProtocolNegotiationTest.java14
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java262
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyTest.java33
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyV2Test.java20
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java11
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java2
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java1454
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/resources/jetty-logging.properties4
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java10
25 files changed, 2335 insertions, 2690 deletions
diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpMethod.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpMethod.java
index ca2e0d2..6d52956 100644
--- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpMethod.java
+++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpMethod.java
@@ -97,9 +97,7 @@ public enum HttpMethod
/* ------------------------------------------------------------ */
/**
* Optimised lookup to find a method name and trailing space in a byte array.
- * @param bytes Array containing ISO-8859-1 characters
- * @param position The first valid index
- * @param limit The first non valid index
+ * @param buffer buffer containing ISO-8859-1 characters
* @return A HttpMethod if a match or null if no easy match.
*/
public static HttpMethod lookAheadGet(ByteBuffer buffer)
@@ -151,4 +149,14 @@ public enum HttpMethod
{
return toString();
}
+
+ /**
+ * Converts the given String parameter to an HttpMethod
+ * @param method the String to get the equivalent HttpMethod from
+ * @return the HttpMethod or null if the parameter method is unknown
+ */
+ public static HttpMethod fromString(String method)
+ {
+ return CACHE.get(method);
+ }
}
diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
index 18466b0..679b9fc 100644
--- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
+++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
@@ -218,7 +218,7 @@ public class HttpParser
if (_version!=null)
{
buffer.position(buffer.position()+_version.asString().length()+1);
- _persistent=_version.getVerion()>=HttpVersion.HTTP_1_1.getVerion();
+ _persistent=_version.getVersion()>=HttpVersion.HTTP_1_1.getVersion();
_state=State.SPACE1;
return;
}
@@ -309,7 +309,7 @@ public class HttpParser
badMessage(buffer, "Unknown Version");
return true;
}
- _persistent=_version.getVerion()>=HttpVersion.HTTP_1_1.getVerion();
+ _persistent=_version.getVersion()>=HttpVersion.HTTP_1_1.getVersion();
_state=State.SPACE1;
}
else if (ch < HttpTokens.SPACE && ch>=0)
@@ -410,7 +410,7 @@ public class HttpParser
_string.setLength(0);
buffer.position(buffer.position()+_version.asString().length()-1);
_eol=buffer.get();
- _persistent=_version.getVerion()>=HttpVersion.HTTP_1_1.getVerion();
+ _persistent=_version.getVersion()>=HttpVersion.HTTP_1_1.getVersion();
_state=State.HEADER;
return_from_parse|=_requestHandler.startRequest(_method,_methodString, _uri, _version);
}
@@ -450,7 +450,7 @@ public class HttpParser
}
_eol=ch;
- _persistent=_version.getVerion()>=HttpVersion.HTTP_1_1.getVerion();
+ _persistent=_version.getVersion()>=HttpVersion.HTTP_1_1.getVersion();
_state=State.HEADER;
return_from_parse|=_requestHandler.startRequest(_method,_methodString, _uri, _version);
continue;
diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpVersion.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpVersion.java
index e403696..09ae2ad 100644
--- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpVersion.java
+++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpVersion.java
@@ -80,9 +80,7 @@ public enum HttpVersion
/* ------------------------------------------------------------ */
/**
* Optimised lookup to find a HTTP Version and trailing white space in a byte array.
- * @param bytes Array containing ISO-8859-1 characters
- * @param position The first valid index
- * @param limit The first non valid index
+ * @param buffer buffer containing ISO-8859-1 characters
* @return A HttpVersion if a match or null if no easy match.
*/
public static HttpVersion lookAheadGet(ByteBuffer buffer)
@@ -120,7 +118,7 @@ public enum HttpVersion
}
/* ------------------------------------------------------------ */
- public int getVerion()
+ public int getVersion()
{
return _version;
}
@@ -144,6 +142,16 @@ public enum HttpVersion
return _string;
}
+ /**
+ * Case insensitive fromString() conversion
+ * @param version the String to convert to enum constant
+ * @return the enum constant or null if version unknown
+ */
+ public static HttpVersion fromString(String version)
+ {
+ return CACHE.get(version);
+ }
+
/* ------------------------------------------------------------ */
public static HttpVersion fromVersion(int version)
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
index 59f1950..cf226e2 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
@@ -1700,7 +1700,7 @@ public class Request implements HttpServletRequest
/* ------------------------------------------------------------ */
public boolean isHead()
{
- return HttpMethod.HEAD==_httpMethod;
+ return HttpMethod.HEAD==HttpMethod.fromString(_method);
}
/* ------------------------------------------------------------ */
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpFiveWaysToCommitTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpFiveWaysToCommitTest.java
new file mode 100644
index 0000000..e22412b
--- /dev/null
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpFiveWaysToCommitTest.java
@@ -0,0 +1,194 @@
+package org.eclipse.jetty.server;
+//========================================================================
+//Copyright (c) 1999-2009 Mort Bay Consulting Pty. Ltd.
+//------------------------------------------------------------------------
+//All rights reserved. This program and the accompanying materials
+//are made available under the terms of the Eclipse Public License v1.0
+//and Apache License v2.0 which accompanies this distribution.
+//The Eclipse Public License is available at
+//http://www.eclipse.org/legal/epl-v10.html
+//The Apache License v2.0 is available at
+//http://www.opensource.org/licenses/apache2.0.php
+//You may elect to redistribute this code under either of these licenses.
+//========================================================================
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.IO;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Resource Handler test
+ * <p/>
+ * TODO: increase the testing going on here
+ */
+public class HttpFiveWaysToCommitTest
+{
+ private static Server server;
+ private static SelectChannelConnector connector;
+
+
+ @Before
+ public void setUp() throws Exception
+ {
+ server = new Server();
+ connector = new SelectChannelConnector(server);
+ server.setConnectors(new Connector[]{connector});
+ }
+
+ /* ------------------------------------------------------------ */
+ @After
+ public void tearDown() throws Exception
+ {
+ server.stop();
+ }
+
+ @Test
+ public void testHandlerSetsHandledTrueOnly() throws Exception
+ {
+ server.setHandler(new OnlySetHandledHandler());
+ server.start();
+
+ StringWriter writer = executeRequest(HttpStatus.OK_200);
+
+ System.out.println("RESPONSE: " + writer.toString());
+ }
+
+
+ private class OnlySetHandledHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ }
+ }
+
+ @Test
+ public void testHandlerExplicitFlush() throws Exception
+ {
+ server.setHandler(new ExplicitFlushHandler());
+ server.start();
+
+ StringWriter writer = executeRequest(HttpStatus.OK_200);
+
+ System.out.println("RESPONSE: " + writer.toString());
+ }
+
+ private class ExplicitFlushHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ response.getWriter().write("foobar");
+ response.flushBuffer();
+ }
+ }
+
+ @Test
+ public void testHandlerDoesNotSetHandled() throws Exception
+ {
+ server.setHandler(new DoesNotSetHandledHandler());
+ server.start();
+
+ StringWriter writer = executeRequest(HttpStatus.NOT_FOUND_404);
+
+ System.out.println("RESPONSE: " + writer.toString());
+ }
+
+ private class DoesNotSetHandledHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(false);
+ }
+ }
+
+ @Test
+ public void testCommitWithMoreDataToWrite() throws Exception
+ {
+ server.setHandler(new CommitResponseWithMoreDataToWriteHandler());
+ server.start();
+
+ StringWriter writer = executeRequest(HttpStatus.OK_200);
+
+ System.out.println("RESPONSE: " + writer.toString());
+ }
+
+ private class CommitResponseWithMoreDataToWriteHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ response.getWriter().write("foo");
+ response.flushBuffer();
+ response.getWriter().write("bar");
+ }
+ }
+
+ @Test
+ public void testBufferOverflow() throws Exception
+ {
+ server.setHandler(new OverflowHandler());
+ server.start();
+
+ StringWriter writer = executeRequest(HttpStatus.OK_200);
+
+ System.out.println("RESPONSE: " + writer.toString());
+ }
+
+ private class OverflowHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ response.setContentLength(2);
+ response.getWriter().write("foo");
+ }
+ }
+
+ private StringWriter executeRequest(int expectedStatus) throws URISyntaxException, IOException
+ {
+ URI uri = new URI("http://localhost:" + connector.getLocalPort() + "/");
+ HttpURLConnection connection = (HttpURLConnection)uri.toURL().openConnection();
+ connection.connect();
+
+ for (String header : connection.getHeaderFields().keySet())
+ {
+ System.out.println(header + ": " + connection.getHeaderFields().get(header));
+ }
+ int responseCode = connection.getResponseCode();
+ assertThat("return code is 200 ok", responseCode, is(expectedStatus));
+
+ StringWriter writer = new StringWriter();
+ if (responseCode == HttpStatus.OK_200)
+ {
+ InputStream inputStream = connection.getInputStream();
+ InputStreamReader reader = new InputStreamReader(inputStream);
+ IO.copy(reader, writer);
+ }
+
+ return writer;
+ }
+
+}
diff --git a/jetty-server/src/test/resources/jetty-logging.properties b/jetty-server/src/test/resources/jetty-logging.properties
new file mode 100644
index 0000000..e10fb08
--- /dev/null
+++ b/jetty-server/src/test/resources/jetty-logging.properties
@@ -0,0 +1,2 @@
+org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
+org.eclipse.jetty.server.LEVEL=DEBUG
diff --git a/jetty-spdy/pom.xml b/jetty-spdy/pom.xml
index 59ac802..ae9bc54 100644
--- a/jetty-spdy/pom.xml
+++ b/jetty-spdy/pom.xml
@@ -19,8 +19,8 @@
<modules>
<module>spdy-core</module>
<module>spdy-jetty</module>
- <!--
<module>spdy-jetty-http</module>
+ <!--
<module>spdy-jetty-http-webapp</module>
-->
</modules>
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYServerConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYServerConnector.java
index 543f783..0aa6a43 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYServerConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYServerConnector.java
@@ -13,49 +13,45 @@
package org.eclipse.jetty.spdy.http;
-import java.io.IOException;
-
-import org.eclipse.jetty.http.HttpSchemes;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
public class AbstractHTTPSPDYServerConnector extends SPDYServerConnector
{
- public AbstractHTTPSPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
+ public AbstractHTTPSPDYServerConnector(Server server, ServerSessionFrameListener listener)
{
- super(listener, sslContextFactory);
+ super(server, listener);
}
- @Override
- public void customize(EndPoint endPoint, Request request) throws IOException
- {
- super.customize(endPoint, request);
- if (getSslContextFactory() != null)
- request.setScheme(HttpSchemes.HTTPS);
- }
-
- @Override
- public boolean isConfidential(Request request)
- {
- if (getSslContextFactory() != null)
- {
- int confidentialPort = getConfidentialPort();
- return confidentialPort == 0 || confidentialPort == request.getServerPort();
- }
- return super.isConfidential(request);
- }
-
- @Override
- public boolean isIntegral(Request request)
- {
- if (getSslContextFactory() != null)
- {
- int integralPort = getIntegralPort();
- return integralPort == 0 || integralPort == request.getServerPort();
- }
- return super.isIntegral(request);
- }
+ //TODO:
+// @Override
+// public void customize(EndPoint endPoint, Request request) throws IOException
+// {
+// super.customize(endPoint, request);
+// if (getSslContextFactory() != null)
+// request.setScheme(HttpSchemes.HTTPS);
+// }
+//
+// @Override
+// public boolean isConfidential(Request request)
+// {
+// if (getSslContextFactory() != null)
+// {
+// int confidentialPort = getConfidentialPort();
+// return confidentialPort == 0 || confidentialPort == request.getServerPort();
+// }
+// return super.isConfidential(request);
+// }
+//
+// @Override
+// public boolean isIntegral(Request request)
+// {
+// if (getSslContextFactory() != null)
+// {
+// int integralPort = getIntegralPort();
+// return integralPort == 0 || integralPort == request.getServerPort();
+// }
+// return super.isIntegral(request);
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
index 8b76589..d9b258d 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java
@@ -17,40 +17,32 @@ package org.eclipse.jetty.spdy.http;
import java.util.Collections;
import java.util.Map;
+import org.eclipse.jetty.server.HttpServerConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-public class HTTPSPDYServerConnector extends AbstractHTTPSPDYServerConnector
+public class HTTPSPDYServerConnector extends SPDYServerConnector
{
- public HTTPSPDYServerConnector()
+ public HTTPSPDYServerConnector(Server server)
{
- this(null, Collections.<Short, PushStrategy>emptyMap());
+ this(server, Collections.<Short, PushStrategy>emptyMap());
}
- public HTTPSPDYServerConnector(Map<Short, PushStrategy> pushStrategies)
- {
- this(null, pushStrategies);
- }
-
- public HTTPSPDYServerConnector(SslContextFactory sslContextFactory)
- {
- this(sslContextFactory, Collections.<Short, PushStrategy>emptyMap());
- }
-
- public HTTPSPDYServerConnector(SslContextFactory sslContextFactory, Map<Short, PushStrategy> pushStrategies)
+ public HTTPSPDYServerConnector(Server server, Map<Short, PushStrategy> pushStrategies)
{
// We pass a null ServerSessionFrameListener because for
// HTTP over SPDY we need one that references the endPoint
- super(null, sslContextFactory);
- clearAsyncConnectionFactories();
+ super(server, null);
+ clearConnectionFactories();
// The "spdy/3" protocol handles HTTP over SPDY
- putAsyncConnectionFactory("spdy/3", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), this, getPushStrategy(SPDY.V3,pushStrategies)));
+ putConnectionFactory("spdy/3", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), this, getPushStrategy(SPDY.V3, pushStrategies)));
// The "spdy/2" protocol handles HTTP over SPDY
- putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, getPushStrategy(SPDY.V2,pushStrategies)));
+ putConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, getPushStrategy(SPDY.V2, pushStrategies)));
// The "http/1.1" protocol handles browsers that support NPN but not SPDY
- putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
+ putConnectionFactory("http/1.1", new HttpServerConnectionFactory(this));
// The default connection factory handles plain HTTP on non-SSL or non-NPN connections
- setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
+ setDefaultConnectionFactory(getConnectionFactory("http/1.1"));
}
private PushStrategy getPushStrategy(short version, Map<Short, PushStrategy> pushStrategies)
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java
deleted file mode 100644
index ecc241e..0000000
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java
+++ /dev/null
@@ -1,797 +0,0 @@
-//========================================================================
-//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
-//------------------------------------------------------------------------
-//All rights reserved. This program and the accompanying materials
-//are made available under the terms of the Eclipse Public License v1.0
-//and Apache License v2.0 which accompanies this distribution.
-//The Eclipse Public License is available at
-//http://www.eclipse.org/legal/epl-v10.html
-//The Apache License v2.0 is available at
-//http://www.opensource.org/licenses/apache2.0.php
-//You may elect to redistribute this code under either of these licenses.
-//========================================================================
-
-
-package org.eclipse.jetty.spdy.http;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.eclipse.jetty.http.HttpException;
-import org.eclipse.jetty.http.HttpFields;
-import org.eclipse.jetty.http.HttpGenerator;
-import org.eclipse.jetty.http.HttpParser;
-import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.io.AsyncEndPoint;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.io.Buffers;
-import org.eclipse.jetty.io.ByteArrayBuffer;
-import org.eclipse.jetty.io.Connection;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.AsyncConnection;
-import org.eclipse.jetty.io.nio.DirectNIOBuffer;
-import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
-import org.eclipse.jetty.io.nio.NIOBuffer;
-import org.eclipse.jetty.server.AbstractHttpConnection;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.spdy.SPDYAsyncConnection;
-import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
-import org.eclipse.jetty.spdy.api.BytesDataInfo;
-import org.eclipse.jetty.spdy.api.DataInfo;
-import org.eclipse.jetty.spdy.api.Handler;
-import org.eclipse.jetty.spdy.api.Headers;
-import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.RstInfo;
-import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.spdy.api.Stream;
-import org.eclipse.jetty.spdy.api.StreamStatus;
-import org.eclipse.jetty.spdy.api.SynInfo;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-
-public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implements AsyncConnection
-{
- private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnection.class);
- private static final ByteBuffer ZERO_BYTES = ByteBuffer.allocate(0);
- private static final DataInfo END_OF_CONTENT = new ByteBufferDataInfo(ZERO_BYTES, true);
-
- private final Queue<Runnable> tasks = new LinkedList<>();
- private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
- private final short version;
- private final SPDYAsyncConnection connection;
- private final PushStrategy pushStrategy;
- private final Stream stream;
- private Headers headers; // No need for volatile, guarded by state
- private DataInfo dataInfo; // No need for volatile, guarded by state
- private NIOBuffer buffer; // No need for volatile, guarded by state
- private volatile State state = State.INITIAL;
- private boolean dispatched; // Guarded by synchronization on tasks
-
- public ServerHTTPSPDYAsyncConnection(Connector connector, AsyncEndPoint endPoint, Server server, short version, SPDYAsyncConnection connection, PushStrategy pushStrategy, Stream stream)
- {
- super(connector, endPoint, server);
- this.version = version;
- this.connection = connection;
- this.pushStrategy = pushStrategy;
- this.stream = stream;
- getParser().setPersistent(true);
- }
-
- @Override
- protected HttpParser newHttpParser(Buffers requestBuffers, EndPoint endPoint, HttpParser.EventHandler requestHandler)
- {
- return new HTTPSPDYParser(requestBuffers, endPoint);
- }
-
- @Override
- protected HttpGenerator newHttpGenerator(Buffers responseBuffers, EndPoint endPoint)
- {
- return new HTTPSPDYGenerator(responseBuffers, endPoint);
- }
-
- @Override
- public AsyncEndPoint getEndPoint()
- {
- return (AsyncEndPoint)super.getEndPoint();
- }
-
- private void post(Runnable task)
- {
- synchronized (tasks)
- {
- logger.debug("Posting task {}", task);
- tasks.offer(task);
- dispatch();
- }
- }
-
- private void dispatch()
- {
- synchronized (tasks)
- {
- if (dispatched)
- return;
-
- final Runnable task = tasks.poll();
- if (task != null)
- {
- dispatched = true;
- logger.debug("Dispatching task {}", task);
- execute(new Runnable()
- {
- @Override
- public void run()
- {
- logger.debug("Executing task {}", task);
- task.run();
- logger.debug("Completing task {}", task);
- dispatched = false;
- dispatch();
- }
- });
- }
- }
- }
-
- protected void execute(Runnable task)
- {
- getServer().getThreadPool().dispatch(task);
- }
-
- @Override
- public Connection handle()
- {
- setCurrentConnection(this);
- try
- {
- switch (state)
- {
- case INITIAL:
- {
- break;
- }
- case REQUEST:
- {
- Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(version));
- Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
- Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(this.version));
-
- if (method == null || uri == null || version == null)
- throw new HttpException(HttpStatus.BAD_REQUEST_400);
-
- String m = method.value();
- String u = uri.value();
- String v = version.value();
- logger.debug("HTTP > {} {} {}", m, u, v);
- startRequest(new ByteArrayBuffer(m), new ByteArrayBuffer(u), new ByteArrayBuffer(v));
-
- Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(this.version));
- if(schemeHeader != null)
- _request.setScheme(schemeHeader.value());
-
- updateState(State.HEADERS);
- handle();
- break;
- }
- case HEADERS:
- {
- for (Headers.Header header : headers)
- {
- String name = header.name();
-
- // Skip special SPDY headers, unless it's the "host" header
- HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(version, name);
- if (specialHeader != null)
- {
- if (specialHeader == HTTPSPDYHeader.HOST)
- name = "host";
- else
- continue;
- }
-
- switch (name)
- {
- case "connection":
- case "keep-alive":
- case "proxy-connection":
- case "transfer-encoding":
- {
- // Spec says to ignore these headers
- continue;
- }
- default:
- {
- // Spec says headers must be single valued
- String value = header.value();
- logger.debug("HTTP > {}: {}", name, value);
- parsedHeader(new ByteArrayBuffer(name), new ByteArrayBuffer(value));
- break;
- }
- }
- }
- break;
- }
- case HEADERS_COMPLETE:
- {
- headerComplete();
- break;
- }
- case CONTENT:
- {
- final Buffer buffer = this.buffer;
- if (buffer != null && buffer.length() > 0)
- content(buffer);
- break;
- }
- case FINAL:
- {
- messageComplete(0);
- break;
- }
- case ASYNC:
- {
- handleRequest();
- break;
- }
- default:
- {
- throw new IllegalStateException();
- }
- }
- return this;
- }
- catch (HttpException x)
- {
- respond(stream, x.getStatus());
- return this;
- }
- catch (IOException x)
- {
- close(stream);
- return this;
- }
- finally
- {
- setCurrentConnection(null);
- }
- }
-
- private void respond(Stream stream, int status)
- {
- if (stream.isUnidirectional())
- {
- stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.INTERNAL_ERROR));
- }
- else
- {
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.STATUS.name(version), String.valueOf(status));
- headers.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
- stream.reply(new ReplyInfo(headers, true));
- }
- }
-
- private void close(Stream stream)
- {
- stream.getSession().goAway();
- }
-
- @Override
- public void onInputShutdown() throws IOException
- {
- }
-
- private void updateState(State newState)
- {
- logger.debug("State update {} -> {}", state, newState);
- state = newState;
- }
-
- public void beginRequest(final Headers headers, final boolean endRequest)
- {
- this.headers = headers.isEmpty() ? null : headers;
- post(new Runnable()
- {
- @Override
- public void run()
- {
- if (!headers.isEmpty())
- updateState(State.REQUEST);
- handle();
- if (endRequest)
- performEndRequest();
- }
- });
- }
-
- public void headers(Headers headers)
- {
- this.headers = headers;
- post(new Runnable()
- {
- @Override
- public void run()
- {
- updateState(state == State.INITIAL ? State.REQUEST : State.HEADERS);
- handle();
- }
- });
- }
-
- public void content(final DataInfo dataInfo, boolean endRequest)
- {
- // We need to copy the dataInfo since we do not know when its bytes
- // will be consumed. When the copy is consumed, we consume also the
- // original, so the implementation can send a window update.
- ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- dataInfo.consume(delta);
- }
- };
- logger.debug("Queuing last={} content {}", endRequest, copyDataInfo);
- dataInfos.offer(copyDataInfo);
- if (endRequest)
- dataInfos.offer(END_OF_CONTENT);
- post(new Runnable()
- {
- @Override
- public void run()
- {
- logger.debug("HTTP > {} bytes of content", dataInfo.length());
- if (state == State.HEADERS)
- {
- updateState(State.HEADERS_COMPLETE);
- handle();
- }
- updateState(State.CONTENT);
- handle();
- }
- });
- }
-
- public void endRequest()
- {
- post(new Runnable()
- {
- public void run()
- {
- performEndRequest();
- }
- });
- }
-
- private void performEndRequest()
- {
- if (state == State.HEADERS)
- {
- updateState(State.HEADERS_COMPLETE);
- handle();
- }
- updateState(State.FINAL);
- handle();
- }
-
- public void async()
- {
- post(new Runnable()
- {
- @Override
- public void run()
- {
- State oldState = state;
- updateState(State.ASYNC);
- handle();
- updateState(oldState);
- }
- });
- }
-
- protected void reply(Stream stream, ReplyInfo replyInfo)
- {
- if (!stream.isUnidirectional())
- stream.reply(replyInfo);
- if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(version)).value().startsWith("200") &&
- !stream.isClosed())
- {
- // We have a 200 OK with some content to send
-
- Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(version));
- Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(version));
- Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(version));
- Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
-
- for (String pushResourcePath : pushResources)
- {
- final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
- final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
-
- stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
- {
- @Override
- public void completed(Stream pushStream)
- {
- ServerHTTPSPDYAsyncConnection pushConnection =
- new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), version, connection, pushStrategy, pushStream);
- pushConnection.beginRequest(requestHeaders, true);
- }
- });
- }
- }
- }
-
- private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
- {
- final Headers requestHeaders = new Headers();
- requestHeaders.put(HTTPSPDYHeader.METHOD.name(version), "GET");
- requestHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
- requestHeaders.put(scheme);
- requestHeaders.put(host);
- requestHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
- String referrer = scheme.value() + "://" + host.value() + uri.value();
- requestHeaders.put("referer", referrer);
- // Remember support for gzip encoding
- requestHeaders.put(headers.get("accept-encoding"));
- requestHeaders.put("x-spdy-push", "true");
- return requestHeaders;
- }
-
- private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
- {
- final Headers pushHeaders = new Headers();
- if (version == SPDY.V2)
- pushHeaders.put(HTTPSPDYHeader.URI.name(version), scheme.value() + "://" + host.value() + pushResourcePath);
- else
- {
- pushHeaders.put(HTTPSPDYHeader.URI.name(version), pushResourcePath);
- pushHeaders.put(scheme);
- pushHeaders.put(host);
- }
- pushHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200");
- pushHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
- return pushHeaders;
- }
-
- private Buffer consumeContent(long maxIdleTime) throws IOException, InterruptedException
- {
- while (true)
- {
- // Volatile read to ensure visibility
- State state = this.state;
- if (state != State.HEADERS_COMPLETE && state != State.CONTENT && state != State.FINAL)
- throw new IllegalStateException();
-
- if (buffer != null)
- {
- if (buffer.length() > 0)
- {
- logger.debug("Consuming content bytes, {} available", buffer.length());
- return buffer;
- }
- else
- {
- // The application has consumed the buffer, so consume also the DataInfo
- dataInfo.consume(dataInfo.length());
- logger.debug("Consumed {} content bytes, queue size {}", dataInfo.consumed(), dataInfos.size());
- dataInfo = null;
- buffer = null;
- // Loop to get content bytes from DataInfos
- }
- }
- else
- {
- logger.debug("Waiting at most {} ms for content bytes", maxIdleTime);
- long begin = System.nanoTime();
- dataInfo = dataInfos.poll(maxIdleTime, TimeUnit.MILLISECONDS);
- long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
- logger.debug("Waited {} ms for content bytes", elapsed);
- if (dataInfo != null)
- {
- if (dataInfo == END_OF_CONTENT)
- {
- logger.debug("End of content bytes, queue size {}", dataInfos.size());
- return null;
- }
-
- ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
- buffer = byteBuffer.isDirect() ? new DirectNIOBuffer(byteBuffer, false) : new IndirectNIOBuffer(byteBuffer, false);
- // Loop to return the buffer
- }
- else
- {
- stream.getSession().goAway();
- throw new EOFException("read timeout");
- }
- }
- }
- }
-
- private int availableContent()
- {
- // Volatile read to ensure visibility
- State state = this.state;
- if (state != State.HEADERS_COMPLETE && state != State.CONTENT)
- throw new IllegalStateException();
- return buffer == null ? 0 : buffer.length();
- }
-
- @Override
- public void commitResponse(boolean last) throws IOException
- {
- // Keep the original behavior since it just delegates to the generator
- super.commitResponse(last);
- }
-
- @Override
- public void flushResponse() throws IOException
- {
- // Just commit the response, if necessary: flushing buffers will be taken care of in complete()
- commitResponse(false);
- }
-
- @Override
- public void completeResponse() throws IOException
- {
- // Keep the original behavior since it just delegates to the generator
- super.completeResponse();
- }
-
- private enum State
- {
- INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
- }
-
- /**
- * Needed in order to override parser methods that read content.
- */
- private class HTTPSPDYParser extends HttpParser
- {
- public HTTPSPDYParser(Buffers buffers, EndPoint endPoint)
- {
- super(buffers, endPoint, new HTTPSPDYParserHandler());
- }
-
- @Override
- public Buffer blockForContent(long maxIdleTime) throws IOException
- {
- try
- {
- return consumeContent(maxIdleTime);
- }
- catch (InterruptedException x)
- {
- throw new InterruptedIOException();
- }
- }
-
- @Override
- public int available() throws IOException
- {
- return availableContent();
- }
- }
-
- /**
- * Empty implementation, since it won't parse anything
- */
- private static class HTTPSPDYParserHandler extends HttpParser.EventHandler
- {
- @Override
- public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
- {
- }
-
- @Override
- public void content(Buffer ref) throws IOException
- {
- }
-
- @Override
- public void startResponse(Buffer version, int status, Buffer reason) throws IOException
- {
- }
- }
-
- /**
- * Needed in order to override generator methods that would generate HTTP,
- * since we must generate SPDY instead.
- */
- private class HTTPSPDYGenerator extends HttpGenerator
- {
- private boolean closed;
-
- private HTTPSPDYGenerator(Buffers buffers, EndPoint endPoint)
- {
- super(buffers, endPoint);
- }
-
- @Override
- public void send1xx(int code) throws IOException
- {
- // TODO: not supported yet, but unlikely to be called
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void sendResponse(Buffer response) throws IOException
- {
- // Do not think this method is ever used.
- // Jetty calls it from Request.setAttribute() only if the attribute
- // "org.eclipse.jetty.server.ResponseBuffer", seems like a hack.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void sendError(int code, String reason, String content, boolean close) throws IOException
- {
- // Keep original behavior because it's delegating to other methods that we override.
- super.sendError(code, reason, content, close);
- }
-
- @Override
- public void completeHeader(HttpFields fields, boolean allContentAdded) throws IOException
- {
- Headers headers = new Headers();
- String version = "HTTP/1.1";
- headers.put(HTTPSPDYHeader.VERSION.name(ServerHTTPSPDYAsyncConnection.this.version), version);
- StringBuilder status = new StringBuilder().append(_status);
- if (_reason != null)
- status.append(" ").append(_reason.toString("UTF-8"));
- headers.put(HTTPSPDYHeader.STATUS.name(ServerHTTPSPDYAsyncConnection.this.version), status.toString());
- logger.debug("HTTP < {} {}", version, status);
-
- if (fields != null)
- {
- for (int i = 0; i < fields.size(); ++i)
- {
- HttpFields.Field field = fields.getField(i);
- String name = field.getName().toLowerCase();
- String value = field.getValue();
- headers.put(name, value);
- logger.debug("HTTP < {}: {}", name, value);
- }
- }
-
- // We have to query the HttpGenerator and its buffers to know
- // whether there is content buffered and update the generator state
- Buffer content = getContentBuffer();
- reply(stream, new ReplyInfo(headers, content == null));
- if (content != null)
- {
- closed = false;
- // Update HttpGenerator fields so that they remain consistent
- _state = HttpGenerator.STATE_CONTENT;
- }
- else
- {
- closed = true;
- // Update HttpGenerator fields so that they remain consistent
- _state = HttpGenerator.STATE_END;
- }
- }
-
- private Buffer getContentBuffer()
- {
- if (_buffer != null && _buffer.length() > 0)
- return _buffer;
- if (_content != null && _content.length() > 0)
- return _content;
- return null;
- }
-
- @Override
- public boolean addContent(byte b) throws IOException
- {
- // In HttpGenerator, writing one byte only has a different path than
- // writing a buffer. Here we normalize these path to keep it simpler.
- addContent(new ByteArrayBuffer(new byte[]{b}), false);
- return false;
- }
-
- @Override
- public void addContent(Buffer content, boolean last) throws IOException
- {
- // Keep the original behavior since adding content will
- // just accumulate bytes until the response is committed.
- super.addContent(content, last);
- }
-
- @Override
- public void flush(long maxIdleTime) throws IOException
- {
- try
- {
- Buffer content = getContentBuffer();
- while (content != null)
- {
- DataInfo dataInfo = toDataInfo(content, closed);
- logger.debug("HTTP < {} bytes of content", dataInfo.length());
- stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
- content.clear();
- _bypass = false;
- content = getContentBuffer();
- }
- }
- catch (TimeoutException x)
- {
- stream.getSession().goAway();
- throw new EOFException("write timeout");
- }
- catch (InterruptedException x)
- {
- throw new InterruptedIOException();
- }
- catch (ExecutionException x)
- {
- throw new IOException(x.getCause());
- }
- }
-
- private DataInfo toDataInfo(Buffer buffer, boolean close)
- {
- if (buffer instanceof ByteArrayBuffer)
- return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
-
- if (buffer instanceof NIOBuffer)
- {
- ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
- byteBuffer.limit(buffer.putIndex());
- byteBuffer.position(buffer.getIndex());
- return new ByteBufferDataInfo(byteBuffer, close);
- }
-
- return new BytesDataInfo(buffer.asArray(), close);
- }
-
- @Override
- public int flushBuffer() throws IOException
- {
- // Must never be called because it's where the HttpGenerator writes
- // the HTTP content to the EndPoint (we should write SPDY instead).
- // If it's called it's our bug.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void blockForOutput(long maxIdleTime) throws IOException
- {
- // The semantic of this method is weird: not only it has to block
- // but also need to flush. Since we have a blocking flush method
- // we delegate to that, because it has the same semantic.
- flush(maxIdleTime);
- }
-
- @Override
- public void complete() throws IOException
- {
- Buffer content = getContentBuffer();
- if (content != null)
- {
- closed = true;
- _state = STATE_END;
- flush(getMaxIdleTime());
- }
- else if (!closed)
- {
- closed = true;
- _state = STATE_END;
- // Send the last, empty, data frame
- stream.data(new ByteBufferDataInfo(ZERO_BYTES, true));
- }
- }
- }
-}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java
index 7a5f381..fa29394 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java
@@ -14,19 +14,37 @@
package org.eclipse.jetty.spdy.http;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import org.eclipse.jetty.io.AsyncEndPoint;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpGenerator;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.spdy.ByteBufferPool;
-import org.eclipse.jetty.spdy.EmptyAsyncEndPoint;
-import org.eclipse.jetty.spdy.SPDYAsyncConnection;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpInput;
+import org.eclipse.jetty.server.Response;
+import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
+import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
@@ -36,7 +54,7 @@ import org.eclipse.jetty.util.log.Logger;
public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnectionFactory
{
- private static final String CONNECTION_ATTRIBUTE = "org.eclipse.jetty.spdy.http.connection";
+ private static final String CHANNEL_ATTRIBUTE = "org.eclipse.jetty.spdy.http.HTTPChannelOverSPDY";
private static final Logger logger = Log.getLogger(ServerHTTPSPDYAsyncConnectionFactory.class);
private final Connector connector;
@@ -50,16 +68,16 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
}
@Override
- protected ServerSessionFrameListener provideServerSessionFrameListener(AsyncEndPoint endPoint, Object attachment)
+ protected ServerSessionFrameListener provideServerSessionFrameListener(EndPoint endPoint, Object attachment)
{
return new HTTPServerFrameListener(endPoint);
}
private class HTTPServerFrameListener extends ServerSessionFrameListener.Adapter implements StreamFrameListener
{
- private final AsyncEndPoint endPoint;
+ private final EndPoint endPoint;
- public HTTPServerFrameListener(AsyncEndPoint endPoint)
+ public HTTPServerFrameListener(EndPoint endPoint)
{
this.endPoint = endPoint;
}
@@ -75,28 +93,27 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
logger.debug("Received {} on {}", synInfo, stream);
- HTTPSPDYAsyncEndPoint asyncEndPoint = new HTTPSPDYAsyncEndPoint(endPoint, stream);
- ServerHTTPSPDYAsyncConnection connection = new ServerHTTPSPDYAsyncConnection(connector, asyncEndPoint,
- connector.getServer(), getVersion(), (SPDYAsyncConnection)endPoint.getConnection(),
- pushStrategy, stream);
- asyncEndPoint.setConnection(connection);
- stream.setAttribute(CONNECTION_ATTRIBUTE, connection);
+ HTTPChannelOverSPDY channel = new HTTPChannelOverSPDY(connector.getServer(), endPoint.getConnection(),
+ stream);
+ stream.setAttribute(CHANNEL_ATTRIBUTE, channel);
Headers headers = synInfo.getHeaders();
- connection.beginRequest(headers, synInfo.isClose());
if (headers.isEmpty())
{
// If the SYN has no headers, they may come later in a HEADERS frame
return this;
}
- else
+
+ channel.beginRequest(headers);
+
+ if (synInfo.isClose())
{
- if (synInfo.isClose())
- return null;
- else
- return this;
+ channel.endRequest();
+ return null;
}
+ else
+ return this;
}
@Override
@@ -109,75 +126,399 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
logger.debug("Received {} on {}", headersInfo, stream);
- ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
- connection.headers(headersInfo.getHeaders());
+ HTTPChannelOverSPDY channel = (HTTPChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
+ HttpChannel.EventHandler eventHandler = channel.getEventHandler();
+
+ for (Headers.Header header : headersInfo.getHeaders())
+ {
+ String name = header.name();
+
+ // Skip special SPDY headers, unless it's the "host" header
+ HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(getVersion(), name);
+ if (specialHeader != null)
+ {
+ if (specialHeader == HTTPSPDYHeader.HOST)
+ name = "host";
+ else
+ continue;
+ }
+
+ switch (name)
+ {
+ case "connection":
+ case "keep-alive":
+ case "proxy-connection":
+ case "transfer-encoding":
+ {
+ // Spec says to ignore these headers
+ continue;
+ }
+ default:
+ {
+ HttpHeader httpHeader = HttpHeader.valueOf(header.name());
+ // Spec says headers must be single valued
+ String value = header.value();
+ logger.debug("HTTP > {}: {}", name, value);
+ //TODO: move stuff to HttpOverSPDYChannel?
+ eventHandler.parsedHeader(httpHeader, header.name(), value);
+ }
+ }
+ }
+
if (headersInfo.isClose())
- connection.endRequest();
+ channel.endRequest();
}
@Override
- public void onData(Stream stream, DataInfo dataInfo)
+ public void onData(Stream stream, final DataInfo dataInfo)
{
logger.debug("Received {} on {}", dataInfo, stream);
- ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
- connection.content(dataInfo, dataInfo.isClose());
+ HTTPChannelOverSPDY channel = (HTTPChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
+
+ // We need to copy the dataInfo since we do not know when its bytes
+ // will be consumed. When the copy is consumed, we consume also the
+ // original, so the implementation can send a window update.
+ ByteBufferDataInfo copyDataInfo = new ByteBufferDataInfo(dataInfo.asByteBuffer(false), dataInfo.isClose(), dataInfo.isCompress())
+ {
+ @Override
+ public void consume(int delta)
+ {
+ super.consume(delta);
+ dataInfo.consume(delta);
+ }
+ };
+ logger.debug("Queuing last={} content {}", dataInfo.isClose(), copyDataInfo);
+ channel.getEventHandler().content(copyDataInfo.asByteBuffer(true));
+ // dataInfos.offer(copyDataInfo); //TODO:
+ // .content()
+ // if (endRequest)
+ // dataInfos.offer(END_OF_CONTENT);
+ // updateState(State.CONTENT);
+ // handle();
+ // }
+ // });
+ //
+ // connection.content(dataInfo, dataInfo.isClose());
if (dataInfo.isClose())
- connection.endRequest();
+ channel.endRequest();
}
}
- private class HTTPSPDYAsyncEndPoint extends EmptyAsyncEndPoint
+ private enum State
+ {
+ INITIAL, REQUEST, HEADERS, HEADERS_COMPLETE, CONTENT, FINAL, ASYNC
+ }
+
+ private class HTTPChannelOverSPDY extends HttpChannel
{
- private final AsyncEndPoint endPoint;
private final Stream stream;
- private HTTPSPDYAsyncEndPoint(AsyncEndPoint endPoint, Stream stream)
+ private final Queue<Runnable> tasks = new LinkedList<>();
+ private final BlockingQueue<DataInfo> dataInfos = new LinkedBlockingQueue<>();
+
+ // TODO: volatile?
+ private Headers headers; // No need for volatile, guarded by state
+ private DataInfo dataInfo; // No need for volatile, guarded by state
+ // private NIOBuffer buffer; // No need for volatile, guarded by state
+ private volatile State state = State.INITIAL;
+ private boolean dispatched; // Guarded by synchronization on tasks
+
+
+ public HTTPChannelOverSPDY(Server server, Connection connection, Stream stream)
{
- this.endPoint = endPoint;
+ super(server, connection, new HttpInput());
this.stream = stream;
}
+ private void post(Runnable task)
+ {
+ synchronized (tasks)
+ {
+ logger.debug("Posting task {}", task);
+ tasks.offer(task);
+ dispatch();
+ }
+ }
+
+ private void dispatch()
+ {
+ synchronized (tasks)
+ {
+ if (dispatched)
+ return;
+
+ final Runnable task = tasks.poll();
+ if (task != null)
+ {
+ dispatched = true;
+ logger.debug("Dispatching task {}", task);
+ execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ logger.debug("Executing task {}", task);
+ task.run();
+ logger.debug("Completing task {}", task);
+ dispatched = false;
+ dispatch();
+ }
+ });
+ }
+ }
+ }
+
+ private void endRequest()
+ {
+ // TODO: hasBody is unused, persistent is false in spdy
+ getEventHandler().headerComplete(false, false);
+ // TODO: contentLength is unused
+ getEventHandler().messageComplete(-1);
+ //TODO: is this the way to go?
+ handle();
+ }
+
+ private void beginRequest(Headers headers)
+ {
+ this.headers = headers;
+ Headers.Header method = headers.get(HTTPSPDYHeader.METHOD.name(getVersion()));
+ Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(getVersion()));
+ Headers.Header version = headers.get(HTTPSPDYHeader.VERSION.name(getVersion()));
+
+ if (method == null || uri == null || version == null)
+ throw new IllegalStateException("400"); // TODO: replace with HttpException equivalent
+ // throw new HttpException(HttpStatus.BAD_REQUEST_400);
+
+ HttpMethod httpMethod = HttpMethod.fromString(method.value());
+ HttpVersion httpVersion = HttpVersion.fromString(version.value());
+ String uriString = uri.value();
+
+ logger.debug("HTTP > {} {} {}", httpMethod, uriString, httpVersion);
+ //TODO: why pass httpMethod and httpMethod.asString() ?
+ getEventHandler().startRequest(httpMethod, httpMethod.asString(), uriString, httpVersion);
+
+ Headers.Header schemeHeader = headers.get(HTTPSPDYHeader.SCHEME.name(getVersion()));
+ // if (schemeHeader != null) //TODO: thomas
+ // _request.setScheme(schemeHeader.value());
+
+ for (Headers.Header header : headers)
+ {
+ String name = header.name();
+ HttpHeader httpHeader = null;
+
+ // Skip special SPDY headers, unless it's the "host" header
+ HTTPSPDYHeader specialHeader = HTTPSPDYHeader.from(getVersion(), name);
+ if (specialHeader != null)
+ {
+ if (specialHeader == HTTPSPDYHeader.HOST)
+ {
+ httpHeader = HttpHeader.HOST;
+ name = "host";
+ }
+ else
+ continue;
+ }
+
+ switch (name)
+ {
+ case "connection":
+ case "keep-alive":
+ case "proxy-connection":
+ case "transfer-encoding":
+ {
+ // Spec says to ignore these headers
+ continue;
+ }
+ default:
+ {
+ // Spec says headers must be single valued
+ String value = header.value();
+ logger.debug("HTTP > {}: {}", name, value);
+ //TODO: Is it safe to pass a null HttpHeader here?
+ getEventHandler().parsedHeader(httpHeader, name, value);
+ break;
+ }
+ }
+ }
+ }
+
@Override
- public void asyncDispatch()
+ public Connector getConnector()
{
- ServerHTTPSPDYAsyncConnection connection = (ServerHTTPSPDYAsyncConnection)stream.getAttribute(CONNECTION_ATTRIBUTE);
- connection.async();
+ logger.debug("getConnector");
+ return null;
}
@Override
- public String getLocalAddr()
+ public HttpConfiguration getHttpConfiguration()
{
- return endPoint.getLocalAddr();
+ logger.debug("getHttpConfiguration");
+ return new HttpConfiguration(null, false);
}
@Override
- public String getLocalHost()
+ protected int write(ByteBuffer content) throws IOException
{
- return endPoint.getLocalHost();
+ logger.debug("write");
+ return 0;
}
@Override
- public int getLocalPort()
+ protected void commitResponse(HttpGenerator.ResponseInfo info, ByteBuffer content) throws IOException
{
- return endPoint.getLocalPort();
+ logger.debug("commitResponse");
}
@Override
- public String getRemoteAddr()
+ protected int getContentBufferSize()
{
- return endPoint.getRemoteAddr();
+ logger.debug("getContentBufferSize");
+ return 0;
}
@Override
- public String getRemoteHost()
+ protected void increaseContentBufferSize(int size)
{
- return endPoint.getRemoteHost();
+ logger.debug("increaseContentBufferSize");
}
@Override
- public int getRemotePort()
+ protected void resetBuffer()
{
- return endPoint.getRemotePort();
+ logger.debug("resetBuffer");
}
+
+ @Override
+ protected void flushResponse() throws IOException
+ {
+ logger.debug("flushResponse");
+ }
+
+ @Override
+ protected void completeResponse() throws IOException
+ {
+ logger.debug("completeResponse");
+ getEventHandler().commit();
+ Response response = getResponse();
+ logger.debug("completed");
+ Headers headers = new Headers();
+ headers.put(HTTPSPDYHeader.VERSION.name(getVersion()), HttpVersion.HTTP_1_1.asString());
+ StringBuilder status = new StringBuilder().append(response.getStatus());
+ String reason = response.getReason();
+ if (reason != null)
+ status.append(" ").append(reason.toString());
+ headers.put(HTTPSPDYHeader.STATUS.name(getVersion()), status.toString());
+ logger.debug("HTTP < {} {}", HttpVersion.HTTP_1_1, status);
+
+ HttpFields httpFields = response.getHttpFields();
+ if (httpFields != null)
+ {
+ for (HttpFields.Field httpField : httpFields)
+ {
+ String name = httpField.getName().toLowerCase();
+ String value = httpField.getValue();
+ headers.put(name, value);
+ logger.debug("HTTP < {}: {}", name, value);
+ }
+ }
+
+ // We have to query the HttpGenerator and its buffers to know
+ // whether there is content buffered and update the generator state
+ reply(stream, new ReplyInfo(headers, response.getContentCount() < 1));
+
+ //TODO: sent content
+ }
+
+ protected void reply(Stream stream, ReplyInfo replyInfo)
+ {
+ if (!stream.isUnidirectional())
+ stream.reply(replyInfo);
+ if (replyInfo.getHeaders().get(HTTPSPDYHeader.STATUS.name(getVersion())).value().startsWith("200") &&
+ !stream.isClosed())
+ {
+ // We have a 200 OK with some content to send
+
+ Headers.Header scheme = headers.get(HTTPSPDYHeader.SCHEME.name(getVersion()));
+ Headers.Header host = headers.get(HTTPSPDYHeader.HOST.name(getVersion()));
+ Headers.Header uri = headers.get(HTTPSPDYHeader.URI.name(getVersion()));
+ Set<String> pushResources = pushStrategy.apply(stream, headers, replyInfo.getHeaders());
+
+ for (String pushResourcePath : pushResources)
+ {
+ final Headers requestHeaders = createRequestHeaders(scheme, host, uri, pushResourcePath);
+ final Headers pushHeaders = createPushHeaders(scheme, host, pushResourcePath);
+
+ //TODO:
+ // stream.syn(new SynInfo(pushHeaders, false), getMaxIdleTime(), TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
+ // {
+ // @Override
+ // public void completed(Stream pushStream)
+ // {
+ // ServerHTTPSPDYAsyncConnection pushConnection =
+ // new ServerHTTPSPDYAsyncConnection(getConnector(), getEndPoint(), getServer(), getVersion(), connection, pushStrategy, pushStream);
+ // pushConnection.beginRequest(requestHeaders, true);
+ // }
+ // });
+ }
+ }
+ }
+
+ private Headers createRequestHeaders(Headers.Header scheme, Headers.Header host, Headers.Header uri, String pushResourcePath)
+ {
+ final Headers requestHeaders = new Headers();
+ requestHeaders.put(HTTPSPDYHeader.METHOD.name(getVersion()), "GET");
+ requestHeaders.put(HTTPSPDYHeader.VERSION.name(getVersion()), "HTTP/1.1");
+ requestHeaders.put(scheme);
+ requestHeaders.put(host);
+ requestHeaders.put(HTTPSPDYHeader.URI.name(getVersion()), pushResourcePath);
+ String referrer = scheme.value() + "://" + host.value() + uri.value();
+ requestHeaders.put("referer", referrer);
+ // Remember support for gzip encoding
+ requestHeaders.put(headers.get("accept-encoding"));
+ requestHeaders.put("x-spdy-push", "true");
+ return requestHeaders;
+ }
+
+ private Headers createPushHeaders(Headers.Header scheme, Headers.Header host, String pushResourcePath)
+ {
+ final Headers pushHeaders = new Headers();
+ if (getVersion() == SPDY.V2)
+ pushHeaders.put(HTTPSPDYHeader.URI.name(getVersion()), scheme.value() + "://" + host.value() + pushResourcePath);
+ else
+ {
+ pushHeaders.put(HTTPSPDYHeader.URI.name(getVersion()), pushResourcePath);
+ pushHeaders.put(scheme);
+ pushHeaders.put(host);
+ }
+ pushHeaders.put(HTTPSPDYHeader.STATUS.name(getVersion()), "200");
+ pushHeaders.put(HTTPSPDYHeader.VERSION.name(getVersion()), "HTTP/1.1");
+ return pushHeaders;
+ }
+
+ @Override
+ protected void completed()
+ {
+ logger.debug("completed");
+ }
+
+ @Override
+ protected void execute(Runnable task)
+ {
+ connector.getExecutor().execute(task);
+ }
+
+ @Override
+ public ScheduledExecutorService getScheduler()
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Needed in order to override generator methods that would generate HTTP, since we must generate SPDY instead.
+ */
+ private class HTTPSPDYGenerator
+ {
+
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
index 2827af0..0342786 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
@@ -14,26 +14,21 @@
package org.eclipse.jetty.spdy.proxy;
-import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
-import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.spdy.http.AbstractHTTPSPDYServerConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
+public class HTTPSPDYProxyConnector //extends AbstractHTTPSPDYServerConnector
{
- public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
- {
- this(proxyEngineSelector, null);
- }
-
- public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
- {
- super(proxyEngineSelector, sslContextFactory);
- clearAsyncConnectionFactories();
+// public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
+// {
+// this(proxyEngineSelector, null);
+// }
- putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
- putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
- putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
- setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
- }
+// public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
+// {
+// super(proxyEngineSelector, sslContextFactory);
+// clearAsyncConnectionFactories();
+//
+// putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+// putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+// putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
+// setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
index f73c7e8..e3e77a1 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
@@ -14,28 +14,21 @@
package org.eclipse.jetty.spdy.proxy;
-import java.nio.channels.SocketChannel;
-
-import org.eclipse.jetty.io.AsyncEndPoint;
-import org.eclipse.jetty.io.nio.AsyncConnection;
-import org.eclipse.jetty.spdy.SPDYServerConnector;
-import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
-
-public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
+public class ProxyHTTPAsyncConnectionFactory //extends ServerHTTPAsyncConnectionFactory
{
- private final short version;
- private final ProxyEngineSelector proxyEngineSelector;
-
- public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
- {
- super(connector);
- this.version = version;
- this.proxyEngineSelector = proxyEngineSelector;
- }
-
- @Override
- public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
- {
- return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
- }
+// private final short version;
+// private final ProxyEngineSelector proxyEngineSelector;
+//
+// public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
+// {
+// super(connector);
+// this.version = version;
+// this.proxyEngineSelector = proxyEngineSelector;
+// }
+//
+// @Override
+// public Connection newAsyncConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
+// {
+// return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
index 92c3184..826b554 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
@@ -14,324 +14,288 @@
package org.eclipse.jetty.spdy.proxy;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.eclipse.jetty.http.HttpFields;
-import org.eclipse.jetty.http.HttpGenerator;
-import org.eclipse.jetty.io.AsyncEndPoint;
-import org.eclipse.jetty.io.Buffer;
-import org.eclipse.jetty.io.ByteArrayBuffer;
-import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.io.nio.DirectNIOBuffer;
-import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
-import org.eclipse.jetty.io.nio.NIOBuffer;
-import org.eclipse.jetty.server.AsyncHttpConnection;
-import org.eclipse.jetty.spdy.ISession;
-import org.eclipse.jetty.spdy.IStream;
-import org.eclipse.jetty.spdy.SPDYServerConnector;
-import org.eclipse.jetty.spdy.StandardSession;
-import org.eclipse.jetty.spdy.StandardStream;
-import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
-import org.eclipse.jetty.spdy.api.BytesDataInfo;
-import org.eclipse.jetty.spdy.api.DataInfo;
-import org.eclipse.jetty.spdy.api.GoAwayInfo;
-import org.eclipse.jetty.spdy.api.Handler;
-import org.eclipse.jetty.spdy.api.Headers;
-import org.eclipse.jetty.spdy.api.HeadersInfo;
-import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.RstInfo;
-import org.eclipse.jetty.spdy.api.SessionStatus;
-import org.eclipse.jetty.spdy.api.Stream;
-import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.SynInfo;
-import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
-
-public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
+public class ProxyHTTPSPDYAsyncConnection// extends HttpConnection
{
- private final Headers headers = new Headers();
- private final short version;
- private final ProxyEngineSelector proxyEngineSelector;
- private final HttpGenerator generator;
- private final ISession session;
- private HTTPStream stream;
- private Buffer content;
-
- public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
- {
- super(connector, endPoint, connector.getServer());
- this.version = version;
- this.proxyEngineSelector = proxyEngineSelector;
- this.generator = (HttpGenerator)_generator;
- this.session = new HTTPSession(version, connector);
- this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
- }
-
- @Override
- public AsyncEndPoint getEndPoint()
- {
- return (AsyncEndPoint)super.getEndPoint();
- }
-
- @Override
- protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
- {
- SPDYServerConnector connector = (SPDYServerConnector)getConnector();
- String scheme = connector.getSslContextFactory() != null ? "https" : "http";
- headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
- headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
- headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
- headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
- }
-
- @Override
- protected void parsedHeader(Buffer name, Buffer value) throws IOException
- {
- String headerName = name.toString("UTF-8").toLowerCase();
- String headerValue = value.toString("UTF-8");
- switch (headerName)
- {
- case "host":
- headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
- break;
- default:
- headers.put(headerName, headerValue);
- break;
- }
- }
-
- @Override
- protected void headerComplete() throws IOException
- {
- }
-
- @Override
- protected void content(Buffer buffer) throws IOException
- {
- if (content == null)
- {
- stream = syn(false);
- content = buffer;
- }
- else
- {
- stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
- }
- }
-
- @Override
- public void messageComplete(long contentLength) throws IOException
- {
- if (stream == null)
- {
- assert content == null;
- if (headers.isEmpty())
- proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
- else
- syn(true);
- }
- else
- {
- stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
- }
- headers.clear();
- stream = null;
- content = null;
- }
-
- private HTTPStream syn(boolean close)
- {
- HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
- StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
- stream.setStreamFrameListener(streamFrameListener);
- return stream;
- }
-
- private DataInfo toDataInfo(Buffer buffer, boolean close)
- {
- if (buffer instanceof ByteArrayBuffer)
- return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
-
- if (buffer instanceof NIOBuffer)
- {
- ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
- byteBuffer.limit(buffer.putIndex());
- byteBuffer.position(buffer.getIndex());
- return new ByteBufferDataInfo(byteBuffer, close);
- }
-
- return new BytesDataInfo(buffer.asArray(), close);
- }
-
- private class HTTPSession extends StandardSession
- {
- private HTTPSession(short version, SPDYServerConnector connector)
- {
- super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
- }
-
- @Override
- public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- // Not much we can do in HTTP land: just close the connection
- goAway(timeout, unit, handler);
- }
-
- @Override
- public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
- {
- try
- {
- getEndPoint().close();
- handler.completed(null);
- }
- catch (IOException x)
- {
- handler.failed(null, x);
- }
- }
- }
-
- /**
- * <p>This stream will convert the SPDY invocations performed by the proxy into HTTP to be sent to the client.</p>
- */
- private class HTTPStream extends StandardStream
- {
- private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
-
- private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
- {
- super(id, priority, session, associatedStream);
- }
-
- @Override
- public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
- {
- // HTTP does not support pushed streams
- handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
- }
-
- @Override
- public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- // TODO
- throw new UnsupportedOperationException("Not Yet Implemented");
- }
-
- @Override
- public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- try
- {
- Headers headers = new Headers(replyInfo.getHeaders(), false);
-
- headers.remove(HTTPSPDYHeader.SCHEME.name(version));
-
- String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
- Matcher matcher = statusRegexp.matcher(status);
- matcher.matches();
- int code = Integer.parseInt(matcher.group(1));
- String reason = matcher.group(2);
- generator.setResponse(code, reason);
-
- String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
- generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
-
- Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
- if (host != null)
- headers.put("host", host.value());
-
- HttpFields fields = new HttpFields();
- for (Headers.Header header : headers)
- {
- String name = camelize(header.name());
- fields.put(name, header.value());
- }
- generator.completeHeader(fields, replyInfo.isClose());
-
- if (replyInfo.isClose())
- complete();
-
- handler.completed(null);
- }
- catch (IOException x)
- {
- handler.failed(null, x);
- }
- }
-
- private String camelize(String name)
- {
- char[] chars = name.toCharArray();
- chars[0] = Character.toUpperCase(chars[0]);
-
- for (int i = 0; i < chars.length; ++i)
- {
- char c = chars[i];
- int j = i + 1;
- if (c == '-' && j < chars.length)
- chars[j] = Character.toUpperCase(chars[j]);
- }
- return new String(chars);
- }
-
- @Override
- public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- try
- {
- // Data buffer must be copied, as the ByteBuffer is pooled
- ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
-
- Buffer buffer = byteBuffer.isDirect() ?
- new DirectNIOBuffer(byteBuffer, false) :
- new IndirectNIOBuffer(byteBuffer, false);
-
- generator.addContent(buffer, dataInfo.isClose());
- generator.flush(unit.toMillis(timeout));
-
- if (dataInfo.isClose())
- complete();
-
- handler.completed(null);
- }
- catch (IOException x)
- {
- handler.failed(null, x);
- }
- }
-
- private void complete() throws IOException
- {
- generator.complete();
- // We need to call asyncDispatch() as if the HTTP request
- // has been suspended and now we complete the response
- getEndPoint().asyncDispatch();
- }
- }
-
- private class HTTPPushStream extends StandardStream
- {
- private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
- {
- super(id, priority, session, associatedStream);
- }
-
- @Override
- public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- // Ignore pushed headers
- handler.completed(null);
- }
-
- @Override
- public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
- {
- // Ignore pushed data
- handler.completed(null);
- }
- }
+// private final short version;
+// private final Headers headers = new Headers();
+// private final ProxyEngineSelector proxyEngineSelector;
+// private final HttpGenerator generator;
+// private final ISession session;
+// private HTTPStream stream;
+// private Buffer content;
+//
+// public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
+// {
+// super(new HttpConfiguration(connector.getSslContextFactory(), connector.getSslContextFactory() != null), connector, endPoint);
+// this.version = version;
+// this.proxyEngineSelector = proxyEngineSelector;
+// this.generator = (HttpGenerator)getGenerator();
+// this.session = new HTTPSession(version, connector);
+// this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddress());
+// }
+//
+// @Override
+// public EndPoint getEndPoint()
+// {
+// return super.getEndPoint();
+// }
+//
+// @Override
+// protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
+// {
+// SPDYServerConnector connector = (SPDYServerConnector)getConnector();
+// String scheme = connector.getSslContextFactory() != null ? "https" : "http";
+// headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
+// headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
+// headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
+// headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
+// }
+//
+// @Override
+// protected void parsedHeader(Buffer name, Buffer value) throws IOException
+// {
+// String headerName = name.toString("UTF-8").toLowerCase();
+// String headerValue = value.toString("UTF-8");
+// switch (headerName)
+// {
+// case "host":
+// headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
+// break;
+// default:
+// headers.put(headerName, headerValue);
+// break;
+// }
+// }
+//
+// @Override
+// protected void headerComplete() throws IOException
+// {
+// }
+//
+// @Override
+// protected void content(Buffer buffer) throws IOException
+// {
+// if (content == null)
+// {
+// stream = syn(false);
+// content = buffer;
+// }
+// else
+// {
+// stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
+// }
+// }
+//
+// @Override
+// public void messageComplete(long contentLength) throws IOException
+// {
+// if (stream == null)
+// {
+// assert content == null;
+// if (headers.isEmpty())
+// proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
+// else
+// syn(true);
+// }
+// else
+// {
+// stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
+// }
+// headers.clear();
+// stream = null;
+// content = null;
+// }
+//
+// private HTTPStream syn(boolean close)
+// {
+// HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
+// StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
+// stream.setStreamFrameListener(streamFrameListener);
+// return stream;
+// }
+//
+// private DataInfo toDataInfo(Buffer buffer, boolean close)
+// {
+// if (buffer instanceof ByteArrayBuffer)
+// return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
+//
+// if (buffer instanceof NIOBuffer)
+// {
+// ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
+// byteBuffer.limit(buffer.putIndex());
+// byteBuffer.position(buffer.getIndex());
+// return new ByteBufferDataInfo(byteBuffer, close);
+// }
+//
+// return new BytesDataInfo(buffer.asArray(), close);
+// }
+//
+// private class HTTPSession extends StandardSession
+// {
+// private HTTPSession(short version, SPDYServerConnector connector)
+// {
+// super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
+// }
+//
+// @Override
+// public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// // Not much we can do in HTTP land: just close the connection
+// goAway(timeout, unit, handler);
+// }
+//
+// @Override
+// public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// try
+// {
+// getEndPoint().close();
+// handler.completed(null);
+// }
+// catch (IOException x)
+// {
+// handler.failed(null, x);
+// }
+// }
+// }
+//
+// /**
+// * <p>This stream will convert the SPDY invocations performed by the proxy into HTTP to be sent to the client.</p>
+// */
+// private class HTTPStream extends StandardStream
+// {
+// private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
+//
+// private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
+// {
+// super(id, priority, session, associatedStream);
+// }
+//
+// @Override
+// public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
+// {
+// // HTTP does not support pushed streams
+// handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
+// }
+//
+// @Override
+// public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// // TODO
+// throw new UnsupportedOperationException("Not Yet Implemented");
+// }
+//
+// @Override
+// public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// try
+// {
+// Headers headers = new Headers(replyInfo.getHeaders(), false);
+//
+// headers.remove(HTTPSPDYHeader.SCHEME.name(version));
+//
+// String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
+// Matcher matcher = statusRegexp.matcher(status);
+// matcher.matches();
+// int code = Integer.parseInt(matcher.group(1));
+// String reason = matcher.group(2);
+// generator.setResponse(code, reason);
+//
+// String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
+// generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
+//
+// Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
+// if (host != null)
+// headers.put("host", host.value());
+//
+// HttpFields fields = new HttpFields();
+// for (Headers.Header header : headers)
+// {
+// String name = camelize(header.name());
+// fields.put(name, header.value());
+// }
+// generator.completeHeader(fields, replyInfo.isClose());
+//
+// if (replyInfo.isClose())
+// complete();
+//
+// handler.completed(null);
+// }
+// catch (IOException x)
+// {
+// handler.failed(null, x);
+// }
+// }
+//
+// private String camelize(String name)
+// {
+// char[] chars = name.toCharArray();
+// chars[0] = Character.toUpperCase(chars[0]);
+//
+// for (int i = 0; i < chars.length; ++i)
+// {
+// char c = chars[i];
+// int j = i + 1;
+// if (c == '-' && j < chars.length)
+// chars[j] = Character.toUpperCase(chars[j]);
+// }
+// return new String(chars);
+// }
+//
+// @Override
+// public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// try
+// {
+// // Data buffer must be copied, as the ByteBuffer is pooled
+// ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
+//
+// Buffer buffer = byteBuffer.isDirect() ?
+// new DirectNIOBuffer(byteBuffer, false) :
+// new IndirectNIOBuffer(byteBuffer, false);
+//
+// generator.addContent(buffer, dataInfo.isClose());
+// generator.flush(unit.toMillis(timeout));
+//
+// if (dataInfo.isClose())
+// complete();
+//
+// handler.completed(null);
+// }
+// catch (IOException x)
+// {
+// handler.failed(null, x);
+// }
+// }
+//
+// private void complete() throws IOException
+// {
+// generator.complete();
+// // We need to call asyncDispatch() as if the HTTP request
+// // has been suspended and now we complete the response
+// getEndPoint().asyncDispatch();
+// }
+// }
+//
+// private class HTTPPushStream extends StandardStream
+// {
+// private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
+// {
+// super(id, priority, session, associatedStream);
+// }
+//
+// @Override
+// public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// // Ignore pushed headers
+// handler.completed(null);
+// }
+//
+// @Override
+// public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
+// {
+// // Ignore pushed data
+// handler.completed(null);
+// }
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
index c01af3e..cd43b8f 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
@@ -14,501 +14,476 @@
package org.eclipse.jetty.spdy.proxy;
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.spdy.SPDYClient;
-import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
-import org.eclipse.jetty.spdy.api.DataInfo;
-import org.eclipse.jetty.spdy.api.GoAwayInfo;
-import org.eclipse.jetty.spdy.api.Handler;
-import org.eclipse.jetty.spdy.api.Headers;
-import org.eclipse.jetty.spdy.api.HeadersInfo;
-import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.RstInfo;
-import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.spdy.api.Session;
-import org.eclipse.jetty.spdy.api.SessionFrameListener;
-import org.eclipse.jetty.spdy.api.Stream;
-import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.StreamStatus;
-import org.eclipse.jetty.spdy.api.SynInfo;
-import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
-
/**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
-public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
+public class SPDYProxyEngine //extends ProxyEngine implements StreamFrameListener
{
- private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
- private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
-
- private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
- private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
- private final SPDYClient.Factory factory;
- private volatile long connectTimeout = 15000;
- private volatile long timeout = 60000;
-
- public SPDYProxyEngine(SPDYClient.Factory factory)
- {
- this.factory = factory;
- }
-
- public long getConnectTimeout()
- {
- return connectTimeout;
- }
-
- public void setConnectTimeout(long connectTimeout)
- {
- this.connectTimeout = connectTimeout;
- }
-
- public long getTimeout()
- {
- return timeout;
- }
-
- public void setTimeout(long timeout)
- {
- this.timeout = timeout;
- }
-
- public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
- {
- Headers headers = new Headers(clientSynInfo.getHeaders(), false);
-
- short serverVersion = getVersion(proxyServerInfo.getProtocol());
- InetSocketAddress address = proxyServerInfo.getAddress();
- Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
- if (serverSession == null)
- {
- rst(clientStream);
- return null;
- }
-
- final Session clientSession = clientStream.getSession();
-
- addRequestProxyHeaders(clientStream, headers);
- customizeRequestHeaders(clientStream, headers);
- convert(clientSession.getVersion(), serverVersion, headers);
-
- SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
- StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
- StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
- clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
- serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
- return this;
- }
-
- private static short getVersion(String protocol)
- {
- switch (protocol)
- {
- case "spdy/2":
- return SPDY.V2;
- case "spdy/3":
- return SPDY.V3;
- default:
- throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
- }
- }
-
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- // Servers do not receive replies
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- // TODO
- throw new UnsupportedOperationException("Not Yet Implemented");
- }
-
- @Override
- public void onData(Stream clientStream, final DataInfo clientDataInfo)
- {
- logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
-
- ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- clientDataInfo.consume(delta);
- }
- };
-
- StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
- streamHandler.data(serverDataInfo);
- }
-
- private Session produceSession(String host, short version, InetSocketAddress address)
- {
- try
- {
- Session session = serverSessions.get(host);
- if (session == null)
- {
- SPDYClient client = factory.newSPDYClient(version);
- session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
- logger.debug("Proxy session connected to {}", address);
- Session existing = serverSessions.putIfAbsent(host, session);
- if (existing != null)
- {
- session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
- session = existing;
- }
- }
- return session;
- }
- catch (Exception x)
- {
- logger.debug(x);
- return null;
- }
- }
-
- private void convert(short fromVersion, short toVersion, Headers headers)
- {
- if (fromVersion != toVersion)
- {
- for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
- {
- Headers.Header header = headers.remove(httpHeader.name(fromVersion));
- if (header != null)
- {
- String toName = httpHeader.name(toVersion);
- for (String value : header.values())
- headers.add(toName, value);
- }
- }
- }
- }
-
- private void rst(Stream stream)
- {
- RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
- stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
- }
-
- private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
- {
- private final Stream clientStream;
- private volatile ReplyInfo replyInfo;
-
- public ProxyStreamFrameListener(Stream clientStream)
- {
- this.clientStream = clientStream;
- }
-
- @Override
- public void onReply(final Stream stream, ReplyInfo replyInfo)
- {
- logger.debug("S -> P {} on {}", replyInfo, stream);
-
- short serverVersion = stream.getSession().getVersion();
- Headers headers = new Headers(replyInfo.getHeaders(), false);
-
- addResponseProxyHeaders(stream, headers);
- customizeResponseHeaders(stream, headers);
- short clientVersion = this.clientStream.getSession().getVersion();
- convert(serverVersion, clientVersion, headers);
-
- this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
- if (replyInfo.isClose())
- reply(stream);
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- // TODO
- throw new UnsupportedOperationException("Not Yet Implemented");
- }
-
- @Override
- public void onData(final Stream stream, final DataInfo dataInfo)
- {
- logger.debug("S -> P {} on {}", dataInfo, stream);
-
- if (replyInfo != null)
- {
- if (dataInfo.isClose())
- replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
- reply(stream);
- }
- data(stream, dataInfo);
- }
-
- private void reply(final Stream stream)
- {
- final ReplyInfo replyInfo = this.replyInfo;
- this.replyInfo = null;
- clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
- {
- @Override
- public void completed(Void context)
- {
- logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
- }
-
- @Override
- public void failed(Void context, Throwable x)
- {
- logger.debug(x);
- rst(clientStream);
- }
- });
- }
-
- private void data(final Stream stream, final DataInfo dataInfo)
- {
- clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
- {
- @Override
- public void completed(Void context)
- {
- dataInfo.consume(dataInfo.length());
- logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
- }
-
- @Override
- public void failed(Void context, Throwable x)
- {
- logger.debug(x);
- rst(clientStream);
- }
- });
- }
- }
-
- /**
- * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
- * <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
- * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
- * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
- * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
- * without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
- */
- private class StreamHandler implements Handler<Stream>
- {
- private final Queue<DataInfoHandler> queue = new LinkedList<>();
- private final Stream clientStream;
- private final SynInfo serverSynInfo;
- private Stream serverStream;
-
- private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
- {
- this.clientStream = clientStream;
- this.serverSynInfo = serverSynInfo;
- }
-
- @Override
- public void completed(Stream serverStream)
- {
- logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
-
- serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
-
- DataInfoHandler dataInfoHandler;
- synchronized (queue)
- {
- this.serverStream = serverStream;
- dataInfoHandler = queue.peek();
- if (dataInfoHandler != null)
- {
- if (dataInfoHandler.flushing)
- {
- logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
- dataInfoHandler = null;
- }
- else
- {
- dataInfoHandler.flushing = true;
- logger.debug("SYN completed, queue size {}", queue.size());
- }
- }
- else
- {
- logger.debug("SYN completed, queue empty");
- }
- }
- if (dataInfoHandler != null)
- flush(serverStream, dataInfoHandler);
- }
-
- @Override
- public void failed(Stream serverStream, Throwable x)
- {
- logger.debug(x);
- rst(clientStream);
- }
-
- public void data(DataInfo dataInfo)
- {
- Stream serverStream;
- DataInfoHandler dataInfoHandler = null;
- DataInfoHandler item = new DataInfoHandler(dataInfo);
- synchronized (queue)
- {
- queue.offer(item);
- serverStream = this.serverStream;
- if (serverStream != null)
- {
- dataInfoHandler = queue.peek();
- if (dataInfoHandler.flushing)
- {
- logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
- serverStream = null;
- }
- else
- {
- dataInfoHandler.flushing = true;
- logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
- }
- }
- else
- {
- logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
- }
- }
- if (serverStream != null)
- flush(serverStream, dataInfoHandler);
- }
-
- private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
- {
- logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
- serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
- }
-
- private class DataInfoHandler implements Handler<Void>
- {
- private final DataInfo dataInfo;
- private boolean flushing;
-
- private DataInfoHandler(DataInfo dataInfo)
- {
- this.dataInfo = dataInfo;
- }
-
- @Override
- public void completed(Void context)
- {
- Stream serverStream;
- DataInfoHandler dataInfoHandler;
- synchronized (queue)
- {
- serverStream = StreamHandler.this.serverStream;
- assert serverStream != null;
- dataInfoHandler = queue.poll();
- assert dataInfoHandler == this;
- dataInfoHandler = queue.peek();
- if (dataInfoHandler != null)
- {
- assert !dataInfoHandler.flushing;
- dataInfoHandler.flushing = true;
- logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
- }
- else
- {
- logger.debug("Completed {}, queue empty", dataInfo);
- }
- }
- if (dataInfoHandler != null)
- flush(serverStream, dataInfoHandler);
- }
-
- @Override
- public void failed(Void context, Throwable x)
- {
- logger.debug(x);
- rst(clientStream);
- }
- }
- }
-
- private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
- {
- @Override
- public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
- {
- logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
-
- Headers headers = new Headers(serverSynInfo.getHeaders(), false);
-
- addResponseProxyHeaders(serverStream, headers);
- customizeResponseHeaders(serverStream, headers);
- Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
- convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
-
- StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
- serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
- clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
-
- return this;
- }
-
- @Override
- public void onRst(Session serverSession, RstInfo serverRstInfo)
- {
- Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
- if (serverStream != null)
- {
- Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
- if (clientStream != null)
- {
- Session clientSession = clientStream.getSession();
- RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
- clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
- }
- }
- }
-
- @Override
- public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
- {
- serverSessions.values().remove(serverSession);
- }
-
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- // Push streams never send a reply
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- throw new UnsupportedOperationException(); //TODO
- }
-
- @Override
- public void onData(Stream serverStream, final DataInfo serverDataInfo)
- {
- logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
-
- ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- serverDataInfo.consume(delta);
- }
- };
-
- StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
- handler.data(clientDataInfo);
- }
- }
+// private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
+// private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
+//
+// private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
+// private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
+// private final SPDYClient.Factory factory;
+// private volatile long connectTimeout = 15000;
+// private volatile long timeout = 60000;
+//
+// public SPDYProxyEngine(SPDYClient.Factory factory)
+// {
+// this.factory = factory;
+// }
+//
+// public long getConnectTimeout()
+// {
+// return connectTimeout;
+// }
+//
+// public void setConnectTimeout(long connectTimeout)
+// {
+// this.connectTimeout = connectTimeout;
+// }
+//
+// public long getTimeout()
+// {
+// return timeout;
+// }
+//
+// public void setTimeout(long timeout)
+// {
+// this.timeout = timeout;
+// }
+//
+// public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
+// {
+// Headers headers = new Headers(clientSynInfo.getHeaders(), false);
+//
+// short serverVersion = getVersion(proxyServerInfo.getProtocol());
+// InetSocketAddress address = proxyServerInfo.getAddress();
+// Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
+// if (serverSession == null)
+// {
+// rst(clientStream);
+// return null;
+// }
+//
+// final Session clientSession = clientStream.getSession();
+//
+// addRequestProxyHeaders(clientStream, headers);
+// customizeRequestHeaders(clientStream, headers);
+// convert(clientSession.getVersion(), serverVersion, headers);
+//
+// SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
+// StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
+// StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
+// clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
+// serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
+// return this;
+// }
+//
+// private static short getVersion(String protocol)
+// {
+// switch (protocol)
+// {
+// case "spdy/2":
+// return SPDY.V2;
+// case "spdy/3":
+// return SPDY.V3;
+// default:
+// throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
+// }
+// }
+//
+// @Override
+// public void onReply(Stream stream, ReplyInfo replyInfo)
+// {
+// // Servers do not receive replies
+// }
+//
+// @Override
+// public void onHeaders(Stream stream, HeadersInfo headersInfo)
+// {
+// // TODO
+// throw new UnsupportedOperationException("Not Yet Implemented");
+// }
+//
+// @Override
+// public void onData(Stream clientStream, final DataInfo clientDataInfo)
+// {
+// logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
+//
+// ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
+// {
+// @Override
+// public void consume(int delta)
+// {
+// super.consume(delta);
+// clientDataInfo.consume(delta);
+// }
+// };
+//
+// StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
+// streamHandler.data(serverDataInfo);
+// }
+//
+// private Session produceSession(String host, short version, InetSocketAddress address)
+// {
+// try
+// {
+// Session session = serverSessions.get(host);
+// if (session == null)
+// {
+// SPDYClient client = factory.newSPDYClient(version);
+// session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
+// logger.debug("Proxy session connected to {}", address);
+// Session existing = serverSessions.putIfAbsent(host, session);
+// if (existing != null)
+// {
+// session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
+// session = existing;
+// }
+// }
+// return session;
+// }
+// catch (Exception x)
+// {
+// logger.debug(x);
+// return null;
+// }
+// }
+//
+// private void convert(short fromVersion, short toVersion, Headers headers)
+// {
+// if (fromVersion != toVersion)
+// {
+// for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
+// {
+// Headers.Header header = headers.remove(httpHeader.name(fromVersion));
+// if (header != null)
+// {
+// String toName = httpHeader.name(toVersion);
+// for (String value : header.values())
+// headers.add(toName, value);
+// }
+// }
+// }
+// }
+//
+// private void rst(Stream stream)
+// {
+// RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
+// stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
+// }
+//
+// private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
+// {
+// private final Stream clientStream;
+// private volatile ReplyInfo replyInfo;
+//
+// public ProxyStreamFrameListener(Stream clientStream)
+// {
+// this.clientStream = clientStream;
+// }
+//
+// @Override
+// public void onReply(final Stream stream, ReplyInfo replyInfo)
+// {
+// logger.debug("S -> P {} on {}", replyInfo, stream);
+//
+// short serverVersion = stream.getSession().getVersion();
+// Headers headers = new Headers(replyInfo.getHeaders(), false);
+//
+// addResponseProxyHeaders(stream, headers);
+// customizeResponseHeaders(stream, headers);
+// short clientVersion = this.clientStream.getSession().getVersion();
+// convert(serverVersion, clientVersion, headers);
+//
+// this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
+// if (replyInfo.isClose())
+// reply(stream);
+// }
+//
+// @Override
+// public void onHeaders(Stream stream, HeadersInfo headersInfo)
+// {
+// // TODO
+// throw new UnsupportedOperationException("Not Yet Implemented");
+// }
+//
+// @Override
+// public void onData(final Stream stream, final DataInfo dataInfo)
+// {
+// logger.debug("S -> P {} on {}", dataInfo, stream);
+//
+// if (replyInfo != null)
+// {
+// if (dataInfo.isClose())
+// replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
+// reply(stream);
+// }
+// data(stream, dataInfo);
+// }
+//
+// private void reply(final Stream stream)
+// {
+// final ReplyInfo replyInfo = this.replyInfo;
+// this.replyInfo = null;
+// clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
+// {
+// @Override
+// public void completed(Void context)
+// {
+// logger.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
+// }
+//
+// @Override
+// public void failed(Void context, Throwable x)
+// {
+// logger.debug(x);
+// rst(clientStream);
+// }
+// });
+// }
+//
+// private void data(final Stream stream, final DataInfo dataInfo)
+// {
+// clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
+// {
+// @Override
+// public void completed(Void context)
+// {
+// dataInfo.consume(dataInfo.length());
+// logger.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
+// }
+//
+// @Override
+// public void failed(Void context, Throwable x)
+// {
+// logger.debug(x);
+// rst(clientStream);
+// }
+// });
+// }
+// }
+//
+// /**
+// * <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
+// * <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
+// * The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
+// * from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
+// * is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
+// * without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
+// */
+// private class StreamHandler implements Handler<Stream>
+// {
+// private final Queue<DataInfoHandler> queue = new LinkedList<>();
+// private final Stream clientStream;
+// private final SynInfo serverSynInfo;
+// private Stream serverStream;
+//
+// private StreamHandler(Stream clientStream, SynInfo serverSynInfo)
+// {
+// this.clientStream = clientStream;
+// this.serverSynInfo = serverSynInfo;
+// }
+//
+// @Override
+// public void completed(Stream serverStream)
+// {
+// logger.debug("P -> S {} from {} to {}", serverSynInfo, clientStream, serverStream);
+//
+// serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
+//
+// DataInfoHandler dataInfoHandler;
+// synchronized (queue)
+// {
+// this.serverStream = serverStream;
+// dataInfoHandler = queue.peek();
+// if (dataInfoHandler != null)
+// {
+// if (dataInfoHandler.flushing)
+// {
+// logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
+// dataInfoHandler = null;
+// }
+// else
+// {
+// dataInfoHandler.flushing = true;
+// logger.debug("SYN completed, queue size {}", queue.size());
+// }
+// }
+// else
+// {
+// logger.debug("SYN completed, queue empty");
+// }
+// }
+// if (dataInfoHandler != null)
+// flush(serverStream, dataInfoHandler);
+// }
+//
+// @Override
+// public void failed(Stream serverStream, Throwable x)
+// {
+// logger.debug(x);
+// rst(clientStream);
+// }
+//
+// public void data(DataInfo dataInfo)
+// {
+// Stream serverStream;
+// DataInfoHandler dataInfoHandler = null;
+// DataInfoHandler item = new DataInfoHandler(dataInfo);
+// synchronized (queue)
+// {
+// queue.offer(item);
+// serverStream = this.serverStream;
+// if (serverStream != null)
+// {
+// dataInfoHandler = queue.peek();
+// if (dataInfoHandler.flushing)
+// {
+// logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
+// serverStream = null;
+// }
+// else
+// {
+// dataInfoHandler.flushing = true;
+// logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
+// }
+// }
+// else
+// {
+// logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
+// }
+// }
+// if (serverStream != null)
+// flush(serverStream, dataInfoHandler);
+// }
+//
+// private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
+// {
+// logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
+// serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
+// }
+//
+// private class DataInfoHandler implements Handler<Void>
+// {
+// private final DataInfo dataInfo;
+// private boolean flushing;
+//
+// private DataInfoHandler(DataInfo dataInfo)
+// {
+// this.dataInfo = dataInfo;
+// }
+//
+// @Override
+// public void completed(Void context)
+// {
+// Stream serverStream;
+// DataInfoHandler dataInfoHandler;
+// synchronized (queue)
+// {
+// serverStream = StreamHandler.this.serverStream;
+// assert serverStream != null;
+// dataInfoHandler = queue.poll();
+// assert dataInfoHandler == this;
+// dataInfoHandler = queue.peek();
+// if (dataInfoHandler != null)
+// {
+// assert !dataInfoHandler.flushing;
+// dataInfoHandler.flushing = true;
+// logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
+// }
+// else
+// {
+// logger.debug("Completed {}, queue empty", dataInfo);
+// }
+// }
+// if (dataInfoHandler != null)
+// flush(serverStream, dataInfoHandler);
+// }
+//
+// @Override
+// public void failed(Void context, Throwable x)
+// {
+// logger.debug(x);
+// rst(clientStream);
+// }
+// }
+// }
+//
+// private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
+// {
+// logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
+//
+// Headers headers = new Headers(serverSynInfo.getHeaders(), false);
+//
+// addResponseProxyHeaders(serverStream, headers);
+// customizeResponseHeaders(serverStream, headers);
+// Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
+// convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
+//
+// StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
+// serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
+// clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
+//
+// return this;
+// }
+//
+// @Override
+// public void onRst(Session serverSession, RstInfo serverRstInfo)
+// {
+// Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
+// if (serverStream != null)
+// {
+// Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
+// if (clientStream != null)
+// {
+// Session clientSession = clientStream.getSession();
+// RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
+// clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
+// }
+// }
+// }
+//
+// @Override
+// public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
+// {
+// serverSessions.values().remove(serverSession);
+// }
+//
+// @Override
+// public void onReply(Stream stream, ReplyInfo replyInfo)
+// {
+// // Push streams never send a reply
+// }
+//
+// @Override
+// public void onHeaders(Stream stream, HeadersInfo headersInfo)
+// {
+// throw new UnsupportedOperationException(); //TODO
+// }
+//
+// @Override
+// public void onData(Stream serverStream, final DataInfo serverDataInfo)
+// {
+// logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
+//
+// ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
+// {
+// @Override
+// public void consume(int delta)
+// {
+// super.consume(delta);
+// serverDataInfo.consume(delta);
+// }
+// };
+//
+// StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
+// handler.data(clientDataInfo);
+// }
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java
index a5c4c28..a9f0a6f 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java
@@ -18,9 +18,9 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
@@ -70,9 +70,9 @@ public abstract class AbstractHTTPSPDYTest
protected SPDYServerConnector newHTTPSPDYServerConnector(short version)
{
// For these tests, we need the connector to speak HTTP over SPDY even in non-SSL
- SPDYServerConnector connector = new HTTPSPDYServerConnector();
- AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new PushStrategy.None());
- connector.setDefaultAsyncConnectionFactory(defaultFactory);
+ SPDYServerConnector connector = new HTTPSPDYServerConnector(server);
+ ConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new PushStrategy.None());
+ connector.setDefaultConnectionFactory(defaultFactory);
return connector;
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ProtocolNegotiationTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ProtocolNegotiationTest.java
index 5de439b..567ac60 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ProtocolNegotiationTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ProtocolNegotiationTest.java
@@ -55,7 +55,7 @@ public class ProtocolNegotiationTest
{
server = new Server();
if (connector == null)
- connector = new SPDYServerConnector(null, newSslContextFactory());
+// connector = new SPDYServerConnector(null, newSslContextFactory()); //TODO:
connector.setPort(0);
this.connector = connector;
server.addConnector(connector);
@@ -79,8 +79,8 @@ public class ProtocolNegotiationTest
public void testServerAdvertisingHTTPSpeaksHTTP() throws Exception
{
InetSocketAddress address = startServer(null);
- connector.removeAsyncConnectionFactory("spdy/2");
- connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector));
+ connector.removeConnectionFactory("spdy/2");
+// connector.putConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector)); //TODO:
SslContextFactory sslContextFactory = newSslContextFactory();
sslContextFactory.start();
@@ -136,7 +136,7 @@ public class ProtocolNegotiationTest
public void testServerAdvertisingSPDYAndHTTPSpeaksHTTPWhenNegotiated() throws Exception
{
InetSocketAddress address = startServer(null);
- connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector));
+// connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector)); //TODO:
SslContextFactory sslContextFactory = newSslContextFactory();
sslContextFactory.start();
@@ -194,10 +194,10 @@ public class ProtocolNegotiationTest
@Test
public void testServerAdvertisingSPDYAndHTTPSpeaksDefaultProtocolWhenNPNMissing() throws Exception
{
- SPDYServerConnector connector = new SPDYServerConnector(null, newSslContextFactory());
- connector.setDefaultAsyncConnectionFactory(new ServerHTTPAsyncConnectionFactory(connector));
+// SPDYServerConnector connector = new SPDYServerConnector(null, newSslContextFactory()); //TODO:
+// connector.setDefaultConnectionFactory(new ServerHTTPConnectionFactory(connector));
InetSocketAddress address = startServer(connector);
- connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector));
+// connector.putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(connector));
SslContextFactory sslContextFactory = newSslContextFactory();
sslContextFactory.start();
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
index 9486157..a680ac5 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
@@ -15,7 +15,6 @@
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,13 +28,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import junit.framework.Assert;
-import org.eclipse.jetty.client.Address;
-import org.eclipse.jetty.client.ContentExchange;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;
@@ -44,7 +38,6 @@ import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
-import org.junit.Test;
public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
{
@@ -62,119 +55,121 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
private final long roundtrip = 100;
private final int runs = 10;
- @Test
- public void benchmarkPushStrategy() throws Exception
- {
- InetSocketAddress address = startHTTPServer(version(), new PushStrategyBenchmarkHandler());
-
- // Plain HTTP
- AsyncConnectionFactory dacf = new ServerHTTPAsyncConnectionFactory(connector);
- connector.setDefaultAsyncConnectionFactory(dacf);
- HttpClient httpClient = new HttpClient();
- // Simulate browsers, that open 6 connection per origin
- httpClient.setMaxConnectionsPerAddress(6);
- httpClient.start();
- benchmarkHTTP(httpClient);
- httpClient.stop();
-
- // First push strategy
- PushStrategy pushStrategy = new PushStrategy.None();
- dacf = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
- connector.setDefaultAsyncConnectionFactory(dacf);
- Session session = startClient(version(), address, new ClientSessionFrameListener());
- benchmarkSPDY(pushStrategy, session);
- session.goAway().get(5, TimeUnit.SECONDS);
-
- // Second push strategy
- pushStrategy = new ReferrerPushStrategy();
- dacf = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
- connector.setDefaultAsyncConnectionFactory(dacf);
- session = startClient(version(), address, new ClientSessionFrameListener());
- benchmarkSPDY(pushStrategy, session);
- session.goAway().get(5, TimeUnit.SECONDS);
- }
-
- private void benchmarkHTTP(HttpClient httpClient) throws Exception
- {
- // Warm up
- performHTTPRequests(httpClient);
- performHTTPRequests(httpClient);
-
- long total = 0;
- for (int i = 0; i < runs; ++i)
- {
- long begin = System.nanoTime();
- int requests = performHTTPRequests(httpClient);
- long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
- total += elapsed;
- System.err.printf("HTTP: run %d, %d request(s), roundtrip delay %d ms, elapsed = %d%n",
- i, requests, roundtrip, elapsed);
- }
- System.err.printf("HTTP: roundtrip delay %d ms, average = %d%n%n",
- roundtrip, total / runs);
- }
-
- private int performHTTPRequests(HttpClient httpClient) throws Exception
- {
- int result = 0;
-
- for (int j = 0; j < htmlResources.length; ++j)
- {
- latch.set(new CountDownLatch(cssResources.length + jsResources.length + pngResources.length));
-
- String primaryPath = "/" + j + ".html";
- String referrer = new StringBuilder("http://localhost:").append(connector.getLocalPort()).append(primaryPath).toString();
- ContentExchange exchange = new ContentExchange(true);
- exchange.setMethod("GET");
- exchange.setRequestURI(primaryPath);
- exchange.setVersion("HTTP/1.1");
- exchange.setAddress(new Address("localhost", connector.getLocalPort()));
- exchange.setRequestHeader("Host", "localhost:" + connector.getLocalPort());
- ++result;
- httpClient.send(exchange);
- Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange.waitForDone());
- Assert.assertEquals(200, exchange.getResponseStatus());
-
- for (int i = 0; i < cssResources.length; ++i)
- {
- String path = "/" + i + ".css";
- exchange = createExchangeWithReferrer(referrer, path);
- ++result;
- httpClient.send(exchange);
- }
- for (int i = 0; i < jsResources.length; ++i)
- {
- String path = "/" + i + ".js";
- exchange = createExchangeWithReferrer(referrer, path);
- ++result;
- httpClient.send(exchange);
- }
- for (int i = 0; i < pngResources.length; ++i)
- {
- String path = "/" + i + ".png";
- exchange = createExchangeWithReferrer(referrer, path);
- ++result;
- httpClient.send(exchange);
- }
-
- Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS));
- }
-
- return result;
- }
-
- private ContentExchange createExchangeWithReferrer(String referrer, String path)
- {
- ContentExchange exchange;
- exchange = new TestExchange();
- exchange.setMethod("GET");
- exchange.setRequestURI(path);
- exchange.setVersion("HTTP/1.1");
- exchange.setAddress(new Address("localhost", connector.getLocalPort()));
- exchange.setRequestHeader("Host", "localhost:" + connector.getLocalPort());
- exchange.setRequestHeader("referer", referrer);
- return exchange;
- }
+ //TODO:
+
+// @Test
+// public void benchmarkPushStrategy() throws Exception
+// {
+// InetSocketAddress address = startHTTPServer(version(), new PushStrategyBenchmarkHandler());
+//
+// // Plain HTTP
+// ConnectionFactory dacf = new ServerHTTPAsyncConnectionFactory(connector);
+// connector.setDefaultAsyncConnectionFactory(dacf);
+// HttpClient httpClient = new HttpClient();
+// // Simulate browsers, that open 6 connection per origin
+// httpClient.setMaxConnectionsPerAddress(6);
+// httpClient.start();
+// benchmarkHTTP(httpClient);
+// httpClient.stop();
+//
+// // First push strategy
+// PushStrategy pushStrategy = new PushStrategy.None();
+// dacf = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
+// connector.setDefaultAsyncConnectionFactory(dacf);
+// Session session = startClient(version(), address, new ClientSessionFrameListener());
+// benchmarkSPDY(pushStrategy, session);
+// session.goAway().get(5, TimeUnit.SECONDS);
+//
+// // Second push strategy
+// pushStrategy = new ReferrerPushStrategy();
+// dacf = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
+// connector.setDefaultAsyncConnectionFactory(dacf);
+// session = startClient(version(), address, new ClientSessionFrameListener());
+// benchmarkSPDY(pushStrategy, session);
+// session.goAway().get(5, TimeUnit.SECONDS);
+// }
+//
+// private void benchmarkHTTP(HttpClient httpClient) throws Exception
+// {
+// // Warm up
+// performHTTPRequests(httpClient);
+// performHTTPRequests(httpClient);
+//
+// long total = 0;
+// for (int i = 0; i < runs; ++i)
+// {
+// long begin = System.nanoTime();
+// int requests = performHTTPRequests(httpClient);
+// long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
+// total += elapsed;
+// System.err.printf("HTTP: run %d, %d request(s), roundtrip delay %d ms, elapsed = %d%n",
+// i, requests, roundtrip, elapsed);
+// }
+// System.err.printf("HTTP: roundtrip delay %d ms, average = %d%n%n",
+// roundtrip, total / runs);
+// }
+//
+// private int performHTTPRequests(HttpClient httpClient) throws Exception
+// {
+// int result = 0;
+//
+// for (int j = 0; j < htmlResources.length; ++j)
+// {
+// latch.set(new CountDownLatch(cssResources.length + jsResources.length + pngResources.length));
+//
+// String primaryPath = "/" + j + ".html";
+// String referrer = new StringBuilder("http://localhost:").append(connector.getLocalPort()).append(primaryPath).toString();
+// ContentExchange exchange = new ContentExchange(true);
+// exchange.setMethod("GET");
+// exchange.setRequestURI(primaryPath);
+// exchange.setVersion("HTTP/1.1");
+// exchange.setAddress(new Address("localhost", connector.getLocalPort()));
+// exchange.setRequestHeader("Host", "localhost:" + connector.getLocalPort());
+// ++result;
+// httpClient.send(exchange);
+// Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange.waitForDone());
+// Assert.assertEquals(200, exchange.getResponseStatus());
+//
+// for (int i = 0; i < cssResources.length; ++i)
+// {
+// String path = "/" + i + ".css";
+// exchange = createExchangeWithReferrer(referrer, path);
+// ++result;
+// httpClient.send(exchange);
+// }
+// for (int i = 0; i < jsResources.length; ++i)
+// {
+// String path = "/" + i + ".js";
+// exchange = createExchangeWithReferrer(referrer, path);
+// ++result;
+// httpClient.send(exchange);
+// }
+// for (int i = 0; i < pngResources.length; ++i)
+// {
+// String path = "/" + i + ".png";
+// exchange = createExchangeWithReferrer(referrer, path);
+// ++result;
+// httpClient.send(exchange);
+// }
+//
+// Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS));
+// }
+//
+// return result;
+// }
+//
+// private ContentExchange createExchangeWithReferrer(String referrer, String path)
+// {
+// ContentExchange exchange;
+// exchange = new TestExchange();
+// exchange.setMethod("GET");
+// exchange.setRequestURI(path);
+// exchange.setVersion("HTTP/1.1");
+// exchange.setAddress(new Address("localhost", connector.getLocalPort()));
+// exchange.setRequestHeader("Host", "localhost:" + connector.getLocalPort());
+// exchange.setRequestHeader("referer", referrer);
+// return exchange;
+// }
private void benchmarkSPDY(PushStrategy pushStrategy, Session session) throws Exception
@@ -379,17 +374,18 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
}
}
- private class TestExchange extends ContentExchange
- {
- private TestExchange()
- {
- super(true);
- }
-
- @Override
- protected void onResponseComplete() throws IOException
- {
- latch.get().countDown();
- }
- }
+ //TODO:
+// private class TestExchange extends ContentExchange
+// {
+// private TestExchange()
+// {
+// super(true);
+// }
+//
+// @Override
+// protected void onResponseComplete() throws IOException
+// {
+// latch.get().countDown();
+// }
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyTest.java
index c362ae0..69b7d3b 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyTest.java
@@ -11,12 +11,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.spdy.AsyncConnectionFactory;
-import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
@@ -27,21 +24,23 @@ import org.junit.Test;
public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
{
- @Override
- protected SPDYServerConnector newHTTPSPDYServerConnector()
- {
- return new HTTPSPDYServerConnector()
- {
- private final AsyncConnectionFactory defaultAsyncConnectionFactory =
- new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, new ReferrerPushStrategy());
- @Override
- protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
- {
- return defaultAsyncConnectionFactory;
- }
- };
- }
+ //TODO:
+// @Override
+// protected SPDYServerConnector newHTTPSPDYServerConnector()
+// {
+// return new HTTPSPDYServerConnector(server, new ReferrerPushStrategy())
+// {
+// private final ConnectionFactory defaultAsyncConnectionFactory =
+// new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, new ReferrerPushStrategy());
+//
+// @Override
+// public ConnectionFactory getDefaultConnectionFactory()
+// {
+// return defaultAsyncConnectionFactory;
+// }
+// };
+// }
@Test
public void testAssociatedResourceIsPushed() throws Exception
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyV2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyV2Test.java
index 8bc79c3..38b61c9 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyV2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ReferrerPushStrategyV2Test.java
@@ -23,9 +23,9 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
@@ -49,8 +49,8 @@ public class ReferrerPushStrategyV2Test extends AbstractHTTPSPDYTest
protected SPDYServerConnector newHTTPSPDYServerConnector(short version)
{
SPDYServerConnector connector = super.newHTTPSPDYServerConnector(version);
- AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new ReferrerPushStrategy());
- connector.setDefaultAsyncConnectionFactory(defaultFactory);
+ ConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, new ReferrerPushStrategy());
+ connector.setDefaultConnectionFactory(defaultFactory);
return connector;
}
@@ -62,8 +62,8 @@ public class ReferrerPushStrategyV2Test extends AbstractHTTPSPDYTest
ReferrerPushStrategy pushStrategy = new ReferrerPushStrategy();
int referrerPushPeriod = 1000;
pushStrategy.setReferrerPushPeriod(referrerPushPeriod);
- AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
- connector.setDefaultAsyncConnectionFactory(defaultFactory);
+ ConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
+ connector.setDefaultConnectionFactory(defaultFactory);
Headers mainRequestHeaders = createHeadersWithoutReferrer(mainResource);
Session session1 = sendMainRequestAndCSSRequest(address, mainRequestHeaders);
@@ -84,8 +84,9 @@ public class ReferrerPushStrategyV2Test extends AbstractHTTPSPDYTest
ReferrerPushStrategy pushStrategy = new ReferrerPushStrategy();
int referrerPushPeriod = 1000;
pushStrategy.setReferrerPushPeriod(referrerPushPeriod);
- AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
- connector.setDefaultAsyncConnectionFactory(defaultFactory);
+ ConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(),
+ connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
+ connector.setDefaultConnectionFactory(defaultFactory);
Headers mainRequestHeaders = createHeadersWithoutReferrer(mainResource);
Session session1 = sendMainRequestAndCSSRequest(address, mainRequestHeaders);
@@ -105,8 +106,9 @@ public class ReferrerPushStrategyV2Test extends AbstractHTTPSPDYTest
ReferrerPushStrategy pushStrategy = new ReferrerPushStrategy();
pushStrategy.setMaxAssociatedResources(1);
- AsyncConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(), connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
- connector.setDefaultAsyncConnectionFactory(defaultFactory);
+ ConnectionFactory defaultFactory = new ServerHTTPSPDYAsyncConnectionFactory(version(),
+ connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), connector, pushStrategy);
+ connector.setDefaultConnectionFactory(defaultFactory);
Headers mainRequestHeaders = createHeadersWithoutReferrer(mainResource);
Session session1 = sendMainRequestAndCSSRequest(address, mainRequestHeaders);
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java
index 6375af2..a783336 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYTest.java
@@ -28,9 +28,9 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import com.sun.xml.internal.ws.util.ByteArrayBuffer;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
-import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
@@ -45,6 +45,9 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.junit.Assert;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{
@Test
@@ -79,13 +82,13 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
- Assert.assertTrue(replyInfo.isClose());
+ assertThat("replyInfo.isClose is true", replyInfo.isClose(), is(true));
Headers replyHeaders = replyInfo.getHeaders();
- Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
+ assertThat("status is 200", replyHeaders.get("status").value().contains("200"), is(true));
replyLatch.countDown();
}
});
- Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(handlerLatch.await(5000, TimeUnit.SECONDS)); //TODO: thomas
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
index 5bab1f5..cd47d5f 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
@@ -30,9 +30,9 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import com.sun.xml.internal.ws.util.ByteArrayBuffer;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
-import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
index 6c2c89b..cc21429 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
@@ -13,752 +13,714 @@
package org.eclipse.jetty.spdy.proxy;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.spdy.SPDYClient;
-import org.eclipse.jetty.spdy.SPDYServerConnector;
-import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
-import org.eclipse.jetty.spdy.api.BytesDataInfo;
-import org.eclipse.jetty.spdy.api.DataInfo;
-import org.eclipse.jetty.spdy.api.GoAwayInfo;
-import org.eclipse.jetty.spdy.api.Handler;
-import org.eclipse.jetty.spdy.api.Headers;
-import org.eclipse.jetty.spdy.api.PingInfo;
-import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.RstInfo;
-import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.spdy.api.Session;
-import org.eclipse.jetty.spdy.api.SessionFrameListener;
-import org.eclipse.jetty.spdy.api.Stream;
-import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.StreamStatus;
-import org.eclipse.jetty.spdy.api.SynInfo;
-import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
-import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatchman;
-import org.junit.runners.model.FrameworkMethod;
-
public class ProxyHTTPSPDYv2Test
{
- @Rule
- public final TestWatchman testName = new TestWatchman()
- {
- @Override
- public void starting(FrameworkMethod method)
- {
- super.starting(method);
- System.err.printf("Running %s.%s()%n",
- method.getMethod().getDeclaringClass().getName(),
- method.getName());
- }
- };
-
- private SPDYClient.Factory factory;
- private Server server;
- private Server proxy;
- private SPDYServerConnector proxyConnector;
-
- protected short version()
- {
- return SPDY.V2;
- }
-
- protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
- {
- server = new Server();
- SPDYServerConnector serverConnector = new SPDYServerConnector(listener);
- serverConnector.setDefaultAsyncConnectionFactory(new ServerSPDYAsyncConnectionFactory(version(), serverConnector.getByteBufferPool(), serverConnector.getExecutor(), serverConnector.getScheduler(), listener));
- serverConnector.setPort(0);
- server.addConnector(serverConnector);
- server.start();
- return new InetSocketAddress("localhost", serverConnector.getLocalPort());
- }
-
- protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
- {
- proxy = new Server();
- ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
- SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
- proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
- proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
- proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
- proxyConnector.setPort(0);
- proxy.addConnector(proxyConnector);
- proxy.start();
- return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
- }
-
- @Before
- public void init() throws Exception
- {
- factory = new SPDYClient.Factory();
- factory.start();
- }
-
- @After
- public void destroy() throws Exception
- {
- if (server != null)
- {
- server.stop();
- server.join();
- }
- if (proxy != null)
- {
- proxy.stop();
- proxy.join();
- }
- factory.stop();
- }
-
- @Test
- public void testClosingClientDoesNotCloseServer() throws Exception
- {
- final CountDownLatch closeLatch = new CountDownLatch(1);
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, true));
- return null;
- }
-
- @Override
- public void onGoAway(Session session, GoAwayInfo goAwayInfo)
- {
- closeLatch.countDown();
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- client.close();
-
- // Must not close, other clients may still be connected
- Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
- }
-
- @Test
- public void testGETThenNoContentFromTwoClients() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Assert.assertTrue(synInfo.isClose());
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
-
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
- stream.reply(replyInfo);
- return null;
- }
- }));
-
- Socket client1 = new Socket();
- client1.connect(proxyAddress);
- OutputStream output1 = client1.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output1.write(request.getBytes("UTF-8"));
- output1.flush();
-
- InputStream input1 = client1.getInputStream();
- BufferedReader reader1 = new BufferedReader(new InputStreamReader(input1, "UTF-8"));
- String line = reader1.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader1.readLine();
- Assert.assertFalse(reader1.ready());
-
- // Perform another request with another client
- Socket client2 = new Socket();
- client2.connect(proxyAddress);
- OutputStream output2 = client2.getOutputStream();
-
- output2.write(request.getBytes("UTF-8"));
- output2.flush();
-
- InputStream input2 = client2.getInputStream();
- BufferedReader reader2 = new BufferedReader(new InputStreamReader(input2, "UTF-8"));
- line = reader2.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader2.readLine();
- Assert.assertFalse(reader2.ready());
-
- client1.close();
- client2.close();
- }
-
- @Test
- public void testGETThenSmallResponseContent() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Assert.assertTrue(synInfo.isClose());
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
-
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
- stream.reply(replyInfo);
- stream.data(new BytesDataInfo(data, true));
-
- return null;
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- for (byte datum : data)
- Assert.assertEquals(datum, reader.read());
- Assert.assertFalse(reader.ready());
-
- // Perform another request so that we are sure we reset the states of parsers and generators
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- for (byte datum : data)
- Assert.assertEquals(datum, reader.read());
- Assert.assertFalse(reader.ready());
-
- client.close();
- }
-
- @Test
- public void testPOSTWithSmallRequestContentThenRedirect() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- return new StreamFrameListener.Adapter()
- {
- @Override
- public void onData(Stream stream, DataInfo dataInfo)
- {
- dataInfo.consume(dataInfo.length());
- if (dataInfo.isClose())
- {
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- headers.put(HTTPSPDYHeader.STATUS.name(version()), "303 See Other");
- stream.reply(new ReplyInfo(headers, true));
- }
- }
- };
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "POST / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "Content-Length: " + data.length + "\r\n" +
- "Content-Type: application/octet-stream\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.write(data);
- output.flush();
-
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 303"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- // Perform another request so that we are sure we reset the states of parsers and generators
- output.write(request.getBytes("UTF-8"));
- output.write(data);
- output.flush();
-
- line = reader.readLine();
- Assert.assertTrue(line.contains(" 303"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- client.close();
- }
-
- @Test
- public void testPOSTWithSmallRequestContentThenSmallResponseContent() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- return new StreamFrameListener.Adapter()
- {
- @Override
- public void onData(Stream stream, DataInfo dataInfo)
- {
- dataInfo.consume(dataInfo.length());
- if (dataInfo.isClose())
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
- stream.reply(replyInfo);
- stream.data(new BytesDataInfo(data, true));
- }
- }
- };
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "POST / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "Content-Length: " + data.length + "\r\n" +
- "Content-Type: application/octet-stream\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.write(data);
- output.flush();
-
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- for (byte datum : data)
- Assert.assertEquals(datum, reader.read());
- Assert.assertFalse(reader.ready());
-
- // Perform another request so that we are sure we reset the states of parsers and generators
- output.write(request.getBytes("UTF-8"));
- output.write(data);
- output.flush();
-
- line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- for (byte datum : data)
- Assert.assertEquals(datum, reader.read());
- Assert.assertFalse(reader.ready());
-
- client.close();
- }
-
- @Test
- public void testSYNThenREPLY() throws Exception
- {
- final String header = "foo";
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
- Assert.assertNotNull(requestHeaders.get(header));
-
- Headers responseHeaders = new Headers();
- responseHeaders.put(header, "baz");
- stream.reply(new ReplyInfo(responseHeaders, true));
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
-
- final CountDownLatch replyLatch = new CountDownLatch(1);
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- headers.put(header, "bar");
- client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
- {
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- Headers headers = replyInfo.getHeaders();
- Assert.assertNotNull(headers.get(header));
- replyLatch.countDown();
- }
- });
-
- Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
-
- client.goAway().get(5, TimeUnit.SECONDS);
- }
-
- @Test
- public void testSYNThenREPLYAndDATA() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- final String header = "foo";
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
- Assert.assertNotNull(requestHeaders.get(header));
-
- Headers responseHeaders = new Headers();
- responseHeaders.put(header, "baz");
- stream.reply(new ReplyInfo(responseHeaders, false));
- stream.data(new BytesDataInfo(data, true));
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
-
- final CountDownLatch replyLatch = new CountDownLatch(1);
- final CountDownLatch dataLatch = new CountDownLatch(1);
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- headers.put(header, "bar");
- client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
- {
- private final ByteArrayOutputStream result = new ByteArrayOutputStream();
-
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- Headers headers = replyInfo.getHeaders();
- Assert.assertNotNull(headers.get(header));
- replyLatch.countDown();
- }
-
- @Override
- public void onData(Stream stream, DataInfo dataInfo)
- {
- result.write(dataInfo.asBytes(true), 0, dataInfo.length());
- if (dataInfo.isClose())
- {
- Assert.assertArrayEquals(data, result.toByteArray());
- dataLatch.countDown();
- }
- }
- });
-
- Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
-
- client.goAway().get(5, TimeUnit.SECONDS);
- }
-
- @Test
- public void testGETThenSPDYPushIsIgnored() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
-
- Headers pushHeaders = new Headers();
- pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
- stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
- {
- @Override
- public void completed(Stream pushStream)
- {
- pushStream.data(new BytesDataInfo(data, true));
- }
- });
-
- stream.reply(new ReplyInfo(responseHeaders, true));
- return null;
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- client.setSoTimeout(1000);
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- client.close();
- }
-
- @Test
- public void testSYNThenSPDYPushIsReceived() throws Exception
- {
- final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, false));
-
- Headers pushHeaders = new Headers();
- pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
- stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
- {
- @Override
- public void completed(Stream pushStream)
- {
- pushStream.data(new BytesDataInfo(data, true));
- }
- });
-
- stream.data(new BytesDataInfo(data, true));
-
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- final CountDownLatch pushSynLatch = new CountDownLatch(1);
- final CountDownLatch pushDataLatch = new CountDownLatch(1);
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- pushSynLatch.countDown();
- return new StreamFrameListener.Adapter()
- {
- @Override
- public void onData(Stream stream, DataInfo dataInfo)
- {
- dataInfo.consume(dataInfo.length());
- if (dataInfo.isClose())
- pushDataLatch.countDown();
- }
- };
- }
- }).get(5, TimeUnit.SECONDS);
-
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- final CountDownLatch replyLatch = new CountDownLatch(1);
- final CountDownLatch dataLatch = new CountDownLatch(1);
- client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
- {
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- replyLatch.countDown();
- }
-
- @Override
- public void onData(Stream stream, DataInfo dataInfo)
- {
- dataInfo.consume(dataInfo.length());
- if (dataInfo.isClose())
- dataLatch.countDown();
- }
- });
-
- Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
-
- client.goAway().get(5, TimeUnit.SECONDS);
- }
-
- @Test
- public void testPING() throws Exception
- {
- // PING is per hop, and it does not carry the information to which server to ping to
- // We just verify that it works
-
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- final CountDownLatch pingLatch = new CountDownLatch(1);
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
- {
- @Override
- public void onPing(Session session, PingInfo pingInfo)
- {
- pingLatch.countDown();
- }
- }).get(5, TimeUnit.SECONDS);
-
- client.ping().get(5, TimeUnit.SECONDS);
-
- Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
-
- client.goAway().get(5, TimeUnit.SECONDS);
- }
-
- @Test
- public void testGETThenReset() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Assert.assertTrue(synInfo.isClose());
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
-
- stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
-
- return null;
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- Assert.assertNull(reader.readLine());
-
- client.close();
- }
-
- @Test
- public void testSYNThenReset() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Assert.assertTrue(synInfo.isClose());
- Headers requestHeaders = synInfo.getHeaders();
- Assert.assertNotNull(requestHeaders.get("via"));
-
- stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
-
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- final CountDownLatch resetLatch = new CountDownLatch(1);
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
- {
- @Override
- public void onRst(Session session, RstInfo rstInfo)
- {
- resetLatch.countDown();
- }
- }).get(5, TimeUnit.SECONDS);
-
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- client.syn(new SynInfo(headers, true), null);
-
- Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
-
- client.goAway().get(5, TimeUnit.SECONDS);
- }
+// @Rule
+// public final TestWatchman testName = new TestWatchman()
+// {
+// @Override
+// public void starting(FrameworkMethod method)
+// {
+// super.starting(method);
+// System.err.printf("Running %s.%s()%n",
+// method.getMethod().getDeclaringClass().getName(),
+// method.getName());
+// }
+// };
+//
+// private SPDYClient.Factory factory;
+// private Server server;
+// private Server proxy;
+// private SPDYServerConnector proxyConnector;
+//
+// protected short version()
+// {
+// return SPDY.V2;
+// }
+//
+// protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
+// {
+// server = new Server();
+// SPDYServerConnector serverConnector = new SPDYServerConnector(server, listener);
+// serverConnector.setDefaultConnectionFactory(new ServerSPDYAsyncConnectionFactory(version(),
+// serverConnector.getByteBufferPool(), serverConnector.getExecutor(), serverConnector.getScheduler(), listener));
+// serverConnector.setPort(0);
+// server.addConnector(serverConnector);
+// server.start();
+// return new InetSocketAddress("localhost", serverConnector.getLocalPort());
+// }
+//
+// protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
+// {
+// proxy = new Server();
+// ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
+// SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
+// proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
+// proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
+// proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
+// proxyConnector.setPort(0);
+// proxy.addConnector(proxyConnector);
+// proxy.start();
+// return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
+// }
+//
+// @Before
+// public void init() throws Exception
+// {
+// factory = new SPDYClient.Factory();
+// factory.start();
+// }
+//
+// @After
+// public void destroy() throws Exception
+// {
+// if (server != null)
+// {
+// server.stop();
+// server.join();
+// }
+// if (proxy != null)
+// {
+// proxy.stop();
+// proxy.join();
+// }
+// factory.stop();
+// }
+//
+// @Test
+// public void testClosingClientDoesNotCloseServer() throws Exception
+// {
+// final CountDownLatch closeLatch = new CountDownLatch(1);
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+// stream.reply(new ReplyInfo(responseHeaders, true));
+// return null;
+// }
+//
+// @Override
+// public void onGoAway(Session session, GoAwayInfo goAwayInfo)
+// {
+// closeLatch.countDown();
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "GET / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.flush();
+//
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// String line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// Assert.assertFalse(reader.ready());
+//
+// client.close();
+//
+// // Must not close, other clients may still be connected
+// Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
+// }
+//
+// @Test
+// public void testGETThenNoContentFromTwoClients() throws Exception
+// {
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Assert.assertTrue(synInfo.isClose());
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+//
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+// ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
+// stream.reply(replyInfo);
+// return null;
+// }
+// }));
+//
+// Socket client1 = new Socket();
+// client1.connect(proxyAddress);
+// OutputStream output1 = client1.getOutputStream();
+//
+// String request = "" +
+// "GET / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "\r\n";
+// output1.write(request.getBytes("UTF-8"));
+// output1.flush();
+//
+// InputStream input1 = client1.getInputStream();
+// BufferedReader reader1 = new BufferedReader(new InputStreamReader(input1, "UTF-8"));
+// String line = reader1.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader1.readLine();
+// Assert.assertFalse(reader1.ready());
+//
+// // Perform another request with another client
+// Socket client2 = new Socket();
+// client2.connect(proxyAddress);
+// OutputStream output2 = client2.getOutputStream();
+//
+// output2.write(request.getBytes("UTF-8"));
+// output2.flush();
+//
+// InputStream input2 = client2.getInputStream();
+// BufferedReader reader2 = new BufferedReader(new InputStreamReader(input2, "UTF-8"));
+// line = reader2.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader2.readLine();
+// Assert.assertFalse(reader2.ready());
+//
+// client1.close();
+// client2.close();
+// }
+//
+// @Test
+// public void testGETThenSmallResponseContent() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Assert.assertTrue(synInfo.isClose());
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+//
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+// ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
+// stream.reply(replyInfo);
+// stream.data(new BytesDataInfo(data, true));
+//
+// return null;
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "GET / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.flush();
+//
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// String line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// for (byte datum : data)
+// Assert.assertEquals(datum, reader.read());
+// Assert.assertFalse(reader.ready());
+//
+// // Perform another request so that we are sure we reset the states of parsers and generators
+// output.write(request.getBytes("UTF-8"));
+// output.flush();
+//
+// line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// for (byte datum : data)
+// Assert.assertEquals(datum, reader.read());
+// Assert.assertFalse(reader.ready());
+//
+// client.close();
+// }
+//
+// @Test
+// public void testPOSTWithSmallRequestContentThenRedirect() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// return new StreamFrameListener.Adapter()
+// {
+// @Override
+// public void onData(Stream stream, DataInfo dataInfo)
+// {
+// dataInfo.consume(dataInfo.length());
+// if (dataInfo.isClose())
+// {
+// Headers headers = new Headers();
+// headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// headers.put(HTTPSPDYHeader.STATUS.name(version()), "303 See Other");
+// stream.reply(new ReplyInfo(headers, true));
+// }
+// }
+// };
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "POST / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "Content-Length: " + data.length + "\r\n" +
+// "Content-Type: application/octet-stream\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.write(data);
+// output.flush();
+//
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// String line = reader.readLine();
+// Assert.assertTrue(line.contains(" 303"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// Assert.assertFalse(reader.ready());
+//
+// // Perform another request so that we are sure we reset the states of parsers and generators
+// output.write(request.getBytes("UTF-8"));
+// output.write(data);
+// output.flush();
+//
+// line = reader.readLine();
+// Assert.assertTrue(line.contains(" 303"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// Assert.assertFalse(reader.ready());
+//
+// client.close();
+// }
+//
+// @Test
+// public void testPOSTWithSmallRequestContentThenSmallResponseContent() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// return new StreamFrameListener.Adapter()
+// {
+// @Override
+// public void onData(Stream stream, DataInfo dataInfo)
+// {
+// dataInfo.consume(dataInfo.length());
+// if (dataInfo.isClose())
+// {
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+// ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
+// stream.reply(replyInfo);
+// stream.data(new BytesDataInfo(data, true));
+// }
+// }
+// };
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "POST / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "Content-Length: " + data.length + "\r\n" +
+// "Content-Type: application/octet-stream\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.write(data);
+// output.flush();
+//
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// String line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// for (byte datum : data)
+// Assert.assertEquals(datum, reader.read());
+// Assert.assertFalse(reader.ready());
+//
+// // Perform another request so that we are sure we reset the states of parsers and generators
+// output.write(request.getBytes("UTF-8"));
+// output.write(data);
+// output.flush();
+//
+// line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// for (byte datum : data)
+// Assert.assertEquals(datum, reader.read());
+// Assert.assertFalse(reader.ready());
+//
+// client.close();
+// }
+//
+// @Test
+// public void testSYNThenREPLY() throws Exception
+// {
+// final String header = "foo";
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+// Assert.assertNotNull(requestHeaders.get(header));
+//
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(header, "baz");
+// stream.reply(new ReplyInfo(responseHeaders, true));
+// return null;
+// }
+// }));
+// proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
+//
+// Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
+//
+// final CountDownLatch replyLatch = new CountDownLatch(1);
+// Headers headers = new Headers();
+// headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
+// headers.put(header, "bar");
+// client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
+// {
+// @Override
+// public void onReply(Stream stream, ReplyInfo replyInfo)
+// {
+// Headers headers = replyInfo.getHeaders();
+// Assert.assertNotNull(headers.get(header));
+// replyLatch.countDown();
+// }
+// });
+//
+// Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
+//
+// client.goAway().get(5, TimeUnit.SECONDS);
+// }
+//
+// @Test
+// public void testSYNThenREPLYAndDATA() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// final String header = "foo";
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+// Assert.assertNotNull(requestHeaders.get(header));
+//
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(header, "baz");
+// stream.reply(new ReplyInfo(responseHeaders, false));
+// stream.data(new BytesDataInfo(data, true));
+// return null;
+// }
+// }));
+// proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
+//
+// Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
+//
+// final CountDownLatch replyLatch = new CountDownLatch(1);
+// final CountDownLatch dataLatch = new CountDownLatch(1);
+// Headers headers = new Headers();
+// headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
+// headers.put(header, "bar");
+// client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
+// {
+// private final ByteArrayOutputStream result = new ByteArrayOutputStream();
+//
+// @Override
+// public void onReply(Stream stream, ReplyInfo replyInfo)
+// {
+// Headers headers = replyInfo.getHeaders();
+// Assert.assertNotNull(headers.get(header));
+// replyLatch.countDown();
+// }
+//
+// @Override
+// public void onData(Stream stream, DataInfo dataInfo)
+// {
+// result.write(dataInfo.asBytes(true), 0, dataInfo.length());
+// if (dataInfo.isClose())
+// {
+// Assert.assertArrayEquals(data, result.toByteArray());
+// dataLatch.countDown();
+// }
+// }
+// });
+//
+// Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
+// Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
+//
+// client.goAway().get(5, TimeUnit.SECONDS);
+// }
+//
+// @Test
+// public void testGETThenSPDYPushIsIgnored() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+//
+// Headers pushHeaders = new Headers();
+// pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
+// stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
+// {
+// @Override
+// public void completed(Stream pushStream)
+// {
+// pushStream.data(new BytesDataInfo(data, true));
+// }
+// });
+//
+// stream.reply(new ReplyInfo(responseHeaders, true));
+// return null;
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "GET / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.flush();
+//
+// client.setSoTimeout(1000);
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// String line = reader.readLine();
+// Assert.assertTrue(line.contains(" 200"));
+// while (line.length() > 0)
+// line = reader.readLine();
+// Assert.assertFalse(reader.ready());
+//
+// client.close();
+// }
+//
+// @Test
+// public void testSYNThenSPDYPushIsReceived() throws Exception
+// {
+// final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Headers responseHeaders = new Headers();
+// responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+// responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
+// stream.reply(new ReplyInfo(responseHeaders, false));
+//
+// Headers pushHeaders = new Headers();
+// pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
+// stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
+// {
+// @Override
+// public void completed(Stream pushStream)
+// {
+// pushStream.data(new BytesDataInfo(data, true));
+// }
+// });
+//
+// stream.data(new BytesDataInfo(data, true));
+//
+// return null;
+// }
+// }));
+// proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
+//
+// final CountDownLatch pushSynLatch = new CountDownLatch(1);
+// final CountDownLatch pushDataLatch = new CountDownLatch(1);
+// Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// pushSynLatch.countDown();
+// return new StreamFrameListener.Adapter()
+// {
+// @Override
+// public void onData(Stream stream, DataInfo dataInfo)
+// {
+// dataInfo.consume(dataInfo.length());
+// if (dataInfo.isClose())
+// pushDataLatch.countDown();
+// }
+// };
+// }
+// }).get(5, TimeUnit.SECONDS);
+//
+// Headers headers = new Headers();
+// headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
+// final CountDownLatch replyLatch = new CountDownLatch(1);
+// final CountDownLatch dataLatch = new CountDownLatch(1);
+// client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
+// {
+// @Override
+// public void onReply(Stream stream, ReplyInfo replyInfo)
+// {
+// replyLatch.countDown();
+// }
+//
+// @Override
+// public void onData(Stream stream, DataInfo dataInfo)
+// {
+// dataInfo.consume(dataInfo.length());
+// if (dataInfo.isClose())
+// dataLatch.countDown();
+// }
+// });
+//
+// Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
+// Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
+// Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
+// Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
+//
+// client.goAway().get(5, TimeUnit.SECONDS);
+// }
+//
+// @Test
+// public void testPING() throws Exception
+// {
+// // PING is per hop, and it does not carry the information to which server to ping to
+// // We just verify that it works
+//
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
+// proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
+//
+// final CountDownLatch pingLatch = new CountDownLatch(1);
+// Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
+// {
+// @Override
+// public void onPing(Session session, PingInfo pingInfo)
+// {
+// pingLatch.countDown();
+// }
+// }).get(5, TimeUnit.SECONDS);
+//
+// client.ping().get(5, TimeUnit.SECONDS);
+//
+// Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
+//
+// client.goAway().get(5, TimeUnit.SECONDS);
+// }
+//
+// @Test
+// public void testGETThenReset() throws Exception
+// {
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Assert.assertTrue(synInfo.isClose());
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+//
+// stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+//
+// return null;
+// }
+// }));
+//
+// Socket client = new Socket();
+// client.connect(proxyAddress);
+// OutputStream output = client.getOutputStream();
+//
+// String request = "" +
+// "GET / HTTP/1.1\r\n" +
+// "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
+// "\r\n";
+// output.write(request.getBytes("UTF-8"));
+// output.flush();
+//
+// InputStream input = client.getInputStream();
+// BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
+// Assert.assertNull(reader.readLine());
+//
+// client.close();
+// }
+//
+// @Test
+// public void testSYNThenReset() throws Exception
+// {
+// InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
+// {
+// @Override
+// public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
+// {
+// Assert.assertTrue(synInfo.isClose());
+// Headers requestHeaders = synInfo.getHeaders();
+// Assert.assertNotNull(requestHeaders.get("via"));
+//
+// stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
+//
+// return null;
+// }
+// }));
+// proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
+//
+// final CountDownLatch resetLatch = new CountDownLatch(1);
+// Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
+// {
+// @Override
+// public void onRst(Session session, RstInfo rstInfo)
+// {
+// resetLatch.countDown();
+// }
+// }).get(5, TimeUnit.SECONDS);
+//
+// Headers headers = new Headers();
+// headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
+// client.syn(new SynInfo(headers, true), null);
+//
+// Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
+//
+// client.goAway().get(5, TimeUnit.SECONDS);
+// }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/resources/jetty-logging.properties b/jetty-spdy/spdy-jetty-http/src/test/resources/jetty-logging.properties
index 5250a08..216d124 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/resources/jetty-logging.properties
+++ b/jetty-spdy/spdy-jetty-http/src/test/resources/jetty-logging.properties
@@ -1,2 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
-org.eclipse.jetty.spdy.LEVEL=WARN
+org.eclipse.jetty.spdy.LEVEL=DEBUG
+org.eclipse.jetty.server.LEVEL=DEBUG
+#thomas
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
index 12624d8..908b716 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
@@ -83,6 +83,16 @@ public class ServerSPDYAsyncConnectionFactory implements ConnectionFactory
return listener;
}
+ protected ByteBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ protected Executor getThreadPool()
+ {
+ return threadPool;
+ }
+
private static class ServerSPDYConnection extends SPDYConnection
{
private final ServerSessionFrameListener listener;