diff options
author | Greg Wilkins | 2016-02-04 06:00:13 +0000 |
---|---|---|
committer | Greg Wilkins | 2016-02-04 06:00:13 +0000 |
commit | df79ad689a38592472d3696e15bf0252e617b8e7 (patch) | |
tree | c2fedd48685d4f06c044821f01b3a7c12a455b71 /jetty-io | |
parent | 459ba4ae5aa7f9a29d18b4e08848ff6e1cec0e4a (diff) | |
parent | 009fde2400a746b1ce24ba04bd4fcd001378516b (diff) | |
download | org.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.tar.gz org.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.tar.xz org.eclipse.jetty.project-df79ad689a38592472d3696e15bf0252e617b8e7.zip |
Merge remote-tracking branch 'origin/jetty-9.3.x'
Diffstat (limited to 'jetty-io')
3 files changed, 196 insertions, 5 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index a0257efa4c..6e5688fa63 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -28,6 +28,7 @@ import java.nio.channels.SelectionKey; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -73,6 +74,27 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } } + private abstract class RejectableRunnable extends RunnableTask implements Rejectable + { + RejectableRunnable(String op) + { + super(op); + } + + @Override + public void reject() + { + try + { + close(); + } + catch (Throwable x) + { + LOG.warn(x); + } + } + } + private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey") { @Override @@ -82,7 +104,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runFillable = new RunnableTask("runFillable") + private final Runnable _runFillable = new RejectableRunnable("runFillable") { @Override public void run() @@ -91,7 +113,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runCompleteWrite = new RunnableTask("runCompleteWrite") + private final Runnable _runCompleteWrite = new RejectableRunnable("runCompleteWrite") { @Override public void run() @@ -100,7 +122,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runFillableCompleteWrite = new RunnableTask("runFillableCompleteWrite") + private final Runnable _runFillableCompleteWrite = new RejectableRunnable("runFillableCompleteWrite") { @Override public void run() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 35c6d51225..40d5a01a51 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -43,6 +43,7 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -536,7 +537,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - class Accept implements Runnable + class Accept implements Runnable, Rejectable { private final SelectableChannel channel; private final Object attachment; @@ -548,6 +549,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } @Override + public void reject() + { + LOG.debug("rejected accept {}",channel); + closeNoExceptions(channel); + } + + @Override public void run() { try @@ -563,7 +571,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - private class CreateEndPoint implements Product + private class CreateEndPoint implements Product, Rejectable { private final SelectableChannel channel; private final SelectionKey key; @@ -588,6 +596,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } + @Override + public void reject() + { + LOG.debug("rejected create {}",channel); + closeNoExceptions(channel); + } + protected void failed(Throwable failure) { closeNoExceptions(channel); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 0e26326ad9..99632febb0 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -37,9 +37,12 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -49,9 +52,11 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.TimerScheduler; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class SelectChannelEndPointTest @@ -124,10 +129,18 @@ public class SelectChannelEndPointTest ByteBuffer _in = BufferUtil.allocate(32 * 1024); ByteBuffer _out = BufferUtil.allocate(32 * 1024); long _last = -1; + final CountDownLatch _latch; public TestConnection(EndPoint endp) { super(endp, _threadPool); + _latch=null; + } + + public TestConnection(EndPoint endp,CountDownLatch latch) + { + super(endp, _threadPool); + _latch=latch; } @Override @@ -153,6 +166,18 @@ public class SelectChannelEndPointTest @Override public void onFillable() { + if (_latch!=null) + { + try + { + _latch.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + Callback blocking = _blockingRead; if (blocking!=null) { @@ -671,4 +696,133 @@ public class SelectChannelEndPointTest } assertFalse(server.isOpen()); } + + + // TODO make this test reliable + @Test + @Ignore + public void testRejectedExecution() throws Exception + { + _manager.stop(); + _threadPool.stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4); + _threadPool = new QueuedThreadPool(4,4,60000,q); + _manager = new SelectorManager(_threadPool, _scheduler, 1) + { + + @Override + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + { + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel,selector,selectionKey,getScheduler()); + _lastEndPoint = endp; + _lastEndPointLatch.countDown(); + return endp; + } + + @Override + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException + { + return new TestConnection(endpoint,latch); + } + }; + + _threadPool.start(); + _manager.start(); + + AtomicInteger timeout = new AtomicInteger(); + AtomicInteger rejections = new AtomicInteger(); + AtomicInteger echoed = new AtomicInteger(); + + CountDownLatch closed = new CountDownLatch(20); + for (int i=0;i<20;i++) + { + new Thread() + { + public void run() + { + try(Socket client = newClient();) + { + client.setSoTimeout(5000); + + SocketChannel server = _connector.accept(); + server.configureBlocking(false); + + _manager.accept(server); + + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); + client.getOutputStream().flush(); + client.shutdownOutput(); + + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } + assertEquals(-1,client.getInputStream().read()); + echoed.incrementAndGet(); + } + catch(SocketTimeoutException x) + { + x.printStackTrace(); + timeout.incrementAndGet(); + } + catch(Throwable x) + { + rejections.incrementAndGet(); + } + finally + { + closed.countDown(); + } + } + }.start(); + } + + // unblock the handling + latch.countDown(); + + // wait for all clients to complete or fail + closed.await(); + + // assert some clients must have been rejected + Assert.assertThat(rejections.get(),Matchers.greaterThan(0)); + // but not all of them + Assert.assertThat(rejections.get(),Matchers.lessThan(20)); + // none should have timed out + Assert.assertThat(timeout.get(),Matchers.equalTo(0)); + // and the rest should have worked + Assert.assertThat(echoed.get(),Matchers.equalTo(20-rejections.get())); + + // and the selector is still working for new requests + try(Socket client = newClient();) + { + client.setSoTimeout(5000); + + SocketChannel server = _connector.accept(); + server.configureBlocking(false); + + _manager.accept(server); + + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); + client.getOutputStream().flush(); + client.shutdownOutput(); + + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } + assertEquals(-1,client.getInputStream().read()); + } + + } } |