diff options
author | Greg Wilkins | 2014-04-25 15:26:27 +0000 |
---|---|---|
committer | Greg Wilkins | 2014-04-25 15:26:43 +0000 |
commit | 87c5b30d1cc4f878c44f089a4b3d05aaa121bbad (patch) | |
tree | f213c7cbe4ee40a9e659228ed533fe804ff68223 | |
parent | b4542a031b1873f91f5f75a5b2147913191b4f1a (diff) | |
download | org.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.tar.gz org.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.tar.xz org.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.zip |
432901 ensure a single onError callback only in pending and unready states
-rw-r--r-- | jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java | 144 | ||||
-rw-r--r-- | jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java | 86 |
2 files changed, 168 insertions, 62 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 610db58cbf..085fbeb67d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -74,7 +74,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable write completed - - - ASYNC READY->owp - */ - enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED } + enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN); public HttpOutput(HttpChannel<?> channel) @@ -146,7 +146,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable break loop; case UNREADY: - throw new WritePendingException(); // TODO ? + if (_state.compareAndSet(state,OutputState.ERROR)) + _writeListener.onError(_onError==null?new EofException("Async close"):_onError); + continue; default: if (_state.compareAndSet(state,OutputState.CLOSED)) @@ -179,7 +181,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable break loop; case UNREADY: - throw new WritePendingException(); // TODO ? + if (_state.compareAndSet(state,OutputState.ERROR)) + _writeListener.onError(_onError==null?new EofException("Async closed"):_onError); + continue; default: if (_state.compareAndSet(state,OutputState.CLOSED)) @@ -238,6 +242,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: throw new WritePendingException(); + case ERROR: + throw new EofException(_onError); + case CLOSED: return; } @@ -298,6 +305,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: throw new WritePendingException(); + case ERROR: + throw new EofException(_onError); + case CLOSED: throw new EofException("Closed"); } @@ -396,6 +406,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: throw new WritePendingException(); + case ERROR: + throw new EofException(_onError); + case CLOSED: throw new EofException("Closed"); } @@ -476,6 +489,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: throw new WritePendingException(); + case ERROR: + throw new EofException(_onError); + case CLOSED: throw new EofException("Closed"); } @@ -615,6 +631,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) continue; break; + case ERROR: + throw new EofException(_onError); case CLOSED: throw new EofException("Closed"); default: @@ -706,6 +724,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable return false; case UNREADY: return false; + + case ERROR: + return true; case CLOSED: return true; @@ -716,45 +737,54 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void run() { - if(_onError!=null) + loop: while (true) { - Throwable th=_onError; - _onError=null; - _writeListener.onError(new IOException(th)); - close(); - } - - switch(_state.get()) - { - case READY: - try - { - _writeListener.onWritePossible(); - } - catch (Throwable e) + OutputState state = _state.get(); + + if(_onError!=null) + { + switch(state) { - _writeListener.onError(e); - close(); + case CLOSED: + case ERROR: + _onError=null; + break loop; + + default: + if (_state.compareAndSet(state, OutputState.ERROR)) + { + Throwable th=_onError; + _onError=null; + _writeListener.onError(new IOException(th)); + close(); + + break loop; + } + } - break; - - case CLOSED: - try - { - new Throwable().printStackTrace(); + continue loop; + } + + switch(_state.get()) + { + case READY: + case CLOSED: // even though a write is not possible, because a close has // occurred, we need to call onWritePossible to tell async // producer that the last write completed. - _writeListener.onWritePossible(); - } - catch (Throwable e) - { - _writeListener.onError(e); - } - break; - - default: - + try + { + _writeListener.onWritePossible(); + break loop; + } + catch (Throwable e) + { + _onError=e; + } + break; + default: + + } } } @@ -769,37 +799,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override protected void completed() { - try + while(true) { - while(true) + OutputState last=_state.get(); + switch(last) { - HttpOutput.OutputState last=_state.get(); - switch(last) - { - case PENDING: - if (!_state.compareAndSet(HttpOutput.OutputState.PENDING, HttpOutput.OutputState.ASYNC)) - continue; - break; + case PENDING: + if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) + continue; + break; - case UNREADY: - if (!_state.compareAndSet(HttpOutput.OutputState.UNREADY, HttpOutput.OutputState.READY)) - continue; - _channel.getState().onWritePossible(); - break; + case UNREADY: + if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY)) + continue; + _channel.getState().onWritePossible(); + break; - case CLOSED: - break; + case CLOSED: + break; - default: - throw new IllegalStateException(); - } - break; + default: + throw new IllegalStateException(); } - } - catch (Exception e) - { - _onError=e; - _channel.getState().onWritePossible(); + break; } } diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java index 563b895921..0abff4f158 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java @@ -18,11 +18,16 @@ package org.eclipse.jetty.servlet; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.AsyncContext; import javax.servlet.ReadListener; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -39,6 +45,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser; import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -56,7 +63,7 @@ public class AsyncIOServletTest connector = new ServerConnector(server); server.addConnector(connector); - context = new ServletContextHandler(server, "", false, false); + context = new ServletContextHandler(server, "/", false, false); ServletHolder holder = new ServletHolder(servlet); holder.setAsyncSupported(true); context.addServlet(holder, path); @@ -257,4 +264,81 @@ public class AsyncIOServletTest Assert.assertEquals("500", response.getCode()); } } + + + @Test + public void testAsyncWriteClosed() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n"; + for (int i=0;i<10;i++) + text=text+text; + final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1); + + startServer(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + { + response.flushBuffer(); + + final AsyncContext async = request.startAsync(); + final ServletOutputStream out = response.getOutputStream(); + out.setWriteListener(new WriteListener() + { + @Override + public void onWritePossible() throws IOException + { + while (out.isReady()) + { + try + { + Thread.sleep(100); + out.write(data); + } + catch(IOException e) + { + throw e; + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } + + @Override + public void onError(Throwable t) + { + async.complete(); + latch.countDown(); + } + }); + } + }); + + String request = "GET " + path + " HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n"; + + try (Socket client = new Socket("localhost", connector.getLocalPort())) + { + OutputStream output = client.getOutputStream(); + output.write(request.getBytes("UTF-8")); + output.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream())); + String line=in.readLine(); + assertThat(line, containsString("200 OK")); + while (line.length()>0) + line=in.readLine(); + line=in.readLine(); + assertThat(line, not(containsString(" "))); + line=in.readLine(); + assertThat(line, containsString("discontent. How Now Brown Cow. The ")); + } + + if (!latch.await(5, TimeUnit.SECONDS)) + Assert.fail(); + } } |