Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2015-07-01 07:18:50 +0000
committerGreg Wilkins2015-07-01 07:21:53 +0000
commit8d869bf88b2292e2c767d82c50d46e8519d1d090 (patch)
tree56c4ce6cddd4a33ef3bdc3f05288f9aec6fed100
parenta87823930a5b476b0090ba3c9323de23f4eb83b2 (diff)
downloadorg.eclipse.jetty.project-8d869bf88b2292e2c767d82c50d46e8519d1d090.tar.gz
org.eclipse.jetty.project-8d869bf88b2292e2c767d82c50d46e8519d1d090.tar.xz
org.eclipse.jetty.project-8d869bf88b2292e2c767d82c50d46e8519d1d090.zip
470727 - Thread Starvation with EWYK
Implemented NonBlockingCallbacks and fallback to PEC scheduling
-rw-r--r--apache-jstl/src/test/java/org/eclipse/jetty/jstl/JstlTest.java2
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java6
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java3
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java15
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java11
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java2
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java12
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java215
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java7
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java2
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java15
11 files changed, 284 insertions, 6 deletions
diff --git a/apache-jstl/src/test/java/org/eclipse/jetty/jstl/JstlTest.java b/apache-jstl/src/test/java/org/eclipse/jetty/jstl/JstlTest.java
index 06790be058..858701f67e 100644
--- a/apache-jstl/src/test/java/org/eclipse/jetty/jstl/JstlTest.java
+++ b/apache-jstl/src/test/java/org/eclipse/jetty/jstl/JstlTest.java
@@ -34,8 +34,10 @@ import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.webapp.Configuration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
+@Ignore
public class JstlTest
{
private static Server server;
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
index 34f7385ee9..758601bfe3 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/FillInterest.java
@@ -104,6 +104,12 @@ public abstract class FillInterest
{
return _interested.get() != null;
}
+
+ public boolean isCallbackNonBlocking()
+ {
+ Callback callback = _interested.get();
+ return callback instanceof Callback.NonBlocking;
+ }
/**
* Call to signal a failure to a registered interest
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index ae5522c3fe..d2a33bff57 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -269,8 +269,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
- SelectableEndPoint selectable = (SelectableEndPoint)attachment;
- Runnable task = selectable.onSelected();
+ Runnable task = ((SelectableEndPoint)attachment).onSelected();
if (task != null)
return task;
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index dff8d4092b..f15343d26f 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -100,8 +100,23 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
+
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
+
+ // Call non blocking directly
+ if (readable && getFillInterest().isCallbackNonBlocking())
+ {
+ getFillInterest().fillable();
+ readable=false;
+ }
+ if (writable && getWriteFlusher().isCallbackNonBlocking())
+ {
+ getWriteFlusher().completeWrite();
+ writable=false;
+ }
+
+ // return task to complete the job
return readable ? (writable ? _runFillableCompleteWrite : _runFillable)
: (writable ? _runCompleteWrite : null);
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
index 2d5dc6bb91..2705355465 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
@@ -268,8 +268,19 @@ abstract public class WriteFlusher
if (_callback!=null)
_callback.succeeded();
}
+
+ boolean isCallbackNonBlocking()
+ {
+ return _callback instanceof Callback.NonBlocking;
+ }
}
+ public boolean isCallbackNonBlocking()
+ {
+ State s = _state.get();
+ return (s instanceof PendingState) && ((PendingState)s).isCallbackNonBlocking();
+ }
+
/**
* Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
* or {@link #onFail(Throwable)} when appropriate.
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
index 661f7d6544..b9f79a292a 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
@@ -539,7 +539,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
- private class BlockingReadCallback implements Callback
+ private class BlockingReadCallback implements Callback.NonBlocking
{
@Override
public void succeeded()
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java
index c06034279c..12a0c3a1ca 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestFixture.java
@@ -37,6 +37,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.toolchain.test.PropertyFlag;
import org.eclipse.jetty.util.IO;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Before;
@@ -45,6 +46,7 @@ public class HttpServerTestFixture
protected static final long PAUSE=10L;
protected static final int LOOPS= PropertyFlag.isEnabled("test.stress")?250:50;
+ protected QueuedThreadPool _threadPool;
protected Server _server;
protected URI _serverURI;
protected ServerConnector _connector;
@@ -62,15 +64,21 @@ public class HttpServerTestFixture
@Before
public void before()
{
- _server = new Server();
+ _threadPool = new QueuedThreadPool();
+ _server = new Server(_threadPool);
}
protected void startServer(ServerConnector connector) throws Exception
{
+ startServer(connector,new HandlerWrapper());
+ }
+
+ protected void startServer(ServerConnector connector, Handler handler) throws Exception
+ {
_connector = connector;
_connector.getConnectionFactory(HttpConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
_server.addConnector(_connector);
- _server.setHandler(new HandlerWrapper());
+ _server.setHandler(handler);
_server.start();
_serverURI = _server.getURI();
}
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java
new file mode 100644
index 0000000000..fa46de262f
--- /dev/null
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java
@@ -0,0 +1,215 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.server;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.ssl.SslConnection;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.toolchain.test.AdvancedRunner;
+import org.eclipse.jetty.toolchain.test.TestTracker;
+import org.eclipse.jetty.util.IO;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AdvancedRunner.class)
+public class ThreadStarvationTest extends HttpServerTestFixture
+{
+ ServerConnector _connector;
+
+ @Rule
+ public TestTracker tracker = new TestTracker();
+
+ @Before
+ public void init() throws Exception
+ {
+ _threadPool.setMinThreads(4);
+ _threadPool.setMaxThreads(4);
+ _threadPool.setDetailedDump(false);
+ _connector = new ServerConnector(_server,1,1);
+ _connector.setIdleTimeout(10000);
+ }
+
+ @Test
+ public void testReadInput() throws Exception
+ {
+ startServer(_connector,new ReadHandler());
+ System.err.println(_threadPool.dump());
+
+ Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort());
+ client.setSoTimeout(10000);
+
+ OutputStream os=client.getOutputStream();
+ InputStream is=client.getInputStream();
+
+ os.write((
+ "GET / HTTP/1.0\r\n"+
+ "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
+ "content-length: 10\r\n" +
+ "\r\n" +
+ "0123456789\r\n").getBytes("utf-8"));
+ os.flush();
+
+ String response = IO.toString(is);
+ assertEquals(-1, is.read());
+ assertThat(response,containsString("200 OK"));
+ assertThat(response,containsString("Read Input 10"));
+
+ }
+
+ @Test
+ public void testEWYKStarvation() throws Exception
+ {
+ System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume");
+ startServer(_connector,new ReadHandler());
+
+ Socket[] client = new Socket[3];
+ OutputStream[] os = new OutputStream[client.length];
+ InputStream[] is = new InputStream[client.length];
+
+ for (int i=0;i<client.length;i++)
+ {
+ client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
+ client[i].setSoTimeout(10000);
+
+ os[i]=client[i].getOutputStream();
+ is[i]=client[i].getInputStream();
+
+ os[i].write((
+ "PUT / HTTP/1.0\r\n"+
+ "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
+ "content-length: 10\r\n" +
+ "\r\n1").getBytes("utf-8"));
+ os[i].flush();
+ }
+ Thread.sleep(500);
+ System.err.println(_threadPool.dump());
+
+ for (int i=0;i<client.length;i++)
+ {
+ os[i].write(("234567890\r\n").getBytes("utf-8"));
+ os[i].flush();
+ }
+
+ Thread.sleep(500);
+ System.err.println(_threadPool.dump());
+
+ for (int i=0;i<client.length;i++)
+ {
+ String response = IO.toString(is[i]);
+ assertEquals(-1, is[i].read());
+ assertThat(response,containsString("200 OK"));
+ assertThat(response,containsString("Read Input 10"));
+ }
+
+ }
+
+
+ @Test
+ public void testPECStarvation() throws Exception
+ {
+ System.setProperty("org.eclipse.jetty.io.ManagedSelector$SelectorProducer.ExecutionStrategy","org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume");
+
+ startServer(_connector,new ReadHandler());
+ System.err.println(_threadPool.dump());
+
+ Socket[] client = new Socket[3];
+ OutputStream[] os = new OutputStream[client.length];
+ InputStream[] is = new InputStream[client.length];
+
+ for (int i=0;i<client.length;i++)
+ {
+ client[i]=newSocket(_serverURI.getHost(),_serverURI.getPort());
+ client[i].setSoTimeout(10000);
+
+ os[i]=client[i].getOutputStream();
+ is[i]=client[i].getInputStream();
+
+ os[i].write((
+ "PUT / HTTP/1.0\r\n"+
+ "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+
+ "content-length: 10\r\n" +
+ "\r\n1").getBytes("utf-8"));
+ os[i].flush();
+ }
+ Thread.sleep(500);
+ System.err.println(_threadPool.dump());
+
+ for (int i=0;i<client.length;i++)
+ {
+ os[i].write(("234567890\r\n").getBytes("utf-8"));
+ os[i].flush();
+ }
+
+ Thread.sleep(500);
+ System.err.println(_threadPool.dump());
+
+ for (int i=0;i<client.length;i++)
+ {
+ String response = IO.toString(is[i]);
+ assertEquals(-1, is[i].read());
+ assertThat(response,containsString("200 OK"));
+ assertThat(response,containsString("Read Input 10"));
+ }
+
+ }
+
+
+ protected static class ReadHandler extends AbstractHandler
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ response.setStatus(200);
+
+ int l = request.getContentLength();
+ int r = 0;
+ while (r<l)
+ {
+ if (request.getInputStream().read()>=0)
+ r++;
+ }
+
+ response.getOutputStream().write(("Read Input "+r+"\r\n").getBytes());
+ }
+ }
+}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
index 8b1012874b..2a6f47f086 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java
@@ -56,6 +56,13 @@ public interface Callback
public void failed(Throwable x);
/**
+ * A marker interface for a callback that is guaranteed not to
+ * block and thus does not need a dispatch
+ */
+ public interface NonBlocking extends Callback
+ {}
+
+ /**
* <p>Empty implementation of {@link Callback}</p>
*/
public static class Adapter implements Callback
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
index 68d1234d96..9e24f7e10d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
@@ -125,7 +125,7 @@ public class SharedBlockingCallback
* A Closeable Callback.
* Uses the auto close mechanism to check block has been called OK.
*/
- public class Blocker implements Callback, Closeable
+ public class Blocker implements Callback.NonBlocking, Closeable
{
private Throwable _state = IDLE;
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index 406e3d00a1..3232eb2a04 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -25,6 +25,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
+import org.eclipse.jetty.util.thread.ThreadPool;
/**
* <p>A strategy where the thread calls produce will always run the resulting task
@@ -52,11 +53,15 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
private boolean _execute;
private boolean _producing;
private boolean _pending;
+ private final ThreadPool _threadpool;
+ private final ProduceExecuteConsume _lowresources;
public ExecuteProduceConsume(Producer producer, Executor executor)
{
this._producer = producer;
this._executor = executor;
+ _threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
+ _lowresources = _threadpool==null?null:new ProduceExecuteConsume(producer,executor);
}
@Override
@@ -64,6 +69,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
+
boolean produce=false;
try (Lock locked = _locker.lock())
{
@@ -123,7 +129,16 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
}
if (produce)
+ {
+ // If we are low on resources, then switch to PEC strategy which does not
+ // suffer as badly from thread starvation
+ while (_threadpool!=null && _threadpool.isLowOnThreads())
+ {
+ LOG.debug("EWYK low resources {}",this);
+ _lowresources.execute();
+ }
produceAndRun();
+ }
}
private void produceAndRun()

Back to the top