diff options
Diffstat (limited to 'jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java')
-rw-r--r-- | jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java | 195 |
1 files changed, 194 insertions, 1 deletions
diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java index 66e03ec405..ea5cc7a718 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java @@ -18,9 +18,14 @@ package org.eclipse.jetty.servlet; +import static java.nio.charset.StandardCharsets.ISO_8859_1; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import java.io.BufferedReader; import java.io.IOException; @@ -57,6 +62,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -69,6 +75,7 @@ public class AsyncServletIOTest protected AsyncIOServlet _servlet0=new AsyncIOServlet(); protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2(); protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3(); + protected AsyncIOServlet4 _servlet4=new AsyncIOServlet4(); protected int _port; protected Server _server = new Server(); protected ServletHandler _servletHandler; @@ -101,6 +108,10 @@ public class AsyncServletIOTest holder3.setAsyncSupported(true); _servletHandler.addServletWithMapping(holder3,"/path3/*"); + ServletHolder holder4=new ServletHolder(_servlet4); + holder4.setAsyncSupported(true); + _servletHandler.addServletWithMapping(holder4,"/path4/*"); + _server.start(); _port=_connector.getLocalPort(); @@ -232,7 +243,7 @@ public class AsyncServletIOTest int port=_port; try (Socket socket = new Socket("localhost",port)) { - socket.setSoTimeout(1000000); + socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); @@ -263,6 +274,8 @@ public class AsyncServletIOTest } } + + public synchronized List<String> process(String content,int... writes) throws Exception { return process(content.getBytes(StandardCharsets.ISO_8859_1),writes); @@ -596,4 +609,184 @@ public class AsyncServletIOTest async.complete(); } } + + + @Test + public void testCompleteWhilePending() throws Exception + { + _servlet4.onDA.set(0); + _servlet4.onWP.set(0); + + StringBuilder request = new StringBuilder(512); + request.append("POST /ctx/path4/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: text/plain\r\n") + .append("Content-Length: 20\r\n") + .append("\r\n") + .append("12345678\r\n"); + + int port=_port; + List<String> list = new ArrayList<>(); + try (Socket socket = new Socket("localhost",port)) + { + socket.setSoTimeout(10000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(ISO_8859_1)); + out.flush(); + Thread.sleep(100); + out.write("ABC".getBytes(ISO_8859_1)); + out.flush(); + Thread.sleep(100); + out.write("DEF".getBytes(ISO_8859_1)); + out.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + + // response line + String line = in.readLine(); + LOG.debug("response-line: "+line); + Assert.assertThat(line,startsWith("HTTP/1.1 200 OK")); + + boolean chunked=false; + // Skip headers + while (line!=null) + { + line = in.readLine(); + LOG.debug("header-line: "+line); + chunked|="Transfer-Encoding: chunked".equals(line); + if (line.length()==0) + break; + } + + assertTrue(chunked); + + // Get body slowly + String last=null; + try + { + while (true) + { + last=line; + //Thread.sleep(1000); + line = in.readLine(); + LOG.debug("body: "+line); + if (line==null) + break; + list.add(line); + } + } + catch(IOException e) + { + // ignored + } + + LOG.debug("last: "+last); + // last non empty line should contain some X's + assertThat(last,containsString("X")); + // last non empty line should not contain end chunk + assertThat(last,not(containsString("0"))); + } + + assertTrue(_servlet4.completed.await(5, TimeUnit.SECONDS)); + Thread.sleep(100); + assertEquals(0,_servlet4.onDA.get()); + assertEquals(0,_servlet4.onWP.get()); + + + } + + @SuppressWarnings("serial") + public class AsyncIOServlet4 extends HttpServlet + { + public CountDownLatch completed = new CountDownLatch(1); + public AtomicInteger onDA = new AtomicInteger(); + public AtomicInteger onWP = new AtomicInteger(); + + @Override + public void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException + { + final AsyncContext async = request.startAsync(); + final ServletInputStream in = request.getInputStream(); + final ServletOutputStream out = response.getOutputStream(); + + in.setReadListener(new ReadListener() + { + @Override + public void onError(Throwable t) + { + t.printStackTrace(); + } + + @Override + public void onDataAvailable() throws IOException + { + onDA.incrementAndGet(); + + boolean readF=false; + // Read all available content + while(in.isReady()) + { + int c = in.read(); + if (c<0) + throw new IllegalStateException(); + if (c=='F') + readF=true; + } + + if (readF) + { + onDA.set(0); + + final byte[] buffer = new byte[64*1024]; + Arrays.fill(buffer,(byte)'X'); + for (int i=199;i<buffer.length;i+=200) + buffer[i]=(byte)'\n'; + + // Once we read block, let's make ourselves write blocked + out.setWriteListener(new WriteListener() + { + @Override + public void onWritePossible() throws IOException + { + onWP.incrementAndGet(); + + while (out.isReady()) + out.write(buffer); + + try + { + // As soon as we are write blocked, complete + onWP.set(0); + async.complete(); + } + catch(Exception e) + { + e.printStackTrace(); + } + finally + { + completed.countDown(); + } + } + + @Override + public void onError(Throwable t) + { + t.printStackTrace(); + } + }); + } + } + + @Override + public void onAllDataRead() throws IOException + { + throw new IllegalStateException(); + } + }); + + } + } + + } |