Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2014-04-25 15:26:27 +0000
committerGreg Wilkins2014-04-25 15:26:43 +0000
commit87c5b30d1cc4f878c44f089a4b3d05aaa121bbad (patch)
treef213c7cbe4ee40a9e659228ed533fe804ff68223
parentb4542a031b1873f91f5f75a5b2147913191b4f1a (diff)
downloadorg.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.tar.gz
org.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.tar.xz
org.eclipse.jetty.project-87c5b30d1cc4f878c44f089a4b3d05aaa121bbad.zip
432901 ensure a single onError callback only in pending and unready states
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java144
-rw-r--r--jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java86
2 files changed, 168 insertions, 62 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index 610db58cbf..085fbeb67d 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -74,7 +74,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
write completed - - - ASYNC READY->owp -
*/
- enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
+ enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel<?> channel)
@@ -146,7 +146,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break loop;
case UNREADY:
- throw new WritePendingException(); // TODO ?
+ if (_state.compareAndSet(state,OutputState.ERROR))
+ _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
+ continue;
default:
if (_state.compareAndSet(state,OutputState.CLOSED))
@@ -179,7 +181,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
break loop;
case UNREADY:
- throw new WritePendingException(); // TODO ?
+ if (_state.compareAndSet(state,OutputState.ERROR))
+ _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
+ continue;
default:
if (_state.compareAndSet(state,OutputState.CLOSED))
@@ -238,6 +242,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY:
throw new WritePendingException();
+ case ERROR:
+ throw new EofException(_onError);
+
case CLOSED:
return;
}
@@ -298,6 +305,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY:
throw new WritePendingException();
+ case ERROR:
+ throw new EofException(_onError);
+
case CLOSED:
throw new EofException("Closed");
}
@@ -396,6 +406,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY:
throw new WritePendingException();
+ case ERROR:
+ throw new EofException(_onError);
+
case CLOSED:
throw new EofException("Closed");
}
@@ -476,6 +489,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
case UNREADY:
throw new WritePendingException();
+ case ERROR:
+ throw new EofException(_onError);
+
case CLOSED:
throw new EofException("Closed");
}
@@ -615,6 +631,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
continue;
break;
+ case ERROR:
+ throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
@@ -706,6 +724,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return false;
case UNREADY:
return false;
+
+ case ERROR:
+ return true;
case CLOSED:
return true;
@@ -716,45 +737,54 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
public void run()
{
- if(_onError!=null)
+ loop: while (true)
{
- Throwable th=_onError;
- _onError=null;
- _writeListener.onError(new IOException(th));
- close();
- }
-
- switch(_state.get())
- {
- case READY:
- try
- {
- _writeListener.onWritePossible();
- }
- catch (Throwable e)
+ OutputState state = _state.get();
+
+ if(_onError!=null)
+ {
+ switch(state)
{
- _writeListener.onError(e);
- close();
+ case CLOSED:
+ case ERROR:
+ _onError=null;
+ break loop;
+
+ default:
+ if (_state.compareAndSet(state, OutputState.ERROR))
+ {
+ Throwable th=_onError;
+ _onError=null;
+ _writeListener.onError(new IOException(th));
+ close();
+
+ break loop;
+ }
+
}
- break;
-
- case CLOSED:
- try
- {
- new Throwable().printStackTrace();
+ continue loop;
+ }
+
+ switch(_state.get())
+ {
+ case READY:
+ case CLOSED:
// even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
- _writeListener.onWritePossible();
- }
- catch (Throwable e)
- {
- _writeListener.onError(e);
- }
- break;
-
- default:
-
+ try
+ {
+ _writeListener.onWritePossible();
+ break loop;
+ }
+ catch (Throwable e)
+ {
+ _onError=e;
+ }
+ break;
+ default:
+
+ }
}
}
@@ -769,37 +799,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable
@Override
protected void completed()
{
- try
+ while(true)
{
- while(true)
+ OutputState last=_state.get();
+ switch(last)
{
- HttpOutput.OutputState last=_state.get();
- switch(last)
- {
- case PENDING:
- if (!_state.compareAndSet(HttpOutput.OutputState.PENDING, HttpOutput.OutputState.ASYNC))
- continue;
- break;
+ case PENDING:
+ if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
+ continue;
+ break;
- case UNREADY:
- if (!_state.compareAndSet(HttpOutput.OutputState.UNREADY, HttpOutput.OutputState.READY))
- continue;
- _channel.getState().onWritePossible();
- break;
+ case UNREADY:
+ if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
+ continue;
+ _channel.getState().onWritePossible();
+ break;
- case CLOSED:
- break;
+ case CLOSED:
+ break;
- default:
- throw new IllegalStateException();
- }
- break;
+ default:
+ throw new IllegalStateException();
}
- }
- catch (Exception e)
- {
- _onError=e;
- _channel.getState().onWritePossible();
+ break;
}
}
diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
index 563b895921..0abff4f158 100644
--- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
+++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
@@ -18,11 +18,16 @@
package org.eclipse.jetty.servlet;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -39,6 +45,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -56,7 +63,7 @@ public class AsyncIOServletTest
connector = new ServerConnector(server);
server.addConnector(connector);
- context = new ServletContextHandler(server, "", false, false);
+ context = new ServletContextHandler(server, "/", false, false);
ServletHolder holder = new ServletHolder(servlet);
holder.setAsyncSupported(true);
context.addServlet(holder, path);
@@ -257,4 +264,81 @@ public class AsyncIOServletTest
Assert.assertEquals("500", response.getCode());
}
}
+
+
+ @Test
+ public void testAsyncWriteClosed() throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n";
+ for (int i=0;i<10;i++)
+ text=text+text;
+ final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1);
+
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
+ {
+ response.flushBuffer();
+
+ final AsyncContext async = request.startAsync();
+ final ServletOutputStream out = response.getOutputStream();
+ out.setWriteListener(new WriteListener()
+ {
+ @Override
+ public void onWritePossible() throws IOException
+ {
+ while (out.isReady())
+ {
+ try
+ {
+ Thread.sleep(100);
+ out.write(data);
+ }
+ catch(IOException e)
+ {
+ throw e;
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ async.complete();
+ latch.countDown();
+ }
+ });
+ }
+ });
+
+ String request = "GET " + path + " HTTP/1.1\r\n" +
+ "Host: localhost:" + connector.getLocalPort() + "\r\n" +
+ "\r\n";
+
+ try (Socket client = new Socket("localhost", connector.getLocalPort()))
+ {
+ OutputStream output = client.getOutputStream();
+ output.write(request.getBytes("UTF-8"));
+ output.flush();
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+ String line=in.readLine();
+ assertThat(line, containsString("200 OK"));
+ while (line.length()>0)
+ line=in.readLine();
+ line=in.readLine();
+ assertThat(line, not(containsString(" ")));
+ line=in.readLine();
+ assertThat(line, containsString("discontent. How Now Brown Cow. The "));
+ }
+
+ if (!latch.await(5, TimeUnit.SECONDS))
+ Assert.fail();
+ }
}

Back to the top