Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2016-02-01 11:10:24 -0500
committerGreg Wilkins2016-02-01 11:10:24 -0500
commitc81dcfc79059b6e0a3a0bfd22cb5b5633dd3291b (patch)
treef4dacb8c6d58022c357c2d5721e0797c53fa96af
parent2902a134637f6c638ee76859324035c82e2583fe (diff)
downloadorg.eclipse.jetty.project-c81dcfc79059b6e0a3a0bfd22cb5b5633dd3291b.tar.gz
org.eclipse.jetty.project-c81dcfc79059b6e0a3a0bfd22cb5b5633dd3291b.tar.xz
org.eclipse.jetty.project-c81dcfc79059b6e0a3a0bfd22cb5b5633dd3291b.zip
486930 - Selector does not correctly handle rejected execution exception
This fix work in two ways: 1) Both the PEC and EPC strategies when confronted with a RejectedExecutionException will continue to Produce rather than consume. 2) If a produced Runnable cannot be consumed and it supports the new Rejectable interface, then it's reject() method is called by the producer thread. Typically this is implemented to close the connection - with the risk being that the close might block, but that is probably better than leaking the connection?
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java19
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java24
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java116
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java10
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java38
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java22
6 files changed, 220 insertions, 9 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 546095cf70..0af682740f 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -44,6 +44,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -541,7 +542,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- class Accept implements Runnable
+ class Accept implements Runnable, Rejectable
{
private final SocketChannel channel;
private final Object attachment;
@@ -553,6 +554,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
@Override
+ public void reject()
+ {
+ LOG.debug("rejected accept {}",channel);
+ closeNoExceptions(channel);
+ }
+
+ @Override
public void run()
{
try
@@ -568,7 +576,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- private class CreateEndPoint implements Product
+ private class CreateEndPoint implements Product, Rejectable
{
private final SocketChannel channel;
private final SelectionKey key;
@@ -593,6 +601,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
+ @Override
+ public void reject()
+ {
+ LOG.debug("rejected create {}",channel);
+ closeNoExceptions(channel);
+ }
+
protected void failed(Throwable failure)
{
closeNoExceptions(channel);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index e7587bd1af..435c81621a 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -67,7 +68,24 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runUpdateKey";
}
};
- private final Runnable _runFillable = new Runnable()
+
+ private abstract class RejectableRunnable implements Runnable,Rejectable
+ {
+ @Override
+ public void reject()
+ {
+ try
+ {
+ close();
+ }
+ catch (Throwable x)
+ {
+ LOG.warn(x);
+ }
+ }
+ }
+
+ private final Runnable _runFillable = new RejectableRunnable()
{
@Override
public void run()
@@ -81,7 +99,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runFillable";
}
};
- private final Runnable _runCompleteWrite = new Runnable()
+ private final Runnable _runCompleteWrite = new RejectableRunnable()
{
@Override
public void run()
@@ -95,7 +113,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
}
};
- private final Runnable _runFillableCompleteWrite = new Runnable()
+ private final Runnable _runFillableCompleteWrite = new RejectableRunnable()
{
@Override
public void run()
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
index 08f1630ae1..de4a5d2eb7 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
@@ -36,9 +36,12 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@@ -48,6 +51,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -121,10 +125,18 @@ public class SelectChannelEndPointTest
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1;
+ final CountDownLatch _latch;
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
+ _latch=null;
+ }
+
+ public TestConnection(EndPoint endp,CountDownLatch latch)
+ {
+ super(endp, _threadPool);
+ _latch=latch;
}
@Override
@@ -150,6 +162,18 @@ public class SelectChannelEndPointTest
@Override
public void onFillable()
{
+ if (_latch!=null)
+ {
+ try
+ {
+ _latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
Callback blocking = _blockingRead;
if (blocking!=null)
{
@@ -668,4 +692,96 @@ public class SelectChannelEndPointTest
}
assertFalse(server.isOpen());
}
+
+
+ @Test
+ public void testRejectedExecution() throws Exception
+ {
+ _manager.stop();
+ _threadPool.stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
+ _threadPool = new QueuedThreadPool(4,4,60000,q);
+ _manager = new SelectorManager(_threadPool, _scheduler, 1)
+ {
+ @Override
+ public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
+ {
+ return new TestConnection(endpoint,latch);
+ }
+
+ @Override
+ protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
+ {
+ SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
+ _lastEndPoint = endp;
+ _lastEndPointLatch.countDown();
+ return endp;
+ }
+ };
+
+ _threadPool.start();
+ _manager.start();
+
+ AtomicInteger timeout = new AtomicInteger();
+ AtomicInteger rejections = new AtomicInteger();
+ AtomicInteger echoed = new AtomicInteger();
+
+ CountDownLatch closed = new CountDownLatch(10);
+ for (int i=0;i<10;i++)
+ {
+ new Thread()
+ {
+ public void run()
+ {
+ try(Socket client = newClient();)
+ {
+ client.setSoTimeout(5000);
+
+ SocketChannel server = _connector.accept();
+ server.configureBlocking(false);
+
+ _manager.accept(server);
+
+ // Write client to server
+ client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
+ client.getOutputStream().flush();
+ client.shutdownOutput();
+
+ // Verify echo server to client
+ for (char c : "HelloWorld".toCharArray())
+ {
+ int b = client.getInputStream().read();
+ assertTrue(b > 0);
+ assertEquals(c, (char)b);
+ }
+ assertEquals(-1,client.getInputStream().read());
+ echoed.incrementAndGet();
+ }
+ catch(SocketTimeoutException x)
+ {
+ x.printStackTrace();
+ timeout.incrementAndGet();
+ }
+ catch(Throwable x)
+ {
+ rejections.incrementAndGet();
+ }
+ finally
+ {
+ closed.countDown();
+ }
+ }
+ }.start();
+ }
+
+ latch.countDown();
+ closed.await();
+ Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
+ Assert.assertThat(rejections.get(),Matchers.lessThan(10));
+ Assert.assertThat(timeout.get(),Matchers.equalTo(0));
+ Assert.assertThat(echoed.get(),Matchers.equalTo(10-rejections.get()));
+ }
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
index eef0910892..31a17c82cb 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
@@ -20,6 +20,7 @@ package org.eclipse.jetty.util.thread;
import java.lang.reflect.Constructor;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.Loader;
import org.eclipse.jetty.util.log.Log;
@@ -53,6 +54,15 @@ public interface ExecutionStrategy
*/
public void execute();
+
+ /**
+ * A task that can handle {@link RejectedExecutionException}
+ */
+ public interface Rejectable
+ {
+ public void reject();
+ }
+
/**
* <p>A producer of {@link Runnable} tasks to run.</p>
* <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index d3053221b0..c3436ad35d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -140,7 +141,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
while (_threadpool!=null && _threadpool.isLowOnThreads())
{
LOG.debug("EWYK low resources {}",this);
- _lowresources.execute();
+ try
+ {
+ _lowresources.execute();
+ }
+ catch(Throwable e)
+ {
+ LOG.warn(e);
+ }
}
// no longer low resources so produceAndRun normally
@@ -204,13 +212,37 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this);
- _executor.execute(this);
+ try
+ {
+ _executor.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ // If we cannot execute, the close or discard the task and keep producing
+ LOG.debug(e);
+ LOG.warn("RejectedExecution {}",task);
+ try
+ {
+ if (task instanceof Rejectable)
+ ((Rejectable)task).reject();
+ }
+ catch (Exception x)
+ {
+ e.addSuppressed(x);
+ LOG.warn(e);
+ }
+ finally
+ {
+ task=null;
+ }
+ }
}
// Run the task.
if (LOG.isDebugEnabled())
LOG.debug("{} run {}",this,task);
- task.run();
+ if (task != null)
+ task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}",this,task);
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
index 64903a6fbd..6e7c95e571 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
@@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -55,7 +56,26 @@ public class ProduceExecuteConsume implements ExecutionStrategy
break;
// Execute the task.
- _executor.execute(task);
+ try
+ {
+ _executor.execute(task);
+ }
+ catch(RejectedExecutionException e)
+ {
+ // Close or discard tasks that cannot be executed
+ if (task instanceof Rejectable)
+ {
+ try
+ {
+ ((Rejectable)task).reject();
+ }
+ catch (Throwable x)
+ {
+ e.addSuppressed(x);
+ LOG.warn(e);
+ }
+ }
+ }
}
}

Back to the top