Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJesse McConnell2012-07-13 14:05:05 +0000
committerJesse McConnell2012-07-13 14:05:05 +0000
commit5222459aa77b7fb3701859d3a35469c1cc205898 (patch)
tree926f07cf53054712bb5e994e901303433c98454c
parent3bcfc2946991975e804c8c00aad550aa0610db07 (diff)
parentc7825731a77810f8a3b24ddffee9ffe5962df5a8 (diff)
downloadorg.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.tar.gz
org.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.tar.xz
org.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.zip
Merge branch 'master' into jetty-8
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java191
-rw-r--r--jetty-monitor/src/main/config/etc/jetty-monitor.xml6
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java33
-rw-r--r--jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java16
-rw-r--r--jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java49
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java5
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java6
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java2
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java94
-rw-r--r--jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml32
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java1
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java14
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java62
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java166
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java8
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java27
-rw-r--r--jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java83
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java28
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java55
-rw-r--r--jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java98
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java1
21 files changed, 668 insertions, 309 deletions
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java
new file mode 100644
index 0000000000..3db4a5e72b
--- /dev/null
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java
@@ -0,0 +1,191 @@
+// ========================================================================
+// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
+package org.eclipse.jetty.client;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExpirationWithLimitedConnectionsTest
+{
+ @Test
+ public void testExpirationWithMaxConnectionPerAddressReached() throws Exception
+ {
+ final Logger logger = Log.getLogger("org.eclipse.jetty.client");
+ logger.setDebugEnabled(true);
+
+ HttpClient client = new HttpClient();
+ int maxConnectionsPerAddress = 10;
+ client.setMaxConnectionsPerAddress(maxConnectionsPerAddress);
+ long timeout = 1000;
+ client.setTimeout(timeout);
+ client.start();
+
+ final List<Socket> sockets = new CopyOnWriteArrayList<Socket>();
+ final List<Exception> failures = new CopyOnWriteArrayList<Exception>();
+ final AtomicLong processingDelay = new AtomicLong(200);
+
+ final ExecutorService threadPool = Executors.newCachedThreadPool();
+ final ServerSocket server = new ServerSocket(0);
+ threadPool.submit(new Runnable()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ final Socket socket = server.accept();
+ sockets.add(socket);
+ logger.debug("CONNECTION {}", socket.getRemoteSocketAddress());
+ threadPool.submit(new Runnable()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
+ String firstLine = reader.readLine();
+ String line = firstLine;
+ while (line != null)
+ {
+ if (line.length() == 0)
+ break;
+ line = reader.readLine();
+ }
+
+ if (line == null)
+ break;
+
+ long sleep = processingDelay.get();
+ logger.debug("{} {} {} ms", firstLine, socket.getRemoteSocketAddress(), sleep);
+ TimeUnit.MILLISECONDS.sleep(sleep);
+
+ String response = "" +
+ "HTTP/1.1 200 OK\r\n" +
+ "Content-Length: 0\r\n" +
+ "\r\n";
+ OutputStream output = socket.getOutputStream();
+ output.write(response.getBytes("UTF-8"));
+ output.flush();
+ }
+ catch (Exception x)
+ {
+ failures.add(x);
+ break;
+ }
+ }
+ }
+ });
+ }
+ catch (Exception x)
+ {
+ failures.add(x);
+ break;
+ }
+ }
+ }
+ });
+
+ List<ContentExchange> exchanges = new ArrayList<ContentExchange>();
+
+ final AtomicBoolean firstExpired = new AtomicBoolean();
+ int count = 0;
+ int maxAdditionalRequest = 100;
+ int additionalRequests = 0;
+ while (true)
+ {
+ TimeUnit.MILLISECONDS.sleep(1); // Just avoid being too fast
+ ContentExchange exchange = new ContentExchange(true)
+ {
+ @Override
+ protected void onResponseComplete() throws IOException
+ {
+ logger.debug("{} {} OK", getMethod(), getRequestURI());
+ }
+
+ @Override
+ protected void onExpire()
+ {
+ logger.debug("{} {} EXPIRED {}", getMethod(), getRequestURI(), this);
+ firstExpired.compareAndSet(false, true);
+ }
+ };
+ exchanges.add(exchange);
+ Address address = new Address("localhost", server.getLocalPort());
+ exchange.setAddress(address);
+ exchange.setMethod("GET");
+ exchange.setRequestURI("/" + count);
+ exchange.setVersion("HTTP/1.1");
+ exchange.setRequestHeader("Host", address.toString());
+ logger.debug("{} {} SENT", exchange.getMethod(), exchange.getRequestURI());
+ client.send(exchange);
+ ++count;
+
+ if (processingDelay.get() > 0)
+ {
+ if (client.getDestination(address, false).getConnections() == maxConnectionsPerAddress)
+ {
+ if (firstExpired.get())
+ {
+ ++additionalRequests;
+ if (additionalRequests == maxAdditionalRequest)
+ processingDelay.set(0);
+ }
+ }
+ }
+ else
+ {
+ ++additionalRequests;
+ if (additionalRequests == 2 * maxAdditionalRequest)
+ break;
+ }
+ }
+
+ for (ContentExchange exchange : exchanges)
+ {
+ int status = exchange.waitForDone();
+ Assert.assertTrue(status == HttpExchange.STATUS_COMPLETED || status == HttpExchange.STATUS_EXPIRED);
+ }
+
+ client.stop();
+
+ Assert.assertTrue(failures.isEmpty());
+
+ for (Socket socket : sockets)
+ socket.close();
+ server.close();
+
+ threadPool.shutdown();
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ }
+}
diff --git a/jetty-monitor/src/main/config/etc/jetty-monitor.xml b/jetty-monitor/src/main/config/etc/jetty-monitor.xml
index 6a866dda28..dc97f88a49 100644
--- a/jetty-monitor/src/main/config/etc/jetty-monitor.xml
+++ b/jetty-monitor/src/main/config/etc/jetty-monitor.xml
@@ -8,14 +8,14 @@
<New class="org.eclipse.jetty.monitor.ThreadMonitor">
<Set name="scanInterval">2000</Set>
<Set name="busyThreshold">90</Set>
- <Set name="stackDepth">3</Set>
+ <Set name="stackDepth">5</Set>
<Set name="trailLength">2</Set>
<!-- To enable logging CPU utilization for threads above specified threshold, -->
<!-- uncomment the following lines, changing log interval (in milliseconds) -->
<!-- and log threshold (in percent) as desired. -->
<!--
- <Set name="logInterval">10000</Arg>
- <Set name="logThreshold">1</Arg>
+ <Set name="logInterval">10000</Set>
+ <Set name="logThreshold">65</Set>
-->
<!-- To enable detail dump of the server whenever a thread is detected as spinning, -->
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java
index 722584758a..8a5ac5b5d2 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java
@@ -12,15 +12,21 @@ package org.eclipse.jetty.server.handler;
//You may elect to redistribute this code under either of these licenses.
//========================================================================
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.net.InetAddress;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentMap;
+
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@@ -31,13 +37,10 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.toolchain.test.OS;
-import org.eclipse.jetty.util.log.Log;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeTrue;
-
/**
* @version $Revision$ $Date$
*/
@@ -120,7 +123,25 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
@Test
public void testCONNECTBadHostPort() throws Exception
{
- String hostPort = "badlocalhost:" + serverConnector.getLocalPort();
+ String invalidHostname = "AMAZEBALLS_BADHOST.webtide.com";
+
+ try
+ {
+ InetAddress addr = InetAddress.getByName(invalidHostname);
+ StringBuilder err = new StringBuilder();
+ err.append("DNS Hijacking detected: ");
+ err.append(invalidHostname).append(" should have not returned a valid IP address [");
+ err.append(addr.getHostAddress()).append("]. ");
+ err.append("Fix your DNS provider to have this test pass.");
+ err.append("\nFor more info see https://en.wikipedia.org/wiki/DNS_hijacking");
+ Assert.assertNull(err.toString(), addr);
+ }
+ catch (UnknownHostException e)
+ {
+ // expected path
+ }
+
+ String hostPort = String.format("%s:%d",invalidHostname,serverConnector.getLocalPort());
String request = "" +
"CONNECT " + hostPort + " HTTP/1.1\r\n" +
"Host: " + hostPort + "\r\n" +
@@ -137,7 +158,7 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest
// Expect 500 OK from the CONNECT request
Response response = readResponse(input);
- assertEquals("500", response.getCode());
+ assertEquals("Response Code", "500", response.getCode());
}
finally
{
diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java
index eb1b985995..6561166a18 100644
--- a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java
+++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java
@@ -105,6 +105,7 @@ public class CrossOriginFilter implements Filter
public static final String PREFLIGHT_MAX_AGE_PARAM = "preflightMaxAge";
public static final String ALLOW_CREDENTIALS_PARAM = "allowCredentials";
public static final String EXPOSED_HEADERS_PARAM = "exposedHeaders";
+ public static final String FORWARD_PREFLIGHT_PARAM = "forwardPreflight";
private static final String ANY_ORIGIN = "*";
private static final List<String> SIMPLE_HTTP_METHODS = Arrays.asList("GET", "POST", "HEAD");
@@ -113,8 +114,9 @@ public class CrossOriginFilter implements Filter
private List<String> allowedMethods = new ArrayList<String>();
private List<String> allowedHeaders = new ArrayList<String>();
private List<String> exposedHeaders = new ArrayList<String>();
- private int preflightMaxAge = 0;
+ private int preflightMaxAge;
private boolean allowCredentials;
+ private boolean forwardPreflight;
public void init(FilterConfig config) throws ServletException
{
@@ -172,6 +174,11 @@ public class CrossOriginFilter implements Filter
exposedHeadersConfig = "";
exposedHeaders.addAll(Arrays.asList(exposedHeadersConfig.split(",")));
+ String forwardPreflightConfig = config.getInitParameter(FORWARD_PREFLIGHT_PARAM);
+ if (forwardPreflightConfig == null)
+ forwardPreflightConfig = "true";
+ forwardPreflight = Boolean.parseBoolean(forwardPreflightConfig);
+
if (LOG.isDebugEnabled())
{
LOG.debug("Cross-origin filter configuration: " +
@@ -180,7 +187,8 @@ public class CrossOriginFilter implements Filter
ALLOWED_HEADERS_PARAM + " = " + allowedHeadersConfig + ", " +
PREFLIGHT_MAX_AGE_PARAM + " = " + preflightMaxAgeConfig + ", " +
ALLOW_CREDENTIALS_PARAM + " = " + allowedCredentialsConfig + "," +
- EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig
+ EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig + "," +
+ FORWARD_PREFLIGHT_PARAM + " = " + forwardPreflightConfig
);
}
}
@@ -207,6 +215,10 @@ public class CrossOriginFilter implements Filter
{
LOG.debug("Cross-origin request to {} is a preflight cross-origin request", request.getRequestURI());
handlePreflightResponse(request, response, origin);
+ if (forwardPreflight)
+ LOG.debug("Preflight cross-origin request to {} forwarded to application", request.getRequestURI());
+ else
+ return;
}
else
{
diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java
index da4b571772..c250165550 100644
--- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java
+++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java
@@ -1,16 +1,17 @@
+// ========================================================================
+// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
package org.eclipse.jetty.servlets;
-//========================================================================
-//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
-//------------------------------------------------------------------------
-//All rights reserved. This program and the accompanying materials
-//are made available under the terms of the Eclipse Public License v1.0
-//and Apache License v2.0 which accompanies this distribution.
-//The Eclipse Public License is available at
-//http://www.eclipse.org/legal/epl-v10.html
-//The Apache License v2.0 is available at
-//http://www.opensource.org/licenses/apache2.0.php
-//You may elect to redistribute this code under either of these licenses.
-//========================================================================
import java.io.IOException;
import java.util.EnumSet;
@@ -407,6 +408,30 @@ public class CrossOriginFilterTest
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
}
+ @Test
+ public void testForwardPreflightRequest() throws Exception
+ {
+ FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
+ filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, "PUT");
+ filterHolder.setInitParameter(CrossOriginFilter.FORWARD_PREFLIGHT_PARAM, "false");
+ tester.getContext().addFilter(filterHolder, "/*", FilterMapping.DEFAULT);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ tester.getContext().addServlet(new ServletHolder(new ResourceServlet(latch)), "/*");
+
+ // Preflight request
+ String request = "" +
+ "OPTIONS / HTTP/1.1\r\n" +
+ "Host: localhost\r\n" +
+ CrossOriginFilter.ACCESS_CONTROL_REQUEST_METHOD_HEADER + ": PUT\r\n" +
+ "Origin: http://localhost\r\n" +
+ "\r\n";
+ String response = tester.getResponses(request);
+ Assert.assertTrue(response.contains("HTTP/1.1 200"));
+ Assert.assertTrue(response.contains(CrossOriginFilter.ACCESS_CONTROL_ALLOW_METHODS_HEADER));
+ Assert.assertFalse(latch.await(1, TimeUnit.SECONDS));
+ }
+
public static class ResourceServlet extends HttpServlet
{
private static final long serialVersionUID = 1L;
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
index 6019e291bb..360a639591 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java
@@ -138,6 +138,11 @@ public class StandardStream implements IStream
this.listener = listener;
}
+ public StreamFrameListener getStreamFrameListener()
+ {
+ return listener;
+ }
+
@Override
public void updateCloseState(boolean close, boolean local)
{
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
index cef58293fb..261a94f730 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java
@@ -216,13 +216,15 @@ public class Headers implements Iterable<Headers.Header>
if (obj == null || getClass() != obj.getClass())
return false;
Header that = (Header)obj;
- return name.equals(that.name) && Arrays.equals(values, that.values);
+ // Header names must be lowercase, thus we lowercase them before transmission, but keep them as is
+ // internally. That's why we've to compare them case insensitive.
+ return name.equalsIgnoreCase(that.name) && Arrays.equals(values, that.values);
}
@Override
public int hashCode()
{
- int result = name.hashCode();
+ int result = name.toLowerCase().hashCode();
result = 31 * result + Arrays.hashCode(values);
return result;
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
index af997855c5..6290841b79 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java
@@ -40,7 +40,7 @@ public class HeadersBlockGenerator
writeCount(version, buffer, headers.size());
for (Headers.Header header : headers)
{
- String name = header.name();
+ String name = header.name().toLowerCase();
byte[] nameBytes = name.getBytes(iso1);
writeNameLength(version, buffer, nameBytes.length);
buffer.write(nameBytes, 0, nameBytes.length);
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
index 545aec037e..fd5a36de3d 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java
@@ -22,65 +22,77 @@ import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
public class HeadersGenerateParseTest
{
- @Test
- public void testGenerateParse() throws Exception
+
+ private Headers headers = new Headers();
+ private int streamId = 13;
+ private byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
+ private final TestSPDYParserListener listener = new TestSPDYParserListener();
+ private final Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
+ private ByteBuffer buffer;
+
+ @Before
+ public void setUp()
{
- byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
- int streamId = 13;
- Headers headers = new Headers();
+ parser.addListener(listener);
headers.put("a", "b");
+ buffer = createHeadersFrameBuffer(headers);
+ }
+
+ private ByteBuffer createHeadersFrameBuffer(Headers headers)
+ {
HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
ByteBuffer buffer = generator.control(frame1);
+ assertThat("Buffer is not null", buffer, notNullValue());
+ return buffer;
+ }
- Assert.assertNotNull(buffer);
-
- TestSPDYParserListener listener = new TestSPDYParserListener();
- Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
- parser.addListener(listener);
+ @Test
+ public void testGenerateParse() throws Exception
+ {
parser.parse(buffer);
- ControlFrame frame2 = listener.getControlFrame();
-
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
- HeadersFrame headersFrame = (HeadersFrame)frame2;
- Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
- Assert.assertEquals(streamId, headersFrame.getStreamId());
- Assert.assertEquals(flags, headersFrame.getFlags());
- Assert.assertEquals(headers, headersFrame.getHeaders());
+ assertExpectationsAreMet(headers);
}
@Test
public void testGenerateParseOneByteAtATime() throws Exception
{
- byte flags = HeadersInfo.FLAG_RESET_COMPRESSION;
- int streamId = 13;
- Headers headers = new Headers();
- headers.put("a", "b");
- HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers);
- Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor());
- ByteBuffer buffer = generator.control(frame1);
-
- Assert.assertNotNull(buffer);
-
- TestSPDYParserListener listener = new TestSPDYParserListener();
- Parser parser = new Parser(new StandardCompressionFactory().newDecompressor());
- parser.addListener(listener);
while (buffer.hasRemaining())
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
- ControlFrame frame2 = listener.getControlFrame();
- Assert.assertNotNull(frame2);
- Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType());
- HeadersFrame headersFrame = (HeadersFrame)frame2;
- Assert.assertEquals(SPDY.V2, headersFrame.getVersion());
- Assert.assertEquals(streamId, headersFrame.getStreamId());
- Assert.assertEquals(flags, headersFrame.getFlags());
- Assert.assertEquals(headers, headersFrame.getHeaders());
+ assertExpectationsAreMet(headers);
+ }
+
+ @Test
+ public void testHeadersAreTranslatedToLowerCase()
+ {
+ Headers headers = new Headers();
+ headers.put("Via","localhost");
+ parser.parse(createHeadersFrameBuffer(headers));
+ HeadersFrame parsedHeadersFrame = assertExpectationsAreMet(headers);
+ Headers.Header viaHeader = parsedHeadersFrame.getHeaders().get("via");
+ assertThat("Via Header name is lowercase", viaHeader.name(), is("via"));
+ }
+
+ private HeadersFrame assertExpectationsAreMet(Headers headers)
+ {
+ ControlFrame parsedControlFrame = listener.getControlFrame();
+ assertThat("listener received controlFrame", parsedControlFrame, notNullValue());
+ assertThat("ControlFrame type is HEADERS", ControlFrameType.HEADERS, is(parsedControlFrame.getType()));
+ HeadersFrame headersFrame = (HeadersFrame)parsedControlFrame;
+ assertThat("Version matches", SPDY.V2, is(headersFrame.getVersion()));
+ assertThat("StreamId matches", streamId, is(headersFrame.getStreamId()));
+ assertThat("flags match", flags, is(headersFrame.getFlags()));
+ assertThat("headers match", headers, is(headersFrame.getHeaders()));
+ return headersFrame;
}
}
diff --git a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
index 7d84868619..9c637ec41f 100644
--- a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
+++ b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml
@@ -32,24 +32,34 @@
</Call>
<!--
- The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below
- and is configured to process requests for host "localhost".
- Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded
- to 127.0.0.1:9090, where they are served by the upstream server above.
+ This ProxyEngine translates the incoming SPDY/x(HTTP) request to SPDY/2(HTTP)
-->
- <New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
+ <New id="spdyProxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
<Arg>
<New class="org.eclipse.jetty.spdy.SPDYClient$Factory">
- <Call name="start" />
+ <Call name="start"/>
</New>
</Arg>
- <Set name="proxyInfos">
+ </New>
+
+ <!--
+ The ProxyEngineSelector receives SPDY/x(HTTP) requests from proxy connectors below
+ and is configured to process requests for host "localhost".
+ Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) by the configured ProxyEngine
+ and forwarded to 127.0.0.1:9090, where they are served by the upstream server above.
+ -->
+ <New id="proxyEngineSelector" class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector">
+ <Call name="putProxyEngine">
+ <Arg>spdy/2</Arg>
+ <Arg><Ref id="spdyProxyEngine" /></Arg>
+ </Call>
+ <Set name="proxyServerInfos">
<Map>
<Entry>
<Item>localhost</Item>
<Item>
- <New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo">
- <Arg type="short">2</Arg>
+ <New class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector$ProxyServerInfo">
+ <Arg type="String">spdy/2</Arg>
<Arg>127.0.0.1</Arg>
<Arg type="int">9090</Arg>
</New>
@@ -69,7 +79,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
- <Arg><Ref id="proxyEngine" /></Arg>
+ <Arg><Ref id="proxyEngineSelector" /></Arg>
<Set name="Port">8080</Set>
</New>
</Arg>
@@ -77,7 +87,7 @@
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
- <Arg><Ref id="proxyEngine" /></Arg>
+ <Arg><Ref id="proxyEngineSelector" /></Arg>
<Arg><Ref id="sslContextFactory" /></Arg>
<Set name="Port">8443</Set>
</New>
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java
index 67b37ed52c..ecc241ea97 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java
@@ -722,6 +722,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem
logger.debug("HTTP < {} bytes of content", dataInfo.length());
stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS);
content.clear();
+ _bypass = false;
content = getContentBuffer();
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
index 522363f5cc..2827af05ce 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java
@@ -21,19 +21,19 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
{
- public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
+ public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector)
{
- this(proxyEngine, null);
+ this(proxyEngineSelector, null);
}
- public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
+ public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory)
{
- super(proxyEngine, sslContextFactory);
+ super(proxyEngineSelector, sslContextFactory);
clearAsyncConnectionFactories();
- putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
- putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
- putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
+ putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+ putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector));
+ putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector));
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
index 2ba7fba538..0a172614a5 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java
@@ -15,32 +15,25 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
- * <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
- * accepts SPDY from its client side and converts to any protocol to its server side.</p>
+ * <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to
+ * any protocol to its server side.</p>
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
* these SPDY client events into appropriate events to forward to the server, in the appropriate
* protocol that is understood by the server.</p>
- * <p>This class also provides configuration for the proxy rules.</p>
*/
-public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
+public abstract class ProxyEngine
{
protected final Logger logger = Log.getLogger(getClass());
- private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
private final String name;
protected ProxyEngine()
@@ -60,6 +53,8 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
}
}
+ public abstract StreamFrameListener proxy(Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo);
+
protected ProxyEngine(String name)
{
this.name = name;
@@ -73,6 +68,9 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
protected void addRequestProxyHeaders(Stream stream, Headers headers)
{
addViaHeader(headers);
+ String address = (String)stream.getSession().getAttribute("org.eclipse.jetty.spdy.remoteAddress");
+ if (address != null)
+ headers.add("X-Forwarded-For", address);
}
protected void addResponseProxyHeaders(Stream stream, Headers headers)
@@ -93,46 +91,4 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp
{
}
- public Map<String, ProxyInfo> getProxyInfos()
- {
- return new HashMap<>(proxyInfos);
- }
-
- public void setProxyInfos(Map<String, ProxyInfo> proxyInfos)
- {
- this.proxyInfos.clear();
- this.proxyInfos.putAll(proxyInfos);
- }
-
- public void putProxyInfo(String host, ProxyInfo proxyInfo)
- {
- proxyInfos.put(host, proxyInfo);
- }
-
- protected ProxyInfo getProxyInfo(String host)
- {
- return proxyInfos.get(host);
- }
-
- public static class ProxyInfo
- {
- private final short version;
- private final InetSocketAddress address;
-
- public ProxyInfo(short version, String host, int port)
- {
- this.version = version;
- this.address = new InetSocketAddress(host, port);
- }
-
- public short getVersion()
- {
- return version;
- }
-
- public InetSocketAddress getAddress()
- {
- return address;
- }
- }
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java
new file mode 100644
index 0000000000..ee23df4dbc
--- /dev/null
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java
@@ -0,0 +1,166 @@
+package org.eclipse.jetty.spdy.proxy;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.eclipse.jetty.spdy.api.GoAwayInfo;
+import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.PingInfo;
+import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.Session;
+import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
+import org.eclipse.jetty.spdy.api.StreamStatus;
+import org.eclipse.jetty.spdy.api.SynInfo;
+import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
+import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the
+ * syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
+ * host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
+ *
+ * <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for
+ * the given protocol, it resets the client stream.</p>
+ *
+ * <p>This class also provides configuration for the proxy rules.</p>
+ */
+public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter
+{
+ protected final Logger logger = Log.getLogger(getClass());
+ private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>();
+ private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>();
+
+ @Override
+ public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
+ {
+ logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
+
+ final Session clientSession = clientStream.getSession();
+ short clientVersion = clientSession.getVersion();
+ Headers headers = new Headers(clientSynInfo.getHeaders(), false);
+
+ Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
+ if (hostHeader == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ String host = hostHeader.value();
+ int colon = host.indexOf(':');
+ if (colon >= 0)
+ host = host.substring(0, colon);
+
+ ProxyServerInfo proxyServerInfo = getProxyServerInfo(host);
+ if (proxyServerInfo == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ String protocol = proxyServerInfo.getProtocol();
+ ProxyEngine proxyEngine = proxyEngines.get(protocol);
+ if (proxyEngine == null)
+ {
+ rst(clientStream);
+ return null;
+ }
+
+ return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo);
+ }
+
+ @Override
+ public void onPing(Session clientSession, PingInfo pingInfo)
+ {
+ // We do not know to which upstream server
+ // to send the PING so we just ignore it
+ }
+
+ @Override
+ public void onGoAway(Session session, GoAwayInfo goAwayInfo)
+ {
+ // TODO:
+ }
+
+ public Map<String, ProxyEngine> getProxyEngines()
+ {
+ return new HashMap<>(proxyEngines);
+ }
+
+ public void setProxyEngines(Map<String, ProxyEngine> proxyEngines)
+ {
+ this.proxyEngines.clear();
+ this.proxyEngines.putAll(proxyEngines);
+ }
+
+ public ProxyEngine getProxyEngine(String protocol)
+ {
+ return proxyEngines.get(protocol);
+ }
+
+ public void putProxyEngine(String protocol, ProxyEngine proxyEngine)
+ {
+ proxyEngines.put(protocol, proxyEngine);
+ }
+
+ public Map<String, ProxyServerInfo> getProxyServerInfos()
+ {
+ return new HashMap<>(proxyInfos);
+ }
+
+ protected ProxyServerInfo getProxyServerInfo(String host)
+ {
+ return proxyInfos.get(host);
+ }
+
+ public void setProxyServerInfos(Map<String, ProxyServerInfo> proxyServerInfos)
+ {
+ this.proxyInfos.clear();
+ this.proxyInfos.putAll(proxyServerInfos);
+ }
+
+ public void putProxyServerInfo(String host, ProxyServerInfo proxyServerInfo)
+ {
+ proxyInfos.put(host, proxyServerInfo);
+ }
+
+ private void rst(Stream stream)
+ {
+ RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
+ stream.getSession().rst(rstInfo);
+ }
+
+ public static class ProxyServerInfo
+ {
+ private final String protocol;
+ private final String host;
+ private final InetSocketAddress address;
+
+ public ProxyServerInfo(String protocol, String host, int port)
+ {
+ this.protocol = protocol;
+ this.host = host;
+ this.address = new InetSocketAddress(host, port);
+ }
+
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public InetSocketAddress getAddress()
+ {
+ return address;
+ }
+ }
+}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
index 57a0e71973..f73c7e819b 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java
@@ -24,18 +24,18 @@ import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
{
private final short version;
- private final ProxyEngine proxyEngine;
+ private final ProxyEngineSelector proxyEngineSelector;
- public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
+ public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector)
{
super(connector);
this.version = version;
- this.proxyEngine = proxyEngine;
+ this.proxyEngineSelector = proxyEngineSelector;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
- return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
+ return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector);
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
index e55be4b6b0..92c318479a 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java
@@ -46,6 +46,7 @@ import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
+import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
@@ -53,19 +54,20 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private final Headers headers = new Headers();
private final short version;
- private final ProxyEngine proxyEngine;
+ private final ProxyEngineSelector proxyEngineSelector;
private final HttpGenerator generator;
private final ISession session;
- private Stream stream;
+ private HTTPStream stream;
private Buffer content;
- public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endpoint, short version, ProxyEngine proxyEngine)
+ public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector)
{
- super(connector, endpoint, connector.getServer());
+ super(connector, endPoint, connector.getServer());
this.version = version;
- this.proxyEngine = proxyEngine;
+ this.proxyEngineSelector = proxyEngineSelector;
this.generator = (HttpGenerator)_generator;
this.session = new HTTPSession(version, connector);
+ this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
}
@Override
@@ -116,7 +118,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
}
else
{
- proxyEngine.onData(stream, toDataInfo(buffer, false));
+ stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false));
}
}
@@ -127,23 +129,24 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
assert content == null;
if (headers.isEmpty())
- proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
+ proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
else
syn(true);
}
else
{
- proxyEngine.onData(stream, toDataInfo(content, true));
+ stream.getStreamFrameListener().onData(stream, toDataInfo(content, true));
}
headers.clear();
stream = null;
content = null;
}
- private Stream syn(boolean close)
+ private HTTPStream syn(boolean close)
{
- Stream stream = new HTTPStream(1, (byte)0, session, null);
- proxyEngine.onSyn(stream, new SynInfo(headers, close));
+ HTTPStream stream = new HTTPStream(1, (byte)0, session, null);
+ StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close));
+ stream.setStreamFrameListener(streamFrameListener);
return stream;
}
@@ -167,7 +170,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private HTTPSession(short version, SPDYServerConnector connector)
{
- super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
+ super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null);
}
@Override
diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
index ea89a81fa8..c01af3ef0f 100644
--- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
+++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java
@@ -15,10 +15,8 @@
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -30,9 +28,9 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
-import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
@@ -45,11 +43,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
-public class SPDYProxyEngine extends ProxyEngine
+public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
- private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
@@ -82,68 +79,24 @@ public class SPDYProxyEngine extends ProxyEngine
this.timeout = timeout;
}
- @Override
- public void onPing(Session clientSession, PingInfo pingInfo)
- {
- // We do not know to which upstream server
- // to send the PING so we just ignore it
- }
-
- @Override
- public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
+ public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
{
- for (Session serverSession : serverSessions.values())
- {
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- if (sessions.remove(clientSession))
- break;
- }
- }
-
- @Override
- public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
- {
- logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
-
- final Session clientSession = clientStream.getSession();
- short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
- Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
- if (hostHeader == null)
- {
- rst(clientStream);
- return null;
- }
-
- String host = hostHeader.value();
- int colon = host.indexOf(':');
- if (colon >= 0)
- host = host.substring(0, colon);
- ProxyInfo proxyInfo = getProxyInfo(host);
- if (proxyInfo == null)
- {
- rst(clientStream);
- return null;
- }
-
- short serverVersion = proxyInfo.getVersion();
- InetSocketAddress address = proxyInfo.getAddress();
- Session serverSession = produceSession(host, serverVersion, address);
+ short serverVersion = getVersion(proxyServerInfo.getProtocol());
+ InetSocketAddress address = proxyServerInfo.getAddress();
+ Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
if (serverSession == null)
{
rst(clientStream);
return null;
}
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- sessions.add(clientSession);
+ final Session clientSession = clientStream.getSession();
addRequestProxyHeaders(clientStream, headers);
customizeRequestHeaders(clientStream, headers);
- convert(clientVersion, serverVersion, headers);
+ convert(clientSession.getVersion(), serverVersion, headers);
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
@@ -153,6 +106,19 @@ public class SPDYProxyEngine extends ProxyEngine
return this;
}
+ private static short getVersion(String protocol)
+ {
+ switch (protocol)
+ {
+ case "spdy/2":
+ return SPDY.V2;
+ case "spdy/3":
+ return SPDY.V3;
+ default:
+ throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
+ }
+ }
+
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@@ -194,7 +160,6 @@ public class SPDYProxyEngine extends ProxyEngine
{
SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
- session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
logger.debug("Proxy session connected to {}", address);
Session existing = serverSessions.putIfAbsent(host, session);
if (existing != null)
@@ -513,10 +478,6 @@ public class SPDYProxyEngine extends ProxyEngine
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{
serverSessions.values().remove(serverSession);
- @SuppressWarnings("unchecked")
- Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
- for (Session session : sessions)
- session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
@Override
@@ -528,7 +489,7 @@ public class SPDYProxyEngine extends ProxyEngine
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(); //TODO
}
@Override
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
index 3885810961..9486157de2 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java
@@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -36,6 +38,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
+import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
@@ -331,13 +334,36 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
}
}
+ private void addPushedResource(String pushedURI)
+ {
+ switch (version())
+ {
+ case SPDY.V2:
+ {
+ Matcher matcher = Pattern.compile("https?://[^:]+:\\d+(/.*)").matcher(pushedURI);
+ Assert.assertTrue(matcher.matches());
+ pushedResources.add(matcher.group(1));
+ break;
+ }
+ case SPDY.V3:
+ {
+ pushedResources.add(pushedURI);
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
private class ClientSessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
String path = synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version())).value();
- pushedResources.add(path);
+ addPushedResource(path);
return new DataListener();
}
}
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
index f886991894..5bab1f512e 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java
@@ -1010,6 +1010,61 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest
}
@Test
+ public void testGETWithMultipleMediumContentByPassed() throws Exception
+ {
+ final byte[] data = new byte[2048];
+ Session session = startClient(version(), startHTTPServer(version(), new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
+ throws IOException, ServletException
+ {
+ // The sequence of write/flush/write/write below triggers a condition where
+ // HttpGenerator._bypass is set to true on the second write(), and the
+ // third write causes an infinite spin loop on the third write().
+ request.setHandled(true);
+ OutputStream output = httpResponse.getOutputStream();
+ output.write(data);
+ output.flush();
+ output.write(data);
+ output.write(data);
+ }
+ }), null);
+
+ Headers headers = new Headers();
+ headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
+ headers.put(HTTPSPDYHeader.URI.name(version()), "/foo");
+ headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
+ headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
+ headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + connector.getLocalPort());
+ final CountDownLatch replyLatch = new CountDownLatch(1);
+ final CountDownLatch dataLatch = new CountDownLatch(1);
+ final AtomicInteger contentLength = new AtomicInteger();
+ session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
+ {
+ @Override
+ public void onReply(Stream stream, ReplyInfo replyInfo)
+ {
+ Assert.assertFalse(replyInfo.isClose());
+ Headers replyHeaders = replyInfo.getHeaders();
+ Assert.assertTrue(replyHeaders.get(HTTPSPDYHeader.STATUS.name(version())).value().contains("200"));
+ replyLatch.countDown();
+ }
+
+ @Override
+ public void onData(Stream stream, DataInfo dataInfo)
+ {
+ dataInfo.consume(dataInfo.available());
+ contentLength.addAndGet(dataInfo.length());
+ if (dataInfo.isClose())
+ dataLatch.countDown();
+ }
+ });
+ Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(3 * data.length, contentLength.get());
+ }
+
+ @Test
public void testPOSTThenSuspendRequestThenReadOneChunkThenComplete() throws Exception
{
final byte[] data = new byte[2000];
diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
index a2985e4dbd..6c2c89bc87 100644
--- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
+++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java
@@ -91,9 +91,11 @@ public class ProxyHTTPSPDYv2Test
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
{
proxy = new Server();
- SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
- proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
- proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
+ ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector();
+ SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory);
+ proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine);
+ proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort()));
+ proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector);
proxyConnector.setPort(0);
proxy.addConnector(proxyConnector);
proxy.start();
@@ -172,96 +174,6 @@ public class ProxyHTTPSPDYv2Test
}
@Test
- public void testClosingServerClosesHTTPClient() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, true));
- stream.getSession().goAway();
- return null;
- }
- }));
-
- Socket client = new Socket();
- client.connect(proxyAddress);
- OutputStream output = client.getOutputStream();
-
- String request = "" +
- "GET / HTTP/1.1\r\n" +
- "Host: localhost:" + proxyAddress.getPort() + "\r\n" +
- "\r\n";
- output.write(request.getBytes("UTF-8"));
- output.flush();
-
- client.setSoTimeout(1000);
- InputStream input = client.getInputStream();
- BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
- String line = reader.readLine();
- Assert.assertTrue(line.contains(" 200"));
- while (line.length() > 0)
- line = reader.readLine();
- Assert.assertFalse(reader.ready());
-
- Assert.assertNull(reader.readLine());
-
- client.close();
- }
-
- @Test
- public void testClosingServerClosesSPDYClient() throws Exception
- {
- InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
- {
- @Override
- public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
- {
- Headers responseHeaders = new Headers();
- responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
- stream.reply(new ReplyInfo(responseHeaders, true));
- stream.getSession().goAway();
- return null;
- }
- }));
- proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
-
- final CountDownLatch goAwayLatch = new CountDownLatch(1);
- Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
- {
- @Override
- public void onGoAway(Session session, GoAwayInfo goAwayInfo)
- {
- goAwayLatch.countDown();
- }
- }).get(5, TimeUnit.SECONDS);
-
- final CountDownLatch replyLatch = new CountDownLatch(1);
- Headers headers = new Headers();
- headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
- headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
- headers.put(HTTPSPDYHeader.URI.name(version()), "/");
- headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
- headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
- client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
- {
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- replyLatch.countDown();
- }
- });
-
- Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
- Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
- }
-
- @Test
public void testGETThenNoContentFromTwoClients() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
index d20784009e..4e45eedf74 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java
@@ -69,6 +69,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy);
+ session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
session.setWindowSize(connector.getInitialWindowSize());
parser.addListener(session);
connection.setSession(session);

Back to the top