Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2012-07-30 18:22:57 +0000
committerSimone Bordet2012-07-30 21:02:25 +0000
commit35b61feae2d505938c720594e108d872d8aa17b2 (patch)
treeb5d3f9ed3c594d80b2e3c4a3c4aae4feae4da2d7 /jetty-io
parente75e0e9a04d733f1b0134d5b509aa052ba2cbd8f (diff)
downloadorg.eclipse.jetty.project-35b61feae2d505938c720594e108d872d8aa17b2.tar.gz
org.eclipse.jetty.project-35b61feae2d505938c720594e108d872d8aa17b2.tar.xz
org.eclipse.jetty.project-35b61feae2d505938c720594e108d872d8aa17b2.zip
Jetty9 - Test for better handling for I/O interests.
Diffstat (limited to 'jetty-io')
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java192
1 files changed, 192 insertions, 0 deletions
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java
new file mode 100644
index 0000000000..94885e2798
--- /dev/null
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java
@@ -0,0 +1,192 @@
+// ========================================================================
+// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SelectChannelEndPointInterestsTest
+{
+ private QueuedThreadPool threadPool;
+ private ScheduledExecutorService scheduler;
+ private ServerSocketChannel connector;
+ private SelectorManager selectorManager;
+
+ public void init(final Interested interested) throws Exception
+ {
+ threadPool = new QueuedThreadPool();
+ threadPool.start();
+
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ connector = ServerSocketChannel.open();
+ connector.bind(new InetSocketAddress("localhost", 0));
+
+ selectorManager = new SelectorManager()
+ {
+ @Override
+ protected void execute(Runnable task)
+ {
+ threadPool.execute(task);
+ }
+
+ @Override
+ protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
+ {
+ return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000)
+ {
+ @Override
+ protected void onIncompleteFlush()
+ {
+ super.onIncompleteFlush();
+ interested.onIncompleteFlush();
+ }
+ };
+ }
+
+ @Override
+ public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment)
+ {
+ return new AbstractAsyncConnection(endPoint, threadPool)
+ {
+ @Override
+ public void onFillable()
+ {
+ interested.onFillable(endPoint, this);
+ }
+ };
+ }
+ };
+ selectorManager.start();
+ }
+
+ @Test
+ public void testReadBlockedThenWriteBlockedThenReadableThenWritable() throws Exception
+ {
+ final AtomicInteger size = new AtomicInteger(1024 * 1024);
+ final AtomicReference<Exception> failure = new AtomicReference<>();
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ final AtomicBoolean writeBlocked = new AtomicBoolean();
+ init(new Interested()
+ {
+ @Override
+ public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection)
+ {
+ ByteBuffer input = BufferUtil.allocate(2);
+ int read = fill(endPoint, input);
+
+ if (read == 1)
+ {
+ byte b = input.get();
+ if (b == 1)
+ {
+ connection.fillInterested();
+
+ ByteBuffer output = ByteBuffer.allocate(size.get());
+ endPoint.write(null, new Callback.Empty<>(), output);
+
+ latch1.countDown();
+ }
+ else
+ {
+ latch2.countDown();
+ }
+ }
+ else
+ {
+ failure.set(new Exception("Unexpectedly read " + read + " bytes"));
+ }
+ }
+
+ @Override
+ public void onIncompleteFlush()
+ {
+ writeBlocked.set(true);
+ }
+
+ private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
+ {
+ try
+ {
+ return endPoint.fill(buffer);
+ }
+ catch (IOException x)
+ {
+ failure.set(x);
+ return 0;
+ }
+ }
+ });
+
+ Socket client = new Socket();
+ client.connect(connector.getLocalAddress());
+ client.setSoTimeout(5000);
+
+ SocketChannel server = connector.accept();
+ server.configureBlocking(false);
+ selectorManager.accept(server);
+
+ OutputStream clientOutput = client.getOutputStream();
+ clientOutput.write(1);
+ clientOutput.flush();
+ Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
+
+ // We do not read to keep the socket write blocked
+
+ clientOutput.write(2);
+ clientOutput.flush();
+ Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ // Sleep before reading to allow waking up the server only for read
+ Thread.sleep(1000);
+
+ // Now read what was written, waking up the server for write
+ InputStream clientInput = client.getInputStream();
+ while (size.getAndDecrement() > 0)
+ clientInput.read();
+
+ client.close();
+
+ Assert.assertNull(failure.get());
+ }
+
+ private interface Interested
+ {
+ void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection);
+
+ void onIncompleteFlush();
+ }
+
+}

Back to the top