Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2015-02-05 03:26:34 +0000
committerGreg Wilkins2015-02-05 03:26:34 +0000
commit7db7ef30204a4ff897e49ec82960dff90078ed1f (patch)
tree558f90b936c4a9b6aeae51694f7048fae6822c14
parentf6cfe07a69669197accb166fc7359a4e2a810871 (diff)
downloadorg.eclipse.jetty.project-7db7ef30204a4ff897e49ec82960dff90078ed1f.tar.gz
org.eclipse.jetty.project-7db7ef30204a4ff897e49ec82960dff90078ed1f.tar.xz
org.eclipse.jetty.project-7db7ef30204a4ff897e49ec82960dff90078ed1f.zip
Added test cases and improved isReady and isFinished handling
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java32
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java2
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java182
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java22
-rw-r--r--jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java102
5 files changed, 247 insertions, 93 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
index a6650b7dc9..35d0f1b78a 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
@@ -296,19 +296,6 @@ public class HttpChannelState
throw new IllegalStateException(this.getStatusString());
}
- if (_asyncRead)
- {
- _state=State.ASYNC_IO;
- _asyncRead=false;
- return Action.READ_CALLBACK;
- }
-
- if (_asyncWrite)
- {
- _asyncWrite=false;
- _state=State.ASYNC_IO;
- return Action.WRITE_CALLBACK;
- }
if (_async!=null)
{
@@ -327,8 +314,25 @@ public class HttpChannelState
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_EXPIRED;
- case EXPIRING:
case STARTED:
+ if (_asyncRead)
+ {
+ _state=State.ASYNC_IO;
+ _asyncRead=false;
+ return Action.READ_CALLBACK;
+ }
+
+ if (_asyncWrite)
+ {
+ _asyncWrite=false;
+ _state=State.ASYNC_IO;
+ return Action.WRITE_CALLBACK;
+ }
+ scheduleTimeout();
+ _state=State.ASYNC_WAIT;
+ return Action.WAIT;
+
+ case EXPIRING:
scheduleTimeout();
_state=State.ASYNC_WAIT;
return Action.WAIT;
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java
index d9c833fe94..fc19accd3b 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java
@@ -54,7 +54,7 @@ public class HttpConfiguration
private boolean _sendServerVersion = true;
private boolean _sendXPoweredBy = false;
private boolean _sendDateHeader = true;
- private boolean _delayDispatchUntilContent = false;
+ private boolean _delayDispatchUntilContent = false; // TODO change to true
/* ------------------------------------------------------------ */
/**
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
index 5d0e6118af..c314315454 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
@@ -27,6 +27,7 @@ import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
+import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@@ -112,7 +113,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
int read = read(_oneByteBuffer, 0, 1);
if (read==0)
- throw new IllegalStateException("unready");
+ throw new IllegalStateException("unready read=0");
return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
}
@@ -148,12 +149,9 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
/**
- * Access the next content to be consumed from. Returning the next item does not consume it
- * and it may be returned multiple times until it is consumed.
- * <p/>
- * Calls to {@link #get(Content, byte[], int, int)}
- * or {@link #skip(Content, int)} are required to consume data from the content.
- *
+ * Get the next content from the inputQ, calling {@link #produceContent()}
+ * if need be. EOF is processed and state changed.
+ *
* @return the content or null if none available.
* @throws IOException if retrieving the content fails
*/
@@ -161,16 +159,21 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
- Content content = pollInputQ();
+ Content content = pollContent();
if (content==null && !isFinished())
{
produceContent();
- content = pollInputQ();
+ content = pollContent();
}
return content;
}
- protected Content pollInputQ()
+ /** Poll the inputQ for Content.
+ * Consumed buffers and {@link PoisonPillContent}s are removed and
+ * EOF state updated if need be.
+ * @return Content or null
+ */
+ protected Content pollContent()
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
@@ -203,7 +206,55 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return content;
}
-
+
+ /**
+ * Get the next readable from the inputQ, calling {@link #produceContent()}
+ * if need be. EOF is NOT processed and state is not changed.
+ *
+ * @return the content or EOF or null if none available.
+ * @throws IOException if retrieving the content fails
+ */
+ protected Content nextReadable() throws IOException
+ {
+ if (!Thread.holdsLock(_inputQ))
+ throw new IllegalStateException();
+ Content content = pollReadable();
+ if (content==null && !isFinished())
+ {
+ produceContent();
+ content = pollReadable();
+ }
+ return content;
+ }
+
+ /** Poll the inputQ for Content or EOF.
+ * Consumed buffers and non EOF {@link PoisonPillContent}s are removed.
+ * EOF state is not updated.
+ * @return Content, EOF or null
+ */
+ protected Content pollReadable()
+ {
+ if (!Thread.holdsLock(_inputQ))
+ throw new IllegalStateException();
+
+ // Items are removed only when they are fully consumed.
+ Content content = _inputQ.peekUnsafe();
+
+ // Skip consumed items at the head of the queue except EOF
+ while (content != null)
+ {
+ if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
+ return content;
+
+ _inputQ.pollUnsafe();
+ content.succeeded();
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} consumed {}", this, content);
+ content = _inputQ.peekUnsafe();
+ }
+
+ return content;
+ }
/**
* @param item the content
@@ -214,7 +265,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return item.remaining();
}
-
/**
* Copies the given content into the given byte buffer.
*
@@ -230,7 +280,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
content.getContent().get(buffer, offset, l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
- pollInputQ(); // hungry succeed
+ pollContent(); // hungry succeed
return l;
}
@@ -248,7 +298,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
buffer.position(buffer.position()+l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
- pollInputQ(); // hungry succeed
+ pollContent(); // hungry succeed
}
@@ -391,34 +441,29 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public boolean isReady()
{
- synchronized (_inputQ)
+ try
{
- if (_listener == null )
- return true;
- if (_unready)
- return false;
- if (_state instanceof EOFState)
- return true;
-
- if (_inputQ.isEmpty())
+ synchronized (_inputQ)
{
- try
- {
- produceContent();
- }
- catch(IOException e)
- {
- failed(e);
- }
+ if (_listener == null )
+ return true;
+ if (_unready)
+ return false;
+ if (_state instanceof EOFState)
+ return true;
+ if (nextReadable()!=null)
+ return true;
+
+ _unready = true;
}
-
- if (!_inputQ.isEmpty())
- return true;
-
- _unready = true;
+ unready();
+ return false;
+ }
+ catch(IOException e)
+ {
+ LOG.ignore(e);
+ return true;
}
- unready();
- return false;
}
protected void unready()
@@ -429,15 +474,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
public void setReadListener(ReadListener readListener)
{
readListener = Objects.requireNonNull(readListener);
- synchronized (_inputQ)
+ boolean content;
+ try
{
- if (_state != STREAM)
- throw new IllegalStateException("state=" + _state);
- _state = ASYNC;
- _listener = readListener;
- _unready = true;
+ synchronized (_inputQ)
+ {
+ if (_state != STREAM)
+ throw new IllegalStateException("state=" + _state);
+ _state = ASYNC;
+ _listener = readListener;
+ _unready = true;
+ content=nextContent()!=null;
+ }
}
- onReadPossible();
+ catch(IOException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+
+ if (content)
+ onReadPossible();
+ else
+ unready();
}
public void failed(Throwable x)
@@ -488,30 +546,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
try
{
- if (error != null)
- listener.onError(error);
- else if (aeof)
- listener.onAllDataRead();
- else
- {
+ if (aeof)
+ listener.onAllDataRead();
+ else if (error == null)
listener.onDataAvailable();
- synchronized (_inputQ)
- {
- if (_state==AEOF)
- {
- _state=EOF;
- aeof=true;
- }
- }
- if (aeof)
- listener.onAllDataRead();
- }
+ else
+ listener.onError(error);
}
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
- listener.onError(e);
+ try
+ {
+ if (aeof || error==null)
+ listener.onError(e);
+ }
+ catch (Throwable e2)
+ {
+ LOG.warn(e2.toString());
+ LOG.debug(e2);
+ throw new RuntimeIOException(e2);
+ }
}
}
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java
index bb9f0cecbe..f0da531888 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java
@@ -27,22 +27,21 @@ import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Queue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ReadListener;
+import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.hamcrest.Matchers;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+@RunWith(AdvancedRunner.class)
public class HttpInputTest
{
Queue<String> _history = new ConcurrentArrayQueue<String>()
@@ -328,7 +327,8 @@ public class HttpInputTest
public void testAsyncEmpty() throws Exception
{
_in.setReadListener(_listener);
- assertThat(_history.poll(),equalTo("onReadPossible"));
+ assertThat(_history.poll(),equalTo("produceContent 0"));
+ assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@@ -349,7 +349,8 @@ public class HttpInputTest
public void testAsyncRead() throws Exception
{
_in.setReadListener(_listener);
- assertThat(_history.poll(),equalTo("onReadPossible"));
+ assertThat(_history.poll(),equalTo("produceContent 0"));
+ assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@@ -401,7 +402,8 @@ public class HttpInputTest
public void testAsyncEOF() throws Exception
{
_in.setReadListener(_listener);
- assertThat(_history.poll(),equalTo("onReadPossible"));
+ assertThat(_history.poll(),equalTo("produceContent 0"));
+ assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@@ -423,7 +425,8 @@ public class HttpInputTest
public void testAsyncReadEOF() throws Exception
{
_in.setReadListener(_listener);
- assertThat(_history.poll(),equalTo("onReadPossible"));
+ assertThat(_history.poll(),equalTo("produceContent 0"));
+ assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
@@ -475,7 +478,8 @@ public class HttpInputTest
public void testAsyncError() throws Exception
{
_in.setReadListener(_listener);
- assertThat(_history.poll(),equalTo("onReadPossible"));
+ assertThat(_history.poll(),equalTo("produceContent 0"));
+ assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));
diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
index 2818d1b618..84afe8c1c2 100644
--- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
+++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncIOServletTest.java
@@ -39,6 +39,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
@@ -69,6 +70,7 @@ public class AsyncIOServletTest
server = new Server();
connector = new ServerConnector(server);
connector.setIdleTimeout(30000);
+ connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setDelayDispatchUntilContent(false);
server.addConnector(connector);
context = new ServletContextHandler(server, "/", false, false);
@@ -149,11 +151,16 @@ public class AsyncIOServletTest
output.write(request.getBytes("UTF-8"));
output.flush();
- SimpleHttpParser parser = new SimpleHttpParser();
- SimpleHttpResponse response = parser.readResponse(new BufferedReader(new InputStreamReader(client.getInputStream(), "UTF-8")));
-
+ BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+ String line=in.readLine();
+ assertThat(line, containsString("500 Server Error"));
+ while (line.length()>0)
+ {
+ line=in.readLine();
+ }
+ line=in.readLine();
+
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
- Assert.assertEquals("500", response.getCode());
}
}
@@ -248,10 +255,10 @@ public class AsyncIOServletTest
}
@Override
- public void onError(Throwable t)
+ public void onError(final Throwable t)
{
errors.incrementAndGet();
- throw new NullPointerException("explicitly_thrown_by_test_2");
+ throw new NullPointerException("explicitly_thrown_by_test_2"){{this.initCause(t);}};
}
});
}
@@ -486,6 +493,7 @@ public class AsyncIOServletTest
{
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
+ output.flush();
output.write(data);
output.flush();
@@ -588,4 +596,86 @@ public class AsyncIOServletTest
assertThat(line, containsString("OK"));
}
}
+
+
+ @Test
+ public void testCompleteBeforeOnAllDataRead() throws Exception
+ {
+ String text = "XYZ";
+ final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1);
+ final AtomicBoolean allDataRead = new AtomicBoolean(false);
+
+ startServer(new HttpServlet()
+ {
+ @Override
+ protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
+ {
+ response.flushBuffer();
+
+ final AsyncContext async = request.startAsync();
+ final ServletInputStream in = request.getInputStream();
+ final ServletOutputStream out = response.getOutputStream();
+
+ in.setReadListener(new ReadListener()
+ {
+ @Override
+ public void onError(Throwable t)
+ {
+ t.printStackTrace();
+ }
+
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ while (in.isReady())
+ {
+ int b = in.read();
+ if (b<0)
+ {
+ out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1));
+ async.complete();
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ out.write("BAD!!!\n".getBytes(StandardCharsets.ISO_8859_1));
+ allDataRead.set(true);
+ throw new IllegalStateException();
+ }
+ });
+ }
+ });
+
+ String request = "GET " + path + " HTTP/1.1\r\n" +
+ "Host: localhost:" + connector.getLocalPort() + "\r\n" +
+ "Content-Type: text/plain\r\n"+
+ "Content-Length: "+data.length+"\r\n" +
+ "Connection: close\r\n" +
+ "\r\n";
+
+ try (Socket client = new Socket("localhost", connector.getLocalPort()))
+ {
+ OutputStream output = client.getOutputStream();
+ output.write(request.getBytes("UTF-8"));
+ output.flush();
+ Thread.sleep(100);
+ output.write(data);
+ output.flush();
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+ String line=in.readLine();
+ assertThat(line, containsString("200 OK"));
+ while (line.length()>0)
+ {
+ line=in.readLine();
+ }
+ line=in.readLine();
+ assertThat(line, containsString("OK"));
+ Assert.assertFalse(allDataRead.get());
+ }
+ }
}

Back to the top