diff options
author | Simone Bordet | 2014-09-22 16:21:35 +0000 |
---|---|---|
committer | Simone Bordet | 2014-09-22 16:21:35 +0000 |
commit | ddcf878c32b646786380ca24b341adf32fc4e2d6 (patch) | |
tree | ae7dcdb2fa11650a7115a23fb59c5c00de0dac94 | |
parent | 1cd367ae7c083be5c18c531b7c49b47d629c27c4 (diff) | |
download | org.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.tar.gz org.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.tar.xz org.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.zip |
444416 - AsyncProxyServlet recursion.
Implemented reading of content using IteratingCallback to avoid
recursion.
-rw-r--r-- | jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java | 129 | ||||
-rw-r--r-- | jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java | 38 |
2 files changed, 72 insertions, 95 deletions
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java index d42c4a2a68..9650dd6118 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; -import java.util.concurrent.atomic.AtomicReference; - import javax.servlet.ReadListener; import javax.servlet.ServletConfig; import javax.servlet.ServletException; @@ -38,6 +36,7 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; public class AsyncProxyServlet extends ProxyServlet { @@ -114,25 +113,12 @@ public class AsyncProxyServlet extends ProxyServlet } } - /** - * State machine for reader - * null PENDING SUCCESS FAILURE - * --------------------------------------- - * onRequestContent call null null null null - * onRequestContent called PENDING - (iterate) - - * succeeded SUCCESS SUCCESS->ODA - - - * failed FAILED FAILED - - - * - */ - private enum ReadState { OFFER, PENDING, SUCCESS, FAILURE }; - - protected class StreamReader implements ReadListener, Callback + protected class StreamReader extends IteratingCallback implements ReadListener { private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()]; private final Request proxyRequest; private final HttpServletRequest request; private final DeferredContentProvider provider; - private final AtomicReference<ReadState> state = new AtomicReference<>(null); protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider) { @@ -144,13 +130,28 @@ public class AsyncProxyServlet extends ProxyServlet @Override public void onDataAvailable() throws IOException { - int requestId=0; - ServletInputStream input = request.getInputStream(); + iterate(); + } + + @Override + public void onAllDataRead() throws IOException + { if (_log.isDebugEnabled()) - { - requestId= getRequestId(request); - _log.debug("{} asynchronous read start on {}", requestId, input); - } + _log.debug("{} proxying content to upstream completed", getRequestId(request)); + provider.close(); + } + + @Override + public void onError(Throwable t) + { + onClientRequestFailure(proxyRequest, request, t); + } + + @Override + protected Action process() throws Exception + { + int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0; + ServletInputStream input = request.getInputStream(); // First check for isReady() because it has // side effects, and then for isFinished(). @@ -163,16 +164,22 @@ public class AsyncProxyServlet extends ProxyServlet { if (_log.isDebugEnabled()) _log.debug("{} proxying content to upstream: {} bytes", requestId, read); - state.set(ReadState.OFFER); onRequestContent(proxyRequest, request, provider, buffer, 0, read, this); - if (state.compareAndSet(ReadState.OFFER,ReadState.PENDING) || state.get()!=ReadState.SUCCESS) - break; + return Action.SCHEDULED; } } - if (!input.isFinished()) + + if (input.isFinished()) + { + if (_log.isDebugEnabled()) + _log.debug("{} asynchronous read complete on {}", requestId, input); + return Action.SUCCEEDED; + } + else { if (_log.isDebugEnabled()) _log.debug("{} asynchronous read pending on {}", requestId, input); + return Action.IDLE; } } @@ -182,76 +189,10 @@ public class AsyncProxyServlet extends ProxyServlet } @Override - public void onAllDataRead() throws IOException - { - if (_log.isDebugEnabled()) - _log.debug("{} proxying content to upstream completed", getRequestId(request)); - provider.close(); - } - - @Override - public void onError(Throwable x) - { - failed(x); - } - - @Override - public void succeeded() - { - loop: while (true) - { - switch(state.get()) - { - case OFFER: - if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS)) - continue; - // Nothing to do as onDataAvailable() will iterate - break loop; - - case PENDING: - if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS)) - continue; - try - { - onDataAvailable(); - } - catch (Throwable x) - { - failed(x); - } - break loop; - - default: - throw new IllegalStateException("state="+state.get()); - } - } - - } - - @Override public void failed(Throwable x) { - while (true) - { - switch(state.get()) - { - case OFFER: - if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS)) - continue; - onClientRequestFailure(proxyRequest, request, x); - break; - - case PENDING: - if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS)) - continue; - - onClientRequestFailure(proxyRequest, request, x); - break; - - default: - throw new IllegalStateException(x); - } - } + super.failed(x); + onError(x); } } diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java index b0ff68c535..afb9428249 100644 --- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java @@ -181,7 +181,7 @@ public class ProxyServletFailureTest } @Test - public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception + public void testClientRequestDoesNotSendContentProxyIdlesTimeout() throws Exception { prepareProxy(); int idleTimeout = 2000; @@ -216,6 +216,42 @@ public class ProxyServletFailureTest } @Test + public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception + { + prepareProxy(); + int idleTimeout = 2000; + proxyConnector.setIdleTimeout(idleTimeout); + + prepareServer(new EchoHttpServlet()); + + try (Socket socket = new Socket("localhost", proxyConnector.getLocalPort())) + { + String serverHostPort = "localhost:" + serverConnector.getLocalPort(); + String request = "" + + "GET http://" + serverHostPort + " HTTP/1.1\r\n" + + "Host: " + serverHostPort + "\r\n" + + "Content-Length: 2\r\n" + + "\r\n" + + "Z"; + OutputStream output = socket.getOutputStream(); + output.write(request.getBytes("UTF-8")); + output.flush(); + + // Do not send all the promised content, wait to idle timeout. + + socket.setSoTimeout(2 * idleTimeout); + SimpleHttpParser parser = new SimpleHttpParser(); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); + SimpleHttpResponse response = parser.readResponse(reader); + Assert.assertTrue(Integer.parseInt(response.getCode()) >= 500); + String connectionHeader = response.getHeaders().get("connection"); + Assert.assertNotNull(connectionHeader); + Assert.assertTrue(connectionHeader.contains("close")); + Assert.assertEquals(-1, reader.read()); + } + } + + @Test public void testProxyRequestStallsContentServerIdlesTimeout() throws Exception { final byte[] content = new byte[]{'C', '0', 'F', 'F', 'E', 'E'}; |