Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2014-09-22 16:21:35 +0000
committerSimone Bordet2014-09-22 16:21:35 +0000
commitddcf878c32b646786380ca24b341adf32fc4e2d6 (patch)
treeae7dcdb2fa11650a7115a23fb59c5c00de0dac94
parent1cd367ae7c083be5c18c531b7c49b47d629c27c4 (diff)
downloadorg.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.tar.gz
org.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.tar.xz
org.eclipse.jetty.project-ddcf878c32b646786380ca24b341adf32fc4e2d6.zip
444416 - AsyncProxyServlet recursion.
Implemented reading of content using IteratingCallback to avoid recursion.
-rw-r--r--jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java129
-rw-r--r--jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java38
2 files changed, 72 insertions, 95 deletions
diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java
index d42c4a2a68..9650dd6118 100644
--- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java
+++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/AsyncProxyServlet.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
-import java.util.concurrent.atomic.AtomicReference;
-
import javax.servlet.ReadListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -38,6 +36,7 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingCallback;
public class AsyncProxyServlet extends ProxyServlet
{
@@ -114,25 +113,12 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
- /**
- * State machine for reader
- * null PENDING SUCCESS FAILURE
- * ---------------------------------------
- * onRequestContent call null null null null
- * onRequestContent called PENDING - (iterate) -
- * succeeded SUCCESS SUCCESS->ODA - -
- * failed FAILED FAILED - -
- *
- */
- private enum ReadState { OFFER, PENDING, SUCCESS, FAILURE };
-
- protected class StreamReader implements ReadListener, Callback
+ protected class StreamReader extends IteratingCallback implements ReadListener
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
private final Request proxyRequest;
private final HttpServletRequest request;
private final DeferredContentProvider provider;
- private final AtomicReference<ReadState> state = new AtomicReference<>(null);
protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
{
@@ -144,13 +130,28 @@ public class AsyncProxyServlet extends ProxyServlet
@Override
public void onDataAvailable() throws IOException
{
- int requestId=0;
- ServletInputStream input = request.getInputStream();
+ iterate();
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
if (_log.isDebugEnabled())
- {
- requestId= getRequestId(request);
- _log.debug("{} asynchronous read start on {}", requestId, input);
- }
+ _log.debug("{} proxying content to upstream completed", getRequestId(request));
+ provider.close();
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ onClientRequestFailure(proxyRequest, request, t);
+ }
+
+ @Override
+ protected Action process() throws Exception
+ {
+ int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
+ ServletInputStream input = request.getInputStream();
// First check for isReady() because it has
// side effects, and then for isFinished().
@@ -163,16 +164,22 @@ public class AsyncProxyServlet extends ProxyServlet
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream: {} bytes", requestId, read);
- state.set(ReadState.OFFER);
onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
- if (state.compareAndSet(ReadState.OFFER,ReadState.PENDING) || state.get()!=ReadState.SUCCESS)
- break;
+ return Action.SCHEDULED;
}
}
- if (!input.isFinished())
+
+ if (input.isFinished())
+ {
+ if (_log.isDebugEnabled())
+ _log.debug("{} asynchronous read complete on {}", requestId, input);
+ return Action.SUCCEEDED;
+ }
+ else
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read pending on {}", requestId, input);
+ return Action.IDLE;
}
}
@@ -182,76 +189,10 @@ public class AsyncProxyServlet extends ProxyServlet
}
@Override
- public void onAllDataRead() throws IOException
- {
- if (_log.isDebugEnabled())
- _log.debug("{} proxying content to upstream completed", getRequestId(request));
- provider.close();
- }
-
- @Override
- public void onError(Throwable x)
- {
- failed(x);
- }
-
- @Override
- public void succeeded()
- {
- loop: while (true)
- {
- switch(state.get())
- {
- case OFFER:
- if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
- continue;
- // Nothing to do as onDataAvailable() will iterate
- break loop;
-
- case PENDING:
- if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
- continue;
- try
- {
- onDataAvailable();
- }
- catch (Throwable x)
- {
- failed(x);
- }
- break loop;
-
- default:
- throw new IllegalStateException("state="+state.get());
- }
- }
-
- }
-
- @Override
public void failed(Throwable x)
{
- while (true)
- {
- switch(state.get())
- {
- case OFFER:
- if (!state.compareAndSet(ReadState.OFFER,ReadState.SUCCESS))
- continue;
- onClientRequestFailure(proxyRequest, request, x);
- break;
-
- case PENDING:
- if (!state.compareAndSet(ReadState.PENDING,ReadState.SUCCESS))
- continue;
-
- onClientRequestFailure(proxyRequest, request, x);
- break;
-
- default:
- throw new IllegalStateException(x);
- }
- }
+ super.failed(x);
+ onError(x);
}
}
diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java
index b0ff68c535..afb9428249 100644
--- a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java
+++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/ProxyServletFailureTest.java
@@ -181,7 +181,7 @@ public class ProxyServletFailureTest
}
@Test
- public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception
+ public void testClientRequestDoesNotSendContentProxyIdlesTimeout() throws Exception
{
prepareProxy();
int idleTimeout = 2000;
@@ -216,6 +216,42 @@ public class ProxyServletFailureTest
}
@Test
+ public void testClientRequestStallsContentProxyIdlesTimeout() throws Exception
+ {
+ prepareProxy();
+ int idleTimeout = 2000;
+ proxyConnector.setIdleTimeout(idleTimeout);
+
+ prepareServer(new EchoHttpServlet());
+
+ try (Socket socket = new Socket("localhost", proxyConnector.getLocalPort()))
+ {
+ String serverHostPort = "localhost:" + serverConnector.getLocalPort();
+ String request = "" +
+ "GET http://" + serverHostPort + " HTTP/1.1\r\n" +
+ "Host: " + serverHostPort + "\r\n" +
+ "Content-Length: 2\r\n" +
+ "\r\n" +
+ "Z";
+ OutputStream output = socket.getOutputStream();
+ output.write(request.getBytes("UTF-8"));
+ output.flush();
+
+ // Do not send all the promised content, wait to idle timeout.
+
+ socket.setSoTimeout(2 * idleTimeout);
+ SimpleHttpParser parser = new SimpleHttpParser();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
+ SimpleHttpResponse response = parser.readResponse(reader);
+ Assert.assertTrue(Integer.parseInt(response.getCode()) >= 500);
+ String connectionHeader = response.getHeaders().get("connection");
+ Assert.assertNotNull(connectionHeader);
+ Assert.assertTrue(connectionHeader.contains("close"));
+ Assert.assertEquals(-1, reader.read());
+ }
+ }
+
+ @Test
public void testProxyRequestStallsContentServerIdlesTimeout() throws Exception
{
final byte[] content = new byte[]{'C', '0', 'F', 'F', 'E', 'E'};

Back to the top