Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2016-02-04 06:00:13 +0000
committerGreg Wilkins2016-02-04 06:00:13 +0000
commitdf79ad689a38592472d3696e15bf0252e617b8e7 (patch)
treec2fedd48685d4f06c044821f01b3a7c12a455b71 /jetty-io
parent459ba4ae5aa7f9a29d18b4e08848ff6e1cec0e4a (diff)
parent009fde2400a746b1ce24ba04bd4fcd001378516b (diff)
downloadorg.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.tar.gz
org.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.tar.xz
org.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.zip
Merge remote-tracking branch 'origin/jetty-9.3.x'
Diffstat (limited to 'jetty-io')
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java28
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java19
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java154
3 files changed, 196 insertions, 5 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
index a0257efa4c..6e5688fa63 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
@@ -28,6 +28,7 @@ import java.nio.channels.SelectionKey;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -73,6 +74,27 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
}
+ private abstract class RejectableRunnable extends RunnableTask implements Rejectable
+ {
+ RejectableRunnable(String op)
+ {
+ super(op);
+ }
+
+ @Override
+ public void reject()
+ {
+ try
+ {
+ close();
+ }
+ catch (Throwable x)
+ {
+ LOG.warn(x);
+ }
+ }
+ }
+
private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey")
{
@Override
@@ -82,7 +104,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
};
- private final Runnable _runFillable = new RunnableTask("runFillable")
+ private final Runnable _runFillable = new RejectableRunnable("runFillable")
{
@Override
public void run()
@@ -91,7 +113,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
};
- private final Runnable _runCompleteWrite = new RunnableTask("runCompleteWrite")
+ private final Runnable _runCompleteWrite = new RejectableRunnable("runCompleteWrite")
{
@Override
public void run()
@@ -100,7 +122,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
}
};
- private final Runnable _runFillableCompleteWrite = new RunnableTask("runFillableCompleteWrite")
+ private final Runnable _runFillableCompleteWrite = new RejectableRunnable("runFillableCompleteWrite")
{
@Override
public void run()
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 35c6d51225..40d5a01a51 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -43,6 +43,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -536,7 +537,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- class Accept implements Runnable
+ class Accept implements Runnable, Rejectable
{
private final SelectableChannel channel;
private final Object attachment;
@@ -548,6 +549,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
@Override
+ public void reject()
+ {
+ LOG.debug("rejected accept {}",channel);
+ closeNoExceptions(channel);
+ }
+
+ @Override
public void run()
{
try
@@ -563,7 +571,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- private class CreateEndPoint implements Product
+ private class CreateEndPoint implements Product, Rejectable
{
private final SelectableChannel channel;
private final SelectionKey key;
@@ -588,6 +596,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
+ @Override
+ public void reject()
+ {
+ LOG.debug("rejected create {}",channel);
+ closeNoExceptions(channel);
+ }
+
protected void failed(Throwable failure)
{
closeNoExceptions(channel);
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
index 0e26326ad9..99632febb0 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
@@ -37,9 +37,12 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@@ -49,9 +52,11 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class SelectChannelEndPointTest
@@ -124,10 +129,18 @@ public class SelectChannelEndPointTest
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1;
+ final CountDownLatch _latch;
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
+ _latch=null;
+ }
+
+ public TestConnection(EndPoint endp,CountDownLatch latch)
+ {
+ super(endp, _threadPool);
+ _latch=latch;
}
@Override
@@ -153,6 +166,18 @@ public class SelectChannelEndPointTest
@Override
public void onFillable()
{
+ if (_latch!=null)
+ {
+ try
+ {
+ _latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
Callback blocking = _blockingRead;
if (blocking!=null)
{
@@ -671,4 +696,133 @@ public class SelectChannelEndPointTest
}
assertFalse(server.isOpen());
}
+
+
+ // TODO make this test reliable
+ @Test
+ @Ignore
+ public void testRejectedExecution() throws Exception
+ {
+ _manager.stop();
+ _threadPool.stop();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
+ _threadPool = new QueuedThreadPool(4,4,60000,q);
+ _manager = new SelectorManager(_threadPool, _scheduler, 1)
+ {
+
+ @Override
+ protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
+ {
+ SocketChannelEndPoint endp = new SocketChannelEndPoint(channel,selector,selectionKey,getScheduler());
+ _lastEndPoint = endp;
+ _lastEndPointLatch.countDown();
+ return endp;
+ }
+
+ @Override
+ public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
+ {
+ return new TestConnection(endpoint,latch);
+ }
+ };
+
+ _threadPool.start();
+ _manager.start();
+
+ AtomicInteger timeout = new AtomicInteger();
+ AtomicInteger rejections = new AtomicInteger();
+ AtomicInteger echoed = new AtomicInteger();
+
+ CountDownLatch closed = new CountDownLatch(20);
+ for (int i=0;i<20;i++)
+ {
+ new Thread()
+ {
+ public void run()
+ {
+ try(Socket client = newClient();)
+ {
+ client.setSoTimeout(5000);
+
+ SocketChannel server = _connector.accept();
+ server.configureBlocking(false);
+
+ _manager.accept(server);
+
+ // Write client to server
+ client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
+ client.getOutputStream().flush();
+ client.shutdownOutput();
+
+ // Verify echo server to client
+ for (char c : "HelloWorld".toCharArray())
+ {
+ int b = client.getInputStream().read();
+ assertTrue(b > 0);
+ assertEquals(c, (char)b);
+ }
+ assertEquals(-1,client.getInputStream().read());
+ echoed.incrementAndGet();
+ }
+ catch(SocketTimeoutException x)
+ {
+ x.printStackTrace();
+ timeout.incrementAndGet();
+ }
+ catch(Throwable x)
+ {
+ rejections.incrementAndGet();
+ }
+ finally
+ {
+ closed.countDown();
+ }
+ }
+ }.start();
+ }
+
+ // unblock the handling
+ latch.countDown();
+
+ // wait for all clients to complete or fail
+ closed.await();
+
+ // assert some clients must have been rejected
+ Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
+ // but not all of them
+ Assert.assertThat(rejections.get(),Matchers.lessThan(20));
+ // none should have timed out
+ Assert.assertThat(timeout.get(),Matchers.equalTo(0));
+ // and the rest should have worked
+ Assert.assertThat(echoed.get(),Matchers.equalTo(20-rejections.get()));
+
+ // and the selector is still working for new requests
+ try(Socket client = newClient();)
+ {
+ client.setSoTimeout(5000);
+
+ SocketChannel server = _connector.accept();
+ server.configureBlocking(false);
+
+ _manager.accept(server);
+
+ // Write client to server
+ client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
+ client.getOutputStream().flush();
+ client.shutdownOutput();
+
+ // Verify echo server to client
+ for (char c : "HelloWorld".toCharArray())
+ {
+ int b = client.getInputStream().read();
+ assertTrue(b > 0);
+ assertEquals(c, (char)b);
+ }
+ assertEquals(-1,client.getInputStream().read());
+ }
+
+ }
}

Back to the top