diff options
author | Jesse McConnell | 2012-07-13 14:05:05 +0000 |
---|---|---|
committer | Jesse McConnell | 2012-07-13 14:05:05 +0000 |
commit | 5222459aa77b7fb3701859d3a35469c1cc205898 (patch) | |
tree | 926f07cf53054712bb5e994e901303433c98454c | |
parent | 3bcfc2946991975e804c8c00aad550aa0610db07 (diff) | |
parent | c7825731a77810f8a3b24ddffee9ffe5962df5a8 (diff) | |
download | org.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.tar.gz org.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.tar.xz org.eclipse.jetty.project-5222459aa77b7fb3701859d3a35469c1cc205898.zip |
Merge branch 'master' into jetty-8
21 files changed, 668 insertions, 309 deletions
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java new file mode 100644 index 0000000000..3db4a5e72b --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ExpirationWithLimitedConnectionsTest.java @@ -0,0 +1,191 @@ +// ======================================================================== +// Copyright 2012-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.client; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.junit.Assert; +import org.junit.Test; + +public class ExpirationWithLimitedConnectionsTest +{ + @Test + public void testExpirationWithMaxConnectionPerAddressReached() throws Exception + { + final Logger logger = Log.getLogger("org.eclipse.jetty.client"); + logger.setDebugEnabled(true); + + HttpClient client = new HttpClient(); + int maxConnectionsPerAddress = 10; + client.setMaxConnectionsPerAddress(maxConnectionsPerAddress); + long timeout = 1000; + client.setTimeout(timeout); + client.start(); + + final List<Socket> sockets = new CopyOnWriteArrayList<Socket>(); + final List<Exception> failures = new CopyOnWriteArrayList<Exception>(); + final AtomicLong processingDelay = new AtomicLong(200); + + final ExecutorService threadPool = Executors.newCachedThreadPool(); + final ServerSocket server = new ServerSocket(0); + threadPool.submit(new Runnable() + { + public void run() + { + while (true) + { + try + { + final Socket socket = server.accept(); + sockets.add(socket); + logger.debug("CONNECTION {}", socket.getRemoteSocketAddress()); + threadPool.submit(new Runnable() + { + public void run() + { + while (true) + { + try + { + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); + String firstLine = reader.readLine(); + String line = firstLine; + while (line != null) + { + if (line.length() == 0) + break; + line = reader.readLine(); + } + + if (line == null) + break; + + long sleep = processingDelay.get(); + logger.debug("{} {} {} ms", firstLine, socket.getRemoteSocketAddress(), sleep); + TimeUnit.MILLISECONDS.sleep(sleep); + + String response = "" + + "HTTP/1.1 200 OK\r\n" + + "Content-Length: 0\r\n" + + "\r\n"; + OutputStream output = socket.getOutputStream(); + output.write(response.getBytes("UTF-8")); + output.flush(); + } + catch (Exception x) + { + failures.add(x); + break; + } + } + } + }); + } + catch (Exception x) + { + failures.add(x); + break; + } + } + } + }); + + List<ContentExchange> exchanges = new ArrayList<ContentExchange>(); + + final AtomicBoolean firstExpired = new AtomicBoolean(); + int count = 0; + int maxAdditionalRequest = 100; + int additionalRequests = 0; + while (true) + { + TimeUnit.MILLISECONDS.sleep(1); // Just avoid being too fast + ContentExchange exchange = new ContentExchange(true) + { + @Override + protected void onResponseComplete() throws IOException + { + logger.debug("{} {} OK", getMethod(), getRequestURI()); + } + + @Override + protected void onExpire() + { + logger.debug("{} {} EXPIRED {}", getMethod(), getRequestURI(), this); + firstExpired.compareAndSet(false, true); + } + }; + exchanges.add(exchange); + Address address = new Address("localhost", server.getLocalPort()); + exchange.setAddress(address); + exchange.setMethod("GET"); + exchange.setRequestURI("/" + count); + exchange.setVersion("HTTP/1.1"); + exchange.setRequestHeader("Host", address.toString()); + logger.debug("{} {} SENT", exchange.getMethod(), exchange.getRequestURI()); + client.send(exchange); + ++count; + + if (processingDelay.get() > 0) + { + if (client.getDestination(address, false).getConnections() == maxConnectionsPerAddress) + { + if (firstExpired.get()) + { + ++additionalRequests; + if (additionalRequests == maxAdditionalRequest) + processingDelay.set(0); + } + } + } + else + { + ++additionalRequests; + if (additionalRequests == 2 * maxAdditionalRequest) + break; + } + } + + for (ContentExchange exchange : exchanges) + { + int status = exchange.waitForDone(); + Assert.assertTrue(status == HttpExchange.STATUS_COMPLETED || status == HttpExchange.STATUS_EXPIRED); + } + + client.stop(); + + Assert.assertTrue(failures.isEmpty()); + + for (Socket socket : sockets) + socket.close(); + server.close(); + + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + } +} diff --git a/jetty-monitor/src/main/config/etc/jetty-monitor.xml b/jetty-monitor/src/main/config/etc/jetty-monitor.xml index 6a866dda28..dc97f88a49 100644 --- a/jetty-monitor/src/main/config/etc/jetty-monitor.xml +++ b/jetty-monitor/src/main/config/etc/jetty-monitor.xml @@ -8,14 +8,14 @@ <New class="org.eclipse.jetty.monitor.ThreadMonitor"> <Set name="scanInterval">2000</Set> <Set name="busyThreshold">90</Set> - <Set name="stackDepth">3</Set> + <Set name="stackDepth">5</Set> <Set name="trailLength">2</Set> <!-- To enable logging CPU utilization for threads above specified threshold, --> <!-- uncomment the following lines, changing log interval (in milliseconds) --> <!-- and log threshold (in percent) as desired. --> <!-- - <Set name="logInterval">10000</Arg> - <Set name="logThreshold">1</Arg> + <Set name="logInterval">10000</Set> + <Set name="logThreshold">65</Set> --> <!-- To enable detail dump of the server whenever a thread is detected as spinning, --> diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java index 722584758a..8a5ac5b5d2 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/ConnectHandlerTest.java @@ -12,15 +12,21 @@ package org.eclipse.jetty.server.handler; //You may elect to redistribute this code under either of these licenses. //======================================================================== +import static org.junit.Assert.*; +import static org.junit.Assume.*; + import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.net.InetAddress; import java.net.Socket; +import java.net.UnknownHostException; import java.nio.channels.SocketChannel; import java.util.concurrent.ConcurrentMap; + import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; @@ -31,13 +37,10 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.toolchain.test.OS; -import org.eclipse.jetty.util.log.Log; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeTrue; - /** * @version $Revision$ $Date$ */ @@ -120,7 +123,25 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest @Test public void testCONNECTBadHostPort() throws Exception { - String hostPort = "badlocalhost:" + serverConnector.getLocalPort(); + String invalidHostname = "AMAZEBALLS_BADHOST.webtide.com"; + + try + { + InetAddress addr = InetAddress.getByName(invalidHostname); + StringBuilder err = new StringBuilder(); + err.append("DNS Hijacking detected: "); + err.append(invalidHostname).append(" should have not returned a valid IP address ["); + err.append(addr.getHostAddress()).append("]. "); + err.append("Fix your DNS provider to have this test pass."); + err.append("\nFor more info see https://en.wikipedia.org/wiki/DNS_hijacking"); + Assert.assertNull(err.toString(), addr); + } + catch (UnknownHostException e) + { + // expected path + } + + String hostPort = String.format("%s:%d",invalidHostname,serverConnector.getLocalPort()); String request = "" + "CONNECT " + hostPort + " HTTP/1.1\r\n" + "Host: " + hostPort + "\r\n" + @@ -137,7 +158,7 @@ public class ConnectHandlerTest extends AbstractConnectHandlerTest // Expect 500 OK from the CONNECT request Response response = readResponse(input); - assertEquals("500", response.getCode()); + assertEquals("Response Code", "500", response.getCode()); } finally { diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java index eb1b985995..6561166a18 100644 --- a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java +++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/CrossOriginFilter.java @@ -105,6 +105,7 @@ public class CrossOriginFilter implements Filter public static final String PREFLIGHT_MAX_AGE_PARAM = "preflightMaxAge"; public static final String ALLOW_CREDENTIALS_PARAM = "allowCredentials"; public static final String EXPOSED_HEADERS_PARAM = "exposedHeaders"; + public static final String FORWARD_PREFLIGHT_PARAM = "forwardPreflight"; private static final String ANY_ORIGIN = "*"; private static final List<String> SIMPLE_HTTP_METHODS = Arrays.asList("GET", "POST", "HEAD"); @@ -113,8 +114,9 @@ public class CrossOriginFilter implements Filter private List<String> allowedMethods = new ArrayList<String>(); private List<String> allowedHeaders = new ArrayList<String>(); private List<String> exposedHeaders = new ArrayList<String>(); - private int preflightMaxAge = 0; + private int preflightMaxAge; private boolean allowCredentials; + private boolean forwardPreflight; public void init(FilterConfig config) throws ServletException { @@ -172,6 +174,11 @@ public class CrossOriginFilter implements Filter exposedHeadersConfig = ""; exposedHeaders.addAll(Arrays.asList(exposedHeadersConfig.split(","))); + String forwardPreflightConfig = config.getInitParameter(FORWARD_PREFLIGHT_PARAM); + if (forwardPreflightConfig == null) + forwardPreflightConfig = "true"; + forwardPreflight = Boolean.parseBoolean(forwardPreflightConfig); + if (LOG.isDebugEnabled()) { LOG.debug("Cross-origin filter configuration: " + @@ -180,7 +187,8 @@ public class CrossOriginFilter implements Filter ALLOWED_HEADERS_PARAM + " = " + allowedHeadersConfig + ", " + PREFLIGHT_MAX_AGE_PARAM + " = " + preflightMaxAgeConfig + ", " + ALLOW_CREDENTIALS_PARAM + " = " + allowedCredentialsConfig + "," + - EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig + EXPOSED_HEADERS_PARAM + " = " + exposedHeadersConfig + "," + + FORWARD_PREFLIGHT_PARAM + " = " + forwardPreflightConfig ); } } @@ -207,6 +215,10 @@ public class CrossOriginFilter implements Filter { LOG.debug("Cross-origin request to {} is a preflight cross-origin request", request.getRequestURI()); handlePreflightResponse(request, response, origin); + if (forwardPreflight) + LOG.debug("Preflight cross-origin request to {} forwarded to application", request.getRequestURI()); + else + return; } else { diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java index da4b571772..c250165550 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/CrossOriginFilterTest.java @@ -1,16 +1,17 @@ +// ======================================================================== +// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + package org.eclipse.jetty.servlets; -//======================================================================== -//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. -//------------------------------------------------------------------------ -//All rights reserved. This program and the accompanying materials -//are made available under the terms of the Eclipse Public License v1.0 -//and Apache License v2.0 which accompanies this distribution. -//The Eclipse Public License is available at -//http://www.eclipse.org/legal/epl-v10.html -//The Apache License v2.0 is available at -//http://www.opensource.org/licenses/apache2.0.php -//You may elect to redistribute this code under either of these licenses. -//======================================================================== import java.io.IOException; import java.util.EnumSet; @@ -407,6 +408,30 @@ public class CrossOriginFilterTest Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); } + @Test + public void testForwardPreflightRequest() throws Exception + { + FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter()); + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, "PUT"); + filterHolder.setInitParameter(CrossOriginFilter.FORWARD_PREFLIGHT_PARAM, "false"); + tester.getContext().addFilter(filterHolder, "/*", FilterMapping.DEFAULT); + + CountDownLatch latch = new CountDownLatch(1); + tester.getContext().addServlet(new ServletHolder(new ResourceServlet(latch)), "/*"); + + // Preflight request + String request = "" + + "OPTIONS / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + CrossOriginFilter.ACCESS_CONTROL_REQUEST_METHOD_HEADER + ": PUT\r\n" + + "Origin: http://localhost\r\n" + + "\r\n"; + String response = tester.getResponses(request); + Assert.assertTrue(response.contains("HTTP/1.1 200")); + Assert.assertTrue(response.contains(CrossOriginFilter.ACCESS_CONTROL_ALLOW_METHODS_HEADER)); + Assert.assertFalse(latch.await(1, TimeUnit.SECONDS)); + } + public static class ResourceServlet extends HttpServlet { private static final long serialVersionUID = 1L; diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index 6019e291bb..360a639591 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -138,6 +138,11 @@ public class StandardStream implements IStream this.listener = listener; } + public StreamFrameListener getStreamFrameListener() + { + return listener; + } + @Override public void updateCloseState(boolean close, boolean local) { diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java index cef58293fb..261a94f730 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Headers.java @@ -216,13 +216,15 @@ public class Headers implements Iterable<Headers.Header> if (obj == null || getClass() != obj.getClass()) return false; Header that = (Header)obj; - return name.equals(that.name) && Arrays.equals(values, that.values); + // Header names must be lowercase, thus we lowercase them before transmission, but keep them as is + // internally. That's why we've to compare them case insensitive. + return name.equalsIgnoreCase(that.name) && Arrays.equals(values, that.values); } @Override public int hashCode() { - int result = name.hashCode(); + int result = name.toLowerCase().hashCode(); result = 31 * result + Arrays.hashCode(values); return result; } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java index af997855c5..6290841b79 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/HeadersBlockGenerator.java @@ -40,7 +40,7 @@ public class HeadersBlockGenerator writeCount(version, buffer, headers.size()); for (Headers.Header header : headers) { - String name = header.name(); + String name = header.name().toLowerCase(); byte[] nameBytes = name.getBytes(iso1); writeNameLength(version, buffer, nameBytes.length); buffer.write(nameBytes, 0, nameBytes.length); diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java index 545aec037e..fd5a36de3d 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/frames/HeadersGenerateParseTest.java @@ -22,65 +22,77 @@ import org.eclipse.jetty.spdy.api.HeadersInfo; import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.spdy.parser.Parser; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertThat; + public class HeadersGenerateParseTest { - @Test - public void testGenerateParse() throws Exception + + private Headers headers = new Headers(); + private int streamId = 13; + private byte flags = HeadersInfo.FLAG_RESET_COMPRESSION; + private final TestSPDYParserListener listener = new TestSPDYParserListener(); + private final Parser parser = new Parser(new StandardCompressionFactory().newDecompressor()); + private ByteBuffer buffer; + + @Before + public void setUp() { - byte flags = HeadersInfo.FLAG_RESET_COMPRESSION; - int streamId = 13; - Headers headers = new Headers(); + parser.addListener(listener); headers.put("a", "b"); + buffer = createHeadersFrameBuffer(headers); + } + + private ByteBuffer createHeadersFrameBuffer(Headers headers) + { HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers); Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor()); ByteBuffer buffer = generator.control(frame1); + assertThat("Buffer is not null", buffer, notNullValue()); + return buffer; + } - Assert.assertNotNull(buffer); - - TestSPDYParserListener listener = new TestSPDYParserListener(); - Parser parser = new Parser(new StandardCompressionFactory().newDecompressor()); - parser.addListener(listener); + @Test + public void testGenerateParse() throws Exception + { parser.parse(buffer); - ControlFrame frame2 = listener.getControlFrame(); - - Assert.assertNotNull(frame2); - Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType()); - HeadersFrame headersFrame = (HeadersFrame)frame2; - Assert.assertEquals(SPDY.V2, headersFrame.getVersion()); - Assert.assertEquals(streamId, headersFrame.getStreamId()); - Assert.assertEquals(flags, headersFrame.getFlags()); - Assert.assertEquals(headers, headersFrame.getHeaders()); + assertExpectationsAreMet(headers); } @Test public void testGenerateParseOneByteAtATime() throws Exception { - byte flags = HeadersInfo.FLAG_RESET_COMPRESSION; - int streamId = 13; - Headers headers = new Headers(); - headers.put("a", "b"); - HeadersFrame frame1 = new HeadersFrame(SPDY.V2, flags, streamId, headers); - Generator generator = new Generator(new StandardByteBufferPool(), new StandardCompressionFactory().newCompressor()); - ByteBuffer buffer = generator.control(frame1); - - Assert.assertNotNull(buffer); - - TestSPDYParserListener listener = new TestSPDYParserListener(); - Parser parser = new Parser(new StandardCompressionFactory().newDecompressor()); - parser.addListener(listener); while (buffer.hasRemaining()) parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()})); - ControlFrame frame2 = listener.getControlFrame(); - Assert.assertNotNull(frame2); - Assert.assertEquals(ControlFrameType.HEADERS, frame2.getType()); - HeadersFrame headersFrame = (HeadersFrame)frame2; - Assert.assertEquals(SPDY.V2, headersFrame.getVersion()); - Assert.assertEquals(streamId, headersFrame.getStreamId()); - Assert.assertEquals(flags, headersFrame.getFlags()); - Assert.assertEquals(headers, headersFrame.getHeaders()); + assertExpectationsAreMet(headers); + } + + @Test + public void testHeadersAreTranslatedToLowerCase() + { + Headers headers = new Headers(); + headers.put("Via","localhost"); + parser.parse(createHeadersFrameBuffer(headers)); + HeadersFrame parsedHeadersFrame = assertExpectationsAreMet(headers); + Headers.Header viaHeader = parsedHeadersFrame.getHeaders().get("via"); + assertThat("Via Header name is lowercase", viaHeader.name(), is("via")); + } + + private HeadersFrame assertExpectationsAreMet(Headers headers) + { + ControlFrame parsedControlFrame = listener.getControlFrame(); + assertThat("listener received controlFrame", parsedControlFrame, notNullValue()); + assertThat("ControlFrame type is HEADERS", ControlFrameType.HEADERS, is(parsedControlFrame.getType())); + HeadersFrame headersFrame = (HeadersFrame)parsedControlFrame; + assertThat("Version matches", SPDY.V2, is(headersFrame.getVersion())); + assertThat("StreamId matches", streamId, is(headersFrame.getStreamId())); + assertThat("flags match", flags, is(headersFrame.getFlags())); + assertThat("headers match", headers, is(headersFrame.getHeaders())); + return headersFrame; } } diff --git a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml index 7d84868619..9c637ec41f 100644 --- a/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml +++ b/jetty-spdy/spdy-jetty-http-webapp/src/main/config/etc/jetty-spdy-proxy.xml @@ -32,24 +32,34 @@ </Call> <!-- - The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below - and is configured to process requests for host "localhost". - Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded - to 127.0.0.1:9090, where they are served by the upstream server above. + This ProxyEngine translates the incoming SPDY/x(HTTP) request to SPDY/2(HTTP) --> - <New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine"> + <New id="spdyProxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine"> <Arg> <New class="org.eclipse.jetty.spdy.SPDYClient$Factory"> - <Call name="start" /> + <Call name="start"/> </New> </Arg> - <Set name="proxyInfos"> + </New> + + <!-- + The ProxyEngineSelector receives SPDY/x(HTTP) requests from proxy connectors below + and is configured to process requests for host "localhost". + Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) by the configured ProxyEngine + and forwarded to 127.0.0.1:9090, where they are served by the upstream server above. + --> + <New id="proxyEngineSelector" class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector"> + <Call name="putProxyEngine"> + <Arg>spdy/2</Arg> + <Arg><Ref id="spdyProxyEngine" /></Arg> + </Call> + <Set name="proxyServerInfos"> <Map> <Entry> <Item>localhost</Item> <Item> - <New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo"> - <Arg type="short">2</Arg> + <New class="org.eclipse.jetty.spdy.proxy.ProxyEngineSelector$ProxyServerInfo"> + <Arg type="String">spdy/2</Arg> <Arg>127.0.0.1</Arg> <Arg type="int">9090</Arg> </New> @@ -69,7 +79,7 @@ <Call name="addConnector"> <Arg> <New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector"> - <Arg><Ref id="proxyEngine" /></Arg> + <Arg><Ref id="proxyEngineSelector" /></Arg> <Set name="Port">8080</Set> </New> </Arg> @@ -77,7 +87,7 @@ <Call name="addConnector"> <Arg> <New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector"> - <Arg><Ref id="proxyEngine" /></Arg> + <Arg><Ref id="proxyEngineSelector" /></Arg> <Arg><Ref id="sslContextFactory" /></Arg> <Set name="Port">8443</Set> </New> diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java index 67b37ed52c..ecc241ea97 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnection.java @@ -722,6 +722,7 @@ public class ServerHTTPSPDYAsyncConnection extends AbstractHttpConnection implem logger.debug("HTTP < {} bytes of content", dataInfo.length()); stream.data(dataInfo).get(maxIdleTime, TimeUnit.MILLISECONDS); content.clear(); + _bypass = false; content = getContentBuffer(); } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java index 522363f5cc..2827af05ce 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/HTTPSPDYProxyConnector.java @@ -21,19 +21,19 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector { - public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine) + public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector) { - this(proxyEngine, null); + this(proxyEngineSelector, null); } - public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory) + public HTTPSPDYProxyConnector(ProxyEngineSelector proxyEngineSelector, SslContextFactory sslContextFactory) { - super(proxyEngine, sslContextFactory); + super(proxyEngineSelector, sslContextFactory); clearAsyncConnectionFactories(); - putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine)); - putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine)); - putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine)); + putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector)); + putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngineSelector)); + putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V2, proxyEngineSelector)); setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1")); } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java index 2ba7fba538..0a172614a5 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngine.java @@ -15,32 +15,25 @@ package org.eclipse.jetty.spdy.proxy; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.Stream; import org.eclipse.jetty.spdy.api.StreamFrameListener; -import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** - * <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that - * accepts SPDY from its client side and converts to any protocol to its server side.</p> + * <p>{@link ProxyEngine} is the class for SPDY proxy functionalities that receives a SPDY request and converts it to + * any protocol to its server side.</p> * <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating * these SPDY client events into appropriate events to forward to the server, in the appropriate * protocol that is understood by the server.</p> - * <p>This class also provides configuration for the proxy rules.</p> */ -public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener +public abstract class ProxyEngine { protected final Logger logger = Log.getLogger(getClass()); - private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>(); private final String name; protected ProxyEngine() @@ -60,6 +53,8 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp } } + public abstract StreamFrameListener proxy(Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo); + protected ProxyEngine(String name) { this.name = name; @@ -73,6 +68,9 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp protected void addRequestProxyHeaders(Stream stream, Headers headers) { addViaHeader(headers); + String address = (String)stream.getSession().getAttribute("org.eclipse.jetty.spdy.remoteAddress"); + if (address != null) + headers.add("X-Forwarded-For", address); } protected void addResponseProxyHeaders(Stream stream, Headers headers) @@ -93,46 +91,4 @@ public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter imp { } - public Map<String, ProxyInfo> getProxyInfos() - { - return new HashMap<>(proxyInfos); - } - - public void setProxyInfos(Map<String, ProxyInfo> proxyInfos) - { - this.proxyInfos.clear(); - this.proxyInfos.putAll(proxyInfos); - } - - public void putProxyInfo(String host, ProxyInfo proxyInfo) - { - proxyInfos.put(host, proxyInfo); - } - - protected ProxyInfo getProxyInfo(String host) - { - return proxyInfos.get(host); - } - - public static class ProxyInfo - { - private final short version; - private final InetSocketAddress address; - - public ProxyInfo(short version, String host, int port) - { - this.version = version; - this.address = new InetSocketAddress(host, port); - } - - public short getVersion() - { - return version; - } - - public InetSocketAddress getAddress() - { - return address; - } - } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java new file mode 100644 index 0000000000..ee23df4dbc --- /dev/null +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyEngineSelector.java @@ -0,0 +1,166 @@ +package org.eclipse.jetty.spdy.proxy; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.eclipse.jetty.spdy.api.GoAwayInfo; +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.PingInfo; +import org.eclipse.jetty.spdy.api.RstInfo; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.StreamStatus; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + * <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the + * syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target + * host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p> + * + * <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for + * the given protocol, it resets the client stream.</p> + * + * <p>This class also provides configuration for the proxy rules.</p> + */ +public class ProxyEngineSelector extends ServerSessionFrameListener.Adapter +{ + protected final Logger logger = Log.getLogger(getClass()); + private final Map<String, ProxyServerInfo> proxyInfos = new ConcurrentHashMap<>(); + private final Map<String, ProxyEngine> proxyEngines = new ConcurrentHashMap<>(); + + @Override + public final StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo) + { + logger.debug("C -> P {} on {}", clientSynInfo, clientStream); + + final Session clientSession = clientStream.getSession(); + short clientVersion = clientSession.getVersion(); + Headers headers = new Headers(clientSynInfo.getHeaders(), false); + + Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion)); + if (hostHeader == null) + { + rst(clientStream); + return null; + } + + String host = hostHeader.value(); + int colon = host.indexOf(':'); + if (colon >= 0) + host = host.substring(0, colon); + + ProxyServerInfo proxyServerInfo = getProxyServerInfo(host); + if (proxyServerInfo == null) + { + rst(clientStream); + return null; + } + + String protocol = proxyServerInfo.getProtocol(); + ProxyEngine proxyEngine = proxyEngines.get(protocol); + if (proxyEngine == null) + { + rst(clientStream); + return null; + } + + return proxyEngine.proxy(clientStream, clientSynInfo, proxyServerInfo); + } + + @Override + public void onPing(Session clientSession, PingInfo pingInfo) + { + // We do not know to which upstream server + // to send the PING so we just ignore it + } + + @Override + public void onGoAway(Session session, GoAwayInfo goAwayInfo) + { + // TODO: + } + + public Map<String, ProxyEngine> getProxyEngines() + { + return new HashMap<>(proxyEngines); + } + + public void setProxyEngines(Map<String, ProxyEngine> proxyEngines) + { + this.proxyEngines.clear(); + this.proxyEngines.putAll(proxyEngines); + } + + public ProxyEngine getProxyEngine(String protocol) + { + return proxyEngines.get(protocol); + } + + public void putProxyEngine(String protocol, ProxyEngine proxyEngine) + { + proxyEngines.put(protocol, proxyEngine); + } + + public Map<String, ProxyServerInfo> getProxyServerInfos() + { + return new HashMap<>(proxyInfos); + } + + protected ProxyServerInfo getProxyServerInfo(String host) + { + return proxyInfos.get(host); + } + + public void setProxyServerInfos(Map<String, ProxyServerInfo> proxyServerInfos) + { + this.proxyInfos.clear(); + this.proxyInfos.putAll(proxyServerInfos); + } + + public void putProxyServerInfo(String host, ProxyServerInfo proxyServerInfo) + { + proxyInfos.put(host, proxyServerInfo); + } + + private void rst(Stream stream) + { + RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM); + stream.getSession().rst(rstInfo); + } + + public static class ProxyServerInfo + { + private final String protocol; + private final String host; + private final InetSocketAddress address; + + public ProxyServerInfo(String protocol, String host, int port) + { + this.protocol = protocol; + this.host = host; + this.address = new InetSocketAddress(host, port); + } + + public String getProtocol() + { + return protocol; + } + + public String getHost() + { + return host; + } + + public InetSocketAddress getAddress() + { + return address; + } + } +} diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java index 57a0e71973..f73c7e819b 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPAsyncConnectionFactory.java @@ -24,18 +24,18 @@ import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory; public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory { private final short version; - private final ProxyEngine proxyEngine; + private final ProxyEngineSelector proxyEngineSelector; - public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine) + public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngineSelector proxyEngineSelector) { super(connector); this.version = version; - this.proxyEngine = proxyEngine; + this.proxyEngineSelector = proxyEngineSelector; } @Override public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment) { - return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine); + return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngineSelector); } } diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java index e55be4b6b0..92c318479a 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYAsyncConnection.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; import org.eclipse.jetty.spdy.api.SessionStatus; import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; import org.eclipse.jetty.spdy.api.SynInfo; import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; @@ -53,19 +54,20 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection { private final Headers headers = new Headers(); private final short version; - private final ProxyEngine proxyEngine; + private final ProxyEngineSelector proxyEngineSelector; private final HttpGenerator generator; private final ISession session; - private Stream stream; + private HTTPStream stream; private Buffer content; - public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endpoint, short version, ProxyEngine proxyEngine) + public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endPoint, short version, ProxyEngineSelector proxyEngineSelector) { - super(connector, endpoint, connector.getServer()); + super(connector, endPoint, connector.getServer()); this.version = version; - this.proxyEngine = proxyEngine; + this.proxyEngineSelector = proxyEngineSelector; this.generator = (HttpGenerator)_generator; this.session = new HTTPSession(version, connector); + this.session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr()); } @Override @@ -116,7 +118,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection } else { - proxyEngine.onData(stream, toDataInfo(buffer, false)); + stream.getStreamFrameListener().onData(stream, toDataInfo(buffer, false)); } } @@ -127,23 +129,24 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection { assert content == null; if (headers.isEmpty()) - proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK)); + proxyEngineSelector.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK)); else syn(true); } else { - proxyEngine.onData(stream, toDataInfo(content, true)); + stream.getStreamFrameListener().onData(stream, toDataInfo(content, true)); } headers.clear(); stream = null; content = null; } - private Stream syn(boolean close) + private HTTPStream syn(boolean close) { - Stream stream = new HTTPStream(1, (byte)0, session, null); - proxyEngine.onSyn(stream, new SynInfo(headers, close)); + HTTPStream stream = new HTTPStream(1, (byte)0, session, null); + StreamFrameListener streamFrameListener = proxyEngineSelector.onSyn(stream, new SynInfo(headers, close)); + stream.setStreamFrameListener(streamFrameListener); return stream; } @@ -167,7 +170,7 @@ public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection { private HTTPSession(short version, SPDYServerConnector connector) { - super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null); + super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngineSelector, null, null); } @Override diff --git a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java index ea89a81fa8..c01af3ef0f 100644 --- a/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java +++ b/jetty-spdy/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/proxy/SPDYProxyEngine.java @@ -15,10 +15,8 @@ package org.eclipse.jetty.spdy.proxy; import java.net.InetSocketAddress; -import java.util.Collections; import java.util.LinkedList; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -30,9 +28,9 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo; import org.eclipse.jetty.spdy.api.Handler; import org.eclipse.jetty.spdy.api.Headers; import org.eclipse.jetty.spdy.api.HeadersInfo; -import org.eclipse.jetty.spdy.api.PingInfo; import org.eclipse.jetty.spdy.api.ReplyInfo; import org.eclipse.jetty.spdy.api.RstInfo; +import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.Stream; @@ -45,11 +43,10 @@ import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by * clients into SPDY events for the servers.</p> */ -public class SPDYProxyEngine extends ProxyEngine +public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener { private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler"; private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream"; - private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions"; private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>(); private final SessionFrameListener sessionListener = new ProxySessionFrameListener(); @@ -82,68 +79,24 @@ public class SPDYProxyEngine extends ProxyEngine this.timeout = timeout; } - @Override - public void onPing(Session clientSession, PingInfo pingInfo) - { - // We do not know to which upstream server - // to send the PING so we just ignore it - } - - @Override - public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo) + public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo) { - for (Session serverSession : serverSessions.values()) - { - @SuppressWarnings("unchecked") - Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE); - if (sessions.remove(clientSession)) - break; - } - } - - @Override - public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo) - { - logger.debug("C -> P {} on {}", clientSynInfo, clientStream); - - final Session clientSession = clientStream.getSession(); - short clientVersion = clientSession.getVersion(); Headers headers = new Headers(clientSynInfo.getHeaders(), false); - Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion)); - if (hostHeader == null) - { - rst(clientStream); - return null; - } - - String host = hostHeader.value(); - int colon = host.indexOf(':'); - if (colon >= 0) - host = host.substring(0, colon); - ProxyInfo proxyInfo = getProxyInfo(host); - if (proxyInfo == null) - { - rst(clientStream); - return null; - } - - short serverVersion = proxyInfo.getVersion(); - InetSocketAddress address = proxyInfo.getAddress(); - Session serverSession = produceSession(host, serverVersion, address); + short serverVersion = getVersion(proxyServerInfo.getProtocol()); + InetSocketAddress address = proxyServerInfo.getAddress(); + Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address); if (serverSession == null) { rst(clientStream); return null; } - @SuppressWarnings("unchecked") - Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE); - sessions.add(clientSession); + final Session clientSession = clientStream.getSession(); addRequestProxyHeaders(clientStream, headers); customizeRequestHeaders(clientStream, headers); - convert(clientVersion, serverVersion, headers); + convert(clientSession.getVersion(), serverVersion, headers); SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose()); StreamFrameListener listener = new ProxyStreamFrameListener(clientStream); @@ -153,6 +106,19 @@ public class SPDYProxyEngine extends ProxyEngine return this; } + private static short getVersion(String protocol) + { + switch (protocol) + { + case "spdy/2": + return SPDY.V2; + case "spdy/3": + return SPDY.V3; + default: + throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol"); + } + } + @Override public void onReply(Stream stream, ReplyInfo replyInfo) { @@ -194,7 +160,6 @@ public class SPDYProxyEngine extends ProxyEngine { SPDYClient client = factory.newSPDYClient(version); session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS); - session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>())); logger.debug("Proxy session connected to {}", address); Session existing = serverSessions.putIfAbsent(host, session); if (existing != null) @@ -513,10 +478,6 @@ public class SPDYProxyEngine extends ProxyEngine public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo) { serverSessions.values().remove(serverSession); - @SuppressWarnings("unchecked") - Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE); - for (Session session : sessions) - session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>()); } @Override @@ -528,7 +489,7 @@ public class SPDYProxyEngine extends ProxyEngine @Override public void onHeaders(Stream stream, HeadersInfo headersInfo) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(); //TODO } @Override diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java index 3885810961..9486157de2 100644 --- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java +++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/PushStrategyBenchmarkTest.java @@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -36,6 +38,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.spdy.AsyncConnectionFactory; import org.eclipse.jetty.spdy.api.DataInfo; import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.Stream; @@ -331,13 +334,36 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest } } + private void addPushedResource(String pushedURI) + { + switch (version()) + { + case SPDY.V2: + { + Matcher matcher = Pattern.compile("https?://[^:]+:\\d+(/.*)").matcher(pushedURI); + Assert.assertTrue(matcher.matches()); + pushedResources.add(matcher.group(1)); + break; + } + case SPDY.V3: + { + pushedResources.add(pushedURI); + break; + } + default: + { + throw new IllegalStateException(); + } + } + } + private class ClientSessionFrameListener extends SessionFrameListener.Adapter { @Override public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) { String path = synInfo.getHeaders().get(HTTPSPDYHeader.URI.name(version())).value(); - pushedResources.add(path); + addPushedResource(path); return new DataListener(); } } diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java index f886991894..5bab1f512e 100644 --- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java +++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYv2Test.java @@ -1010,6 +1010,61 @@ public class ServerHTTPSPDYv2Test extends AbstractHTTPSPDYTest } @Test + public void testGETWithMultipleMediumContentByPassed() throws Exception + { + final byte[] data = new byte[2048]; + Session session = startClient(version(), startHTTPServer(version(), new AbstractHandler() + { + @Override + public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) + throws IOException, ServletException + { + // The sequence of write/flush/write/write below triggers a condition where + // HttpGenerator._bypass is set to true on the second write(), and the + // third write causes an infinite spin loop on the third write(). + request.setHandled(true); + OutputStream output = httpResponse.getOutputStream(); + output.write(data); + output.flush(); + output.write(data); + output.write(data); + } + }), null); + + Headers headers = new Headers(); + headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET"); + headers.put(HTTPSPDYHeader.URI.name(version()), "/foo"); + headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); + headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http"); + headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + connector.getLocalPort()); + final CountDownLatch replyLatch = new CountDownLatch(1); + final CountDownLatch dataLatch = new CountDownLatch(1); + final AtomicInteger contentLength = new AtomicInteger(); + session.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + Assert.assertFalse(replyInfo.isClose()); + Headers replyHeaders = replyInfo.getHeaders(); + Assert.assertTrue(replyHeaders.get(HTTPSPDYHeader.STATUS.name(version())).value().contains("200")); + replyLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataInfo dataInfo) + { + dataInfo.consume(dataInfo.available()); + contentLength.addAndGet(dataInfo.length()); + if (dataInfo.isClose()) + dataLatch.countDown(); + } + }); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(3 * data.length, contentLength.get()); + } + + @Test public void testPOSTThenSuspendRequestThenReadOneChunkThenComplete() throws Exception { final byte[] data = new byte[2000]; diff --git a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java index a2985e4dbd..6c2c89bc87 100644 --- a/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java +++ b/jetty-spdy/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/proxy/ProxyHTTPSPDYv2Test.java @@ -91,9 +91,11 @@ public class ProxyHTTPSPDYv2Test protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception { proxy = new Server(); - SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory); - proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort())); - proxyConnector = new HTTPSPDYProxyConnector(proxyEngine); + ProxyEngineSelector proxyEngineSelector = new ProxyEngineSelector(); + SPDYProxyEngine spdyProxyEngine = new SPDYProxyEngine(factory); + proxyEngineSelector.putProxyEngine("spdy/" + version(), spdyProxyEngine); + proxyEngineSelector.putProxyServerInfo("localhost", new ProxyEngineSelector.ProxyServerInfo("spdy/" + version(), address.getHostName(), address.getPort())); + proxyConnector = new HTTPSPDYProxyConnector(proxyEngineSelector); proxyConnector.setPort(0); proxy.addConnector(proxyConnector); proxy.start(); @@ -172,96 +174,6 @@ public class ProxyHTTPSPDYv2Test } @Test - public void testClosingServerClosesHTTPClient() throws Exception - { - InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - Headers responseHeaders = new Headers(); - responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); - responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK"); - stream.reply(new ReplyInfo(responseHeaders, true)); - stream.getSession().goAway(); - return null; - } - })); - - Socket client = new Socket(); - client.connect(proxyAddress); - OutputStream output = client.getOutputStream(); - - String request = "" + - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + proxyAddress.getPort() + "\r\n" + - "\r\n"; - output.write(request.getBytes("UTF-8")); - output.flush(); - - client.setSoTimeout(1000); - InputStream input = client.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8")); - String line = reader.readLine(); - Assert.assertTrue(line.contains(" 200")); - while (line.length() > 0) - line = reader.readLine(); - Assert.assertFalse(reader.ready()); - - Assert.assertNull(reader.readLine()); - - client.close(); - } - - @Test - public void testClosingServerClosesSPDYClient() throws Exception - { - InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() - { - @Override - public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) - { - Headers responseHeaders = new Headers(); - responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); - responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK"); - stream.reply(new ReplyInfo(responseHeaders, true)); - stream.getSession().goAway(); - return null; - } - })); - proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version())); - - final CountDownLatch goAwayLatch = new CountDownLatch(1); - Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter() - { - @Override - public void onGoAway(Session session, GoAwayInfo goAwayInfo) - { - goAwayLatch.countDown(); - } - }).get(5, TimeUnit.SECONDS); - - final CountDownLatch replyLatch = new CountDownLatch(1); - Headers headers = new Headers(); - headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http"); - headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET"); - headers.put(HTTPSPDYHeader.URI.name(version()), "/"); - headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1"); - headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort()); - client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter() - { - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - replyLatch.countDown(); - } - }); - - Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS)); - Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS)); - } - - @Test public void testGETThenNoContentFromTwoClients() throws Exception { InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter() diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index d20784009e..4e45eedf74 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -69,6 +69,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version); StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy); + session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr()); session.setWindowSize(connector.getInitialWindowSize()); parser.addListener(session); connection.setSession(session); |