summaryrefslogtreecommitdiffstatsabout
diff options
context:
space:
mode:
authorThomas Becker2012-07-13 05:53:56 (EDT)
committer Thomas Becker2012-07-13 05:53:56 (EDT)
commit198f713f7dd65bee34c1bb4a57fcdeaf6c2cecd4 (patch)
treeda44b6c9fadabb327921c4a15c54587b45811aa4
parentb442fef60bd53c3d0de37fd7a8b95faab99912e8 (diff)
downloadorg.eclipse.jetty.project-198f713f7dd65bee34c1bb4a57fcdeaf6c2cecd4.zip
org.eclipse.jetty.project-198f713f7dd65bee34c1bb4a57fcdeaf6c2cecd4.tar.gz
org.eclipse.jetty.project-198f713f7dd65bee34c1bb4a57fcdeaf6c2cecd4.tar.bz2
spdy: spdy-proxy refactoring to allow multiple ProxyEngine implementations for different protocols. Header name fix to lowercase header names before creating the bytes to sent over the wire.refs/changes/58/6758/1
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java5
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java6
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java2
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java94
-rw-r--r--jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml32
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java14
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java59
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java166
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java8
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java24
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java83
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java98
12 files changed, 307 insertions, 284 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index 6019e29..360a639 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -138,6 +138,11 @@ public class StandardStream implements IStream
this.listener = listener;
}
+ public StreamFrameListener getStreamFrameListener()
+ {
+ return listener;
+ }
+
@Override
public void updateCloseState(boolean close, boolean local)
{
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
index cef5829..261a94f 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
@@ -216,13 +216,15 @@ public class Headers implements Iterable<Headers.Header>
if (obj == null || getClass() != obj.getClass())
return false;
Header that = (Header)obj;
- return name.equals(that.name) && Arrays.equals(values, that.values);
+ // Header names must be lowercase, thus we lowercase them before transmission, but keep them as is
+ // internally. That's why we've to compare them case insensitive.
+ return name.equalsIgnoreCase(that.name) && Arrays.equals(values, that.values);
}
@Override
public int hashCode()
{
- int result = name.hashCode();
+ int result = name.toLowerCase().hashCode();
result = 31 * result + Arrays.hashCode(values);
return result;
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
index af99785..6290841 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
@@ -40,7 +40,7 @@ public class HeadersBlockGenerator
writeCount(version, buffer, headers.size());
for (Headers.Header header : headers)
{
- String name = header.name();
+ String name = header.name().toLowerCase();
byte[] nameBytes = name.getBytes(iso1);
writeNameLength(version, buffer, nameBytes.length);
buffer.write(nameBytes, 0, nameBytes.length);
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
index 545aec0..fd5a36d 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
@@ -22,65 +22,77 @@ import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
public class HeadersGenerateParseTest
{
- @Test
- public void testGenerateParse() throws Exception
+
+ private Headers headers = new Headers();
+ private int streamId = 13;
+ private byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
+ private final TestSPDYParserListener listener = new TestSPDYParserListener();
+ private final Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
+ private ByteBuffer buffer;
+
+ @Before
+ public void setUp()
{
- byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
- int streamId = 13;
- Headers headers = new Headers();
+ parser.addListener(listener);
headers.put("a", "b");
+ buffer = createHeadersFrameBuffer(headers);
+ }
+
+ private ByteBuffer createHeadersFrameBuffer(Headers headers)
+ {
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
+ assertThat("Buffer is not null", buffer, notNullValue());
+ return buffer;
+ }
- Assert.assertNotNull(buffer);
-
- TestSPDYParserListener listener = new TestSPDYParserListener();
- Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
- parser.addListener(listener);
+ @Test
+ public void testGenerateParse() throws Exception
+ {
parser.parse(buffer);
- ControlFrame frame2 = listener.getControlFrame();
-
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
- HeadersFrame headersFrame = (HeadersFrame)frame2;
- Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
- Assert.assertEquals(streamId, headersFrame.getStreamId());
- Assert.assertEquals(flags, headersFrame.getFlags());
- Assert.assertEquals(headers, headersFrame.getHeaders());
+ assertExpectationsAreMet(headers);
}
@Test
public void testGenerateParseOneByteAtATime() throws Exception
{
- byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
- int streamId = 13;
- Headers headers = new Headers();
- headers.put("a", "b");
- HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
- Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
- ByteBuffer buffer = generator.control(frame1);
-
- Assert.assertNotNull(buffer);
-
- TestSPDYParserListener listener = new TestSPDYParserListener();
- Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
- parser.addListener(listener);
while (buffer.hasRemaining())
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
- ControlFrame frame2 = listener.getControlFrame();
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
- HeadersFrame headersFrame = (HeadersFrame)frame2;
- Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
- Assert.assertEquals(streamId, headersFrame.getStreamId());
- Assert.assertEquals(flags, headersFrame.getFlags());
- Assert.assertEquals(headers, headersFrame.getHeaders());
+ assertExpectationsAreMet(headers);
+ }
+
+ @Test
+ public void testHeadersAreTranslatedToLowerCase()
+ {
+ Headers headers = new Headers();
+ headers.put("Via","localhost");
+ parser.parse(createHeadersFrameBuffer(headers));
+ HeadersFrame parsedHeadersFrame = assertExpectationsAreMet(headers);
+ Headers.Header viaHeader = parsedHeadersFrame.getHeaders().get("via");
+ assertThat("Via Header name is lowercase", viaHeader.name(), is("via"));
+ }
+
+ private HeadersFrame assertExpectationsAreMet(Headers headers)
+ {
+ ControlFrame parsedControlFrame = listener.getControlFrame();
+ assertThat("listener received controlFrame", parsedControlFrame, notNullValue());
+ assertThat("ControlFrame type is HEADERS", ControlFrameType.HEADERS, is(parsedControlFrame.getType()));
+ HeadersFrame headersFrame = (HeadersFrame)parsedControlFrame;
+ assertThat("Version matches", SPDY.V2, is(headersFrame.getVersion()));
+ assertThat("StreamId matches", streamId, is(headersFrame.getStreamId()));
+ assertThat("flags match", flags, is(headersFrame.getFlags()));
+ assertThat("headers match", headers, is(headersFrame.getHeaders()));
+ return headersFrame;
}
}
diff --git a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
index 7d84868..9c637ec 100644
--- a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
+++ b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
@@ -32,24 +32,34 @@
</Call>
<!--
- The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below
- and is configured to process requests for host "localhost".
- Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded
- to 127.0.0.1:9090, where they are served by the upstream server above.
+ This ProxyEngine translates the incoming SPDY/x(HTTP) request to SPDY/2(HTTP)
-->
- <New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
+ <New id="spdyProxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
<Arg>
<New class="org.eclipse.jetty.spdy.SPDYClient$Factory">
- <Call name="start" />
+ <Call name="start"/>
</New>
</Arg>
- <Set name="proxyInfos">
+ </New>
+
+ <!--
+ The ProxyEngineSelector receives SPDY/x(HTTP) requests from proxy connectors below
+ and is configured to process requests for host "localhost".
+ Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) by the configured ProxyEngine
+ and forwarded to 127.0.0.1:9090, where they are served by the upstream server above.
+ -->
+ <New id="proxyEngineSelector" class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector">
+ <Call name="putProxyEngine">
+ <Arg>spdy/2</Arg>
+ <Arg><Ref id="spdyProxyEngine" /></Arg>
+ </Call>
+ <Set name="proxyServerInfos">
<Map>
<Entry>
<Item>localhost</Item>
<Item>
- <New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo">
- <Arg type="short">2</Arg>
+ <New class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector$ProxyServerInfo">
+ <Arg type="String">spdy/2</Arg>
<Arg>127.0.0.1</Arg>
<Arg type="int">9090</Arg>
</New>
@@ -69,7 +79,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
- <Arg><Ref id="proxyEngine" /></Arg>
+ <Arg><Ref id="proxyEngineSelector" /></Arg>
<Set name="Port">8080</Set>
</New>
</Arg>
@@ -77,7 +87,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
- <Arg><Ref id="proxyEngine" /></Arg>
+ <Arg><Ref id="proxyEngineSelector" /></Arg>
<Arg><Ref id="sslContextFactory" /></Arg>
<Set name="Port">8443</Set>
</New>
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
index 522363f..2827af0 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
@@ -21,19 +21,19 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
{
- public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
+ public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
{
- this(proxyEngine, null);
+ this(proxyEngineSelector, null);
}
- public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
+ public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
{
- super(proxyEngine, sslContextFactory);
+ super(proxyEngineSelector, sslContextFactory);
clearAsyncConnectionFactories();
- putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
- putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
- putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
+ putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+ putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+ putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
index 4b4ee33..0a17261 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
@@ -15,32 +15,25 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
- * <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
- * accepts SPDY from its client side and converts to any protocol to its server side.</p>
+ * <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to
+ * any protocol to its server side.</p>
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
* these SPDY client events into appropriate events to forward to the server, in the appropriate
* protocol that is understood by the server.</p>
- * <p>This class also provides configuration for the proxy rules.</p>
*/
-public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
+public abstract class ProxyEngine
{
protected final Logger logger = Log.getLogger(getClass());
- private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
private final String name;
protected ProxyEngine()
@@ -60,6 +53,8 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
}
}
+ public abstract StreamFrameListener proxy(Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo);
+
protected ProxyEngine(String name)
{
this.name = name;
@@ -96,46 +91,4 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
{
}
- public Map<String, ProxyInfo> getProxyInfos()
- {
- return new HashMap<>(proxyInfos);
- }
-
- public void setProxyInfos(Map<String, ProxyInfo> proxyInfos)
- {
- this.proxyInfos.clear();
- this.proxyInfos.putAll(proxyInfos);
- }
-
- public void putProxyInfo(String host, ProxyInfo proxyInfo)
- {
- proxyInfos.put(host, proxyInfo);
- }
-
- protected ProxyInfo getProxyInfo(String host)
- {
- return proxyInfos.get(host);
- }
-
- public static class ProxyInfo
- {
- private final short version;
- private final InetSocketAddress address;
-
- public ProxyInfo(short version, String host, int port)
- {
- this.version = version;
- this.address = new InetSocketAddress(host, port);
- }
-
- public short getVersion()
- {
- return version;
- }
-
- public InetSocketAddress getAddress()
- {
- return address;
- }
- }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java
new file mode 100644
index 0000000..ee23df4
--- /dev/null
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java
@@ -0,0 +1,166 @@
+package org.eclipse.jetty.spdy.proxy;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.eclipse.jetty.spdy.api.GoAwayInfo;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.PingInfo;
+import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StreamStatus;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the
+ * syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
+ * host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
+ *
+ * <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for
+ * the given protocol, it resets the client stream.</p>
+ *
+ * <p>This class also provides configuration for the proxy rules.</p>
+ */
+public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
+{
+ protected final Logger logger = Log.getLogger(getClass());
+ private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>();
+ private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>();
+
+ @Override
+ public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
+ {
+ logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
+
+ final Session clientSession = clientStream.getSession();
+ short clientVersion = clientSession.getVersion();
+ Headers headers = new Headers(clientSynInfo.getHeaders(), false);
+
+ Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
+ if (hostHeader == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ String host = hostHeader.value();
+ int colon = host.indexOf(':');
+ if (colon >= 0)
+ host = host.substring(0, colon);
+
+ ProxyServerInfo proxyServerInfo = getProxyServerInfo(host);
+ if (proxyServerInfo == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ String protocol = proxyServerInfo.getProtocol();
+ ProxyEngine proxyEngine = proxyEngines.get(protocol);
+ if (proxyEngine == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo);
+ }
+
+ @Override
+ public void onPing(Session clientSession, PingInfo pingInfo)
+ {
+ // We do not know to which upstream server
+ // to send the PING so we just ignore it
+ }
+
+ @Override
+ public void onGoAway(Session session, GoAwayInfo goAwayInfo)
+ {
+ // TODO:
+ }
+
+ public Map<String, ProxyEngine> getProxyEngines()
+ {
+ return new HashMap<>(proxyEngines);
+ }
+
+ public void setProxyEngines(Map<String, ProxyEngine> proxyEngines)
+ {
+ this.proxyEngines.clear();
+ this.proxyEngines.putAll(proxyEngines);
+ }
+
+ public ProxyEngine getProxyEngine(String protocol)
+ {
+ return proxyEngines.get(protocol);
+ }
+
+ public void putProxyEngine(String protocol, ProxyEngine proxyEngine)
+ {
+ proxyEngines.put(protocol, proxyEngine);
+ }
+
+ public Map<String, ProxyServerInfo> getProxyServerInfos()
+ {
+ return new HashMap<>(proxyInfos);
+ }
+
+ protected ProxyServerInfo getProxyServerInfo(String host)
+ {
+ return proxyInfos.get(host);
+ }
+
+ public void setProxyServerInfos(Map<String, ProxyServerInfo> proxyServerInfos)
+ {
+ this.proxyInfos.clear();
+ this.proxyInfos.putAll(proxyServerInfos);
+ }
+
+ public void putProxyServerInfo(String host, ProxyServerInfo proxyServerInfo)
+ {
+ proxyInfos.put(host, proxyServerInfo);
+ }
+
+ private void rst(Stream stream)
+ {
+ RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
+ stream.getSession().rst(rstInfo);
+ }
+
+ public static class ProxyServerInfo
+ {
+ private final String protocol;
+ private final String host;
+ private final InetSocketAddress address;
+
+ public ProxyServerInfo(String protocol, String host, int port)
+ {
+ this.protocol = protocol;
+ this.host = host;
+ this.address = new InetSocketAddress(host, port);
+ }
+
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public InetSocketAddress getAddress()
+ {
+ return address;
+ }
+ }
+}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
index 57a0e71..f73c7e8 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
@@ -24,18 +24,18 @@ import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
{
private final short version;
- private final ProxyEngine proxyEngine;
+ private final ProxyEngineSelector proxyEngineSelector;
- public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
+ public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
{
super(connector);
this.version = version;
- this.proxyEngine = proxyEngine;
+ this.proxyEngineSelector = proxyEngineSelector;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
- return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
+ return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
index 1ddcd11..92c3184 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
@@ -46,6 +46,7 @@ import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
@@ -53,17 +54,17 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private final Headers headers = new Headers();
private final short version;
- private final ProxyEngine proxyEngine;
+ private final ProxyEngineSelector proxyEngineSelector;
private final HttpGenerator generator;
private final ISession session;
- private Stream stream;
+ private HTTPStream stream;
private Buffer content;
- public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngine proxyEngine)
+ public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
{
super(connector, endPoint, connector.getServer());
this.version = version;
- this.proxyEngine = proxyEngine;
+ this.proxyEngineSelector = proxyEngineSelector;
this.generator = (HttpGenerator)_generator;
this.session = new HTTPSession(version, connector);
this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
@@ -117,7 +118,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
}
else
{
- proxyEngine.onData(stream, toDataInfo(buffer, false));
+ stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
}
}
@@ -128,23 +129,24 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
assert content == null;
if (headers.isEmpty())
- proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
+ proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
else
syn(true);
}
else
{
- proxyEngine.onData(stream, toDataInfo(content, true));
+ stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
}
headers.clear();
stream = null;
content = null;
}
- private Stream syn(boolean close)
+ private HTTPStream syn(boolean close)
{
- Stream stream = new HTTPStream(1, (byte)0, session, null);
- proxyEngine.onSyn(stream, new SynInfo(headers, close));
+ HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
+ StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
+ stream.setStreamFrameListener(streamFrameListener);
return stream;
}
@@ -168,7 +170,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private HTTPSession(short version, SPDYServerConnector connector)
{
- super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
+ super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
}
@Override
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
index ea89a81..c01af3e 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
@@ -15,10 +15,8 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -30,9 +28,9 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
-import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
@@ -45,11 +43,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
-public class SPDYProxyEngine extends ProxyEngine
+public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
- private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
@@ -82,68 +79,24 @@ public class SPDYProxyEngine extends ProxyEngine
this.timeout = timeout;
}
- @Override
- public void onPing(Session clientSession, PingInfo pingInfo)
- {
- // We do not know to which upstream server
- // to send the PING so we just ignore it
- }
-
- @Override
- public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
+ public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
{
- for (Session serverSession : serverSessions.values())
- {
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- if (sessions.remove(clientSession))
- break;
- }
- }
-
- @Override
- public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
- {
- logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
-
- final Session clientSession = clientStream.getSession();
- short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
- Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
- if (hostHeader == null)
- {
- rst(clientStream);
- return null;
- }
-
- String host = hostHeader.value();
- int colon = host.indexOf(':');
- if (colon >= 0)
- host = host.substring(0, colon);
- ProxyInfo proxyInfo = getProxyInfo(host);
- if (proxyInfo == null)
- {
- rst(clientStream);
- return null;
- }
-
- short serverVersion = proxyInfo.getVersion();
- InetSocketAddress address = proxyInfo.getAddress();
- Session serverSession = produceSession(host, serverVersion, address);
+ short serverVersion = getVersion(proxyServerInfo.getProtocol());
+ InetSocketAddress address = proxyServerInfo.getAddress();
+ Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
if (serverSession == null)
{
rst(clientStream);
return null;
}
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- sessions.add(clientSession);
+ final Session clientSession = clientStream.getSession();
addRequestProxyHeaders(clientStream, headers);
customizeRequestHeaders(clientStream, headers);
- convert(clientVersion, serverVersion, headers);
+ convert(clientSession.getVersion(), serverVersion, headers);
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
@@ -153,6 +106,19 @@ public class SPDYProxyEngine extends ProxyEngine
return this;
}
+ private static short getVersion(String protocol)
+ {
+ switch (protocol)
+ {
+ case "spdy/2":
+ return SPDY.V2;
+ case "spdy/3":
+ return SPDY.V3;
+ default:
+ throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
+ }
+ }
+
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@@ -194,7 +160,6 @@ public class SPDYProxyEngine extends ProxyEngine
{
SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
- session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
logger.debug("Proxy session connected to {}", address);
Session existing = serverSessions.putIfAbsent(host, session);
if (existing != null)
@@ -513,10 +478,6 @@ public class SPDYProxyEngine extends ProxyEngine
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{
serverSessions.values().remove(serverSession);
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- for (Session session : sessions)
- session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
@Override
@@ -528,7 +489,7 @@ public class SPDYProxyEngine extends ProxyEngine
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(); //TODO
}
@Override
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
index a2985e4..6c2c89b 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
@@ -91,9 +91,11 @@ public class ProxyHTTPSPDYv2Test
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
{
proxy = new Server();
- SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
- proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
- proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
+ ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
+ SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
+ proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
+ proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
+ proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
proxyConnector.setPort(0);
proxy.addConnector(proxyConnector);
proxy.start();
@@ -172,96 +174,6 @@ public class ProxyHTTPSPDYv2Test
}
@Test
- public void testClosingServerClosesHTTPClient() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, true));
- stream.getSession().goAway();
- return null;
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- client.setSoTimeout(1000);
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- Assert.assertNull(reader.readLine());
-
- client.close();
- }
-
- @Test
- public void testClosingServerClosesSPDYClient() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, true));
- stream.getSession().goAway();
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- final CountDownLatch goAwayLatch = new CountDownLatch(1);
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
- {
- @Override
- public void onGoAway(Session session, GoAwayInfo goAwayInfo)
- {
- goAwayLatch.countDown();
- }
- }).get(5, TimeUnit.SECONDS);
-
- final CountDownLatch replyLatch = new CountDownLatch(1);
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
- headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
- headers.put(HTTPSPDYHeader.URI.name(version()), "/");
- headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
- {
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- replyLatch.countDown();
- }
- });
-
- Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
- }
-
- @Test
public void testGETThenNoContentFromTwoClients() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()