Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2014-12-04 16:39:53 +0000
committerGreg Wilkins2014-12-04 16:39:53 +0000
commit07e87f0ecfc17cb0fda6c24df8803961b34e6df0 (patch)
tree9d8ba036caaa615fe98808e601b728e6dc30d5e1
parent867feedca48d3204f61b8444330515c5586216a6 (diff)
downloadorg.eclipse.jetty.project-07e87f0ecfc17cb0fda6c24df8803961b34e6df0.tar.gz
org.eclipse.jetty.project-07e87f0ecfc17cb0fda6c24df8803961b34e6df0.tar.xz
org.eclipse.jetty.project-07e87f0ecfc17cb0fda6c24df8803961b34e6df0.zip
454157 abort to avoid spin in async HttpInput.consumeAll
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java2
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java2
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java20
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java2
-rw-r--r--jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java88
5 files changed, 108 insertions, 6 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
index b2c3f68555..9cc23933c6 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
@@ -58,7 +58,7 @@ public abstract class FillInterest
if (!_interested.compareAndSet(null,callback))
{
- LOG.warn("Read pending for "+_interested.get()+" pervented "+callback);
+ LOG.warn("Read pending for "+_interested.get()+" prevented "+callback);
throw new ReadPendingException();
}
try
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
index 968b8210b0..3f845fb119 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
@@ -479,7 +479,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable, H
}
else if (isCommitted())
{
- _transport.abort();
+ abort();
if (!(x instanceof EofException))
LOG.warn("Could not send response error 500: "+x);
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
index 9dbbfe0e5f..69bb3072af 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
@@ -348,11 +348,27 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
+ {
// close to seek EOF
_parser.close();
+ }
else if (_parser.inContentState() && _generator.isPersistent())
- // Complete reading the request
- _channel.getRequest().getHttpInput().consumeAll();
+ {
+ // If we are async, then we have problems to complete neatly
+ if (_channel.getRequest().getHttpInput().isAsync())
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("unconsumed async input {}", this);
+ _channel.abort();
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("unconsumed input {}", this);
+ // Complete reading the request
+ _channel.getRequest().getHttpInput().consumeAll();
+ }
+ }
// Reset the channel, parsers and generator
_channel.reset();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
index 0dd86fb91c..a906a9b9c2 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
@@ -280,7 +280,7 @@ public abstract class HttpInput<T> extends ServletInputStream implements Runnabl
public void consumeAll()
{
synchronized (lock())
- {
+ {
try
{
while (!isFinished())
diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java
index 5cd51d8d78..19ef2b10b8 100644
--- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java
+++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@@ -65,6 +66,7 @@ public class AsyncServletIOTest
private static final Logger LOG = Log.getLogger(AsyncServletIOTest.class);
protected AsyncIOServlet _servlet0=new AsyncIOServlet();
protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2();
+ protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
@@ -89,9 +91,13 @@ public class AsyncServletIOTest
_servletHandler.addServletWithMapping(holder,"/path/*");
ServletHolder holder2=new ServletHolder(_servlet2);
- holder.setAsyncSupported(true);
+ holder2.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder2,"/path2/*");
+ ServletHolder holder3=new ServletHolder(_servlet3);
+ holder3.setAsyncSupported(true);
+ _servletHandler.addServletWithMapping(holder3,"/path3/*");
+
_server.start();
_port=_connector.getLocalPort();
@@ -209,6 +215,50 @@ public class AsyncServletIOTest
Assert.assertTrue(_servlet2.completed.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testAsyncConsumeAll() throws Exception
+ {
+ StringBuilder request = new StringBuilder(512);
+ request.append("GET /ctx/path3/info HTTP/1.1\r\n")
+ .append("Host: localhost\r\n")
+ .append("Content-Type: text/plain\r\n")
+ .append("Content-Length: 10\r\n")
+ .append("\r\n");
+
+ int port=_port;
+ try (Socket socket = new Socket("localhost",port))
+ {
+ socket.setSoTimeout(1000000);
+ OutputStream out = socket.getOutputStream();
+ out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
+
+ // response line
+ String line = in.readLine();
+ LOG.debug("response-line: "+line);
+ Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
+
+ // Skip headers
+ while (line!=null)
+ {
+ line = in.readLine();
+ LOG.debug("header-line: "+line);
+ if (line.length()==0)
+ break;
+ }
+
+ // Get body
+ line = in.readLine();
+ LOG.debug("body: "+line);
+ Assert.assertEquals("DONE",line);
+
+ // The connection should be aborted
+ line = in.readLine();
+ Assert.assertNull(line);
+ }
+ }
+
public synchronized List<String> process(String content,int... writes) throws Exception
{
return process(content.getBytes(StandardCharsets.ISO_8859_1),writes);
@@ -507,4 +557,40 @@ public class AsyncServletIOTest
}
}
}
+
+
+ @SuppressWarnings("serial")
+ public class AsyncIOServlet3 extends HttpServlet
+ {
+ public CountDownLatch completed = new CountDownLatch(1);
+
+ @Override
+ public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
+ {
+ AsyncContext async = request.startAsync();
+
+ request.getInputStream().setReadListener(new ReadListener()
+ {
+
+ @Override
+ public void onError(Throwable t)
+ {
+ }
+
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ }
+ });
+
+ response.setStatus(200);
+ response.getOutputStream().print("DONE");
+ async.complete();
+ }
+ }
}

Back to the top