Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java')
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java754
1 files changed, 754 insertions, 0 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
new file mode 100644
index 0000000000..d2a33bff57
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -0,0 +1,754 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.Locker;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
+ * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
+ * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
+ * with the channel.</p>
+ */
+public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
+{
+ private static final Logger LOG = Log.getLogger(ManagedSelector.class);
+
+ private final Locker _locker = new Locker();
+ private boolean _selecting = false;
+ private final Queue<Runnable> _actions = new ArrayDeque<>();
+ private final SelectorManager _selectorManager;
+ private final int _id;
+ private final ExecutionStrategy _strategy;
+ private Selector _selector;
+
+ public ManagedSelector(SelectorManager selectorManager, int id)
+ {
+ _selectorManager = selectorManager;
+ _id = id;
+ _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
+ setStopTimeout(5000);
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ super.doStart();
+ _selector = newSelector();
+ }
+
+ protected Selector newSelector() throws IOException
+ {
+ return Selector.open();
+ }
+
+ public int size()
+ {
+ Selector s = _selector;
+ if (s == null)
+ return 0;
+ return s.keys().size();
+ }
+
+ @Override
+ protected void doStop() throws Exception
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopping {}", this);
+ CloseEndPoints close_endps = new CloseEndPoints();
+ submit(close_endps);
+ close_endps.await(getStopTimeout());
+ super.doStop();
+ CloseSelector close_selector = new CloseSelector();
+ submit(close_selector);
+ close_selector.await(getStopTimeout());
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopped {}", this);
+ }
+
+ public void submit(Runnable change)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queued change {} on {}", change, this);
+
+ Selector selector = null;
+ try (Locker.Lock lock = _locker.lock())
+ {
+ _actions.offer(change);
+ if (_selecting)
+ {
+ selector = _selector;
+ // To avoid the extra select wakeup.
+ _selecting = false;
+ }
+ }
+ if (selector != null)
+ selector.wakeup();
+ }
+
+ @Override
+ public void run()
+ {
+ _strategy.execute();
+ }
+
+ /**
+ * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
+ * notified of non-blocking events by the {@link ManagedSelector}.
+ */
+ public interface SelectableEndPoint extends EndPoint
+ {
+ /**
+ * Callback method invoked when a read or write events has been
+ * detected by the {@link ManagedSelector} for this endpoint.
+ *
+ * @return a job that may block or null
+ */
+ Runnable onSelected();
+
+ /**
+ * Callback method invoked when all the keys selected by the
+ * {@link ManagedSelector} for this endpoint have been processed.
+ */
+ void updateKey();
+ }
+
+ private class SelectorProducer implements ExecutionStrategy.Producer
+ {
+ private Set<SelectionKey> _keys = Collections.emptySet();
+ private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
+
+ @Override
+ public Runnable produce()
+ {
+ while (true)
+ {
+ Runnable task = processSelected();
+ if (task != null)
+ return task;
+
+ Runnable action = runActions();
+ if (action != null)
+ return action;
+
+ update();
+
+ if (!select())
+ return null;
+ }
+ }
+
+ private Runnable runActions()
+ {
+ while (true)
+ {
+ Runnable action;
+ try (Locker.Lock lock = _locker.lock())
+ {
+ action = _actions.poll();
+ if (action == null)
+ {
+ // No more actions, so we need to select
+ _selecting = true;
+ return null;
+ }
+ }
+
+ if (action instanceof Product)
+ return action;
+
+ // Running the change may queue another action.
+ runChange(action);
+ }
+ }
+
+ private void runChange(Runnable change)
+ {
+ try
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Running change {}", change);
+ change.run();
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Could not run change " + change, x);
+ }
+ }
+
+ private boolean select()
+ {
+ try
+ {
+ Selector selector = _selector;
+ if (selector != null && selector.isOpen())
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop waiting on select");
+ int selected = selector.select();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
+
+ try (Locker.Lock lock = _locker.lock())
+ {
+ // finished selecting
+ _selecting = false;
+ }
+
+ _keys = selector.selectedKeys();
+ _cursor = _keys.iterator();
+
+ return true;
+ }
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(_selector);
+ if (isRunning())
+ LOG.warn(x);
+ else
+ LOG.debug(x);
+ }
+ return false;
+ }
+
+ private Runnable processSelected()
+ {
+ while (_cursor.hasNext())
+ {
+ SelectionKey key = _cursor.next();
+ if (key.isValid())
+ {
+ Object attachment = key.attachment();
+ try
+ {
+ if (attachment instanceof SelectableEndPoint)
+ {
+ // Try to produce a task
+ Runnable task = ((SelectableEndPoint)attachment).onSelected();
+ if (task != null)
+ return task;
+ }
+ else if (key.isConnectable())
+ {
+ Runnable task = processConnect(key, (Connect)attachment);
+ if (task != null)
+ return task;
+ }
+ else if (key.isAcceptable())
+ {
+ processAccept(key);
+ }
+ else
+ {
+ throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
+ }
+ }
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring cancelled key for channel {}", key.channel());
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ catch (Throwable x)
+ {
+ LOG.warn("Could not process key for channel " + key.channel(), x);
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+ return null;
+ }
+
+ private void update()
+ {
+ for (SelectionKey key : _keys)
+ updateKey(key);
+ _keys.clear();
+ }
+
+ private void updateKey(SelectionKey key)
+ {
+ Object attachment = key.attachment();
+ if (attachment instanceof SelectableEndPoint)
+ ((SelectableEndPoint)attachment).updateKey();
+ }
+ }
+
+ private interface Product extends Runnable
+ {
+ }
+
+ private Runnable processConnect(SelectionKey key, final Connect connect)
+ {
+ SocketChannel channel = (SocketChannel)key.channel();
+ try
+ {
+ key.attach(connect.attachment);
+ boolean connected = _selectorManager.finishConnect(channel);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Connected {} {}", connected, channel);
+ if (connected)
+ {
+ if (connect.timeout.cancel())
+ {
+ key.interestOps(0);
+ return new CreateEndPoint(channel, key)
+ {
+ @Override
+ protected void failed(Throwable failure)
+ {
+ super.failed(failure);
+ connect.failed(failure);
+ }
+ };
+ }
+ else
+ {
+ throw new SocketTimeoutException("Concurrent Connect Timeout");
+ }
+ }
+ else
+ {
+ throw new ConnectException();
+ }
+ }
+ catch (Throwable x)
+ {
+ connect.failed(x);
+ return null;
+ }
+ }
+
+ private void processAccept(SelectionKey key)
+ {
+ ServerSocketChannel server = (ServerSocketChannel)key.channel();
+ SocketChannel channel = null;
+ try
+ {
+ while ((channel = server.accept()) != null)
+ {
+ _selectorManager.accepted(channel);
+ }
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(channel);
+ LOG.warn("Accept failed for channel " + channel, x);
+ }
+ }
+
+ private void closeNoExceptions(Closeable closeable)
+ {
+ try
+ {
+ if (closeable != null)
+ closeable.close();
+ }
+ catch (Throwable x)
+ {
+ LOG.ignore(x);
+ }
+ }
+
+ private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
+ {
+ EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
+ _selectorManager.endPointOpened(endPoint);
+ Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
+ endPoint.setConnection(connection);
+ selectionKey.attach(endPoint);
+ _selectorManager.connectionOpened(connection);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", endPoint);
+ return endPoint;
+ }
+
+ public void destroyEndPoint(final EndPoint endPoint)
+ {
+ final Connection connection = endPoint.getConnection();
+ submit(new Product()
+ {
+ @Override
+ public void run()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Destroyed {}", endPoint);
+ if (connection != null)
+ _selectorManager.connectionClosed(connection);
+ _selectorManager.endPointClosed(endPoint);
+ }
+ });
+ }
+
+ @Override
+ public String dump()
+ {
+ return ContainerLifeCycle.dump(this);
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
+
+ Selector selector = _selector;
+ if (selector != null && selector.isOpen())
+ {
+ final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
+
+ DumpKeys dumpKeys = new DumpKeys(dump);
+ submit(dumpKeys);
+ dumpKeys.await(5, TimeUnit.SECONDS);
+
+ ContainerLifeCycle.dump(out, indent, dump);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ Selector selector = _selector;
+ return String.format("%s id=%s keys=%d selected=%d",
+ super.toString(),
+ _id,
+ selector != null && selector.isOpen() ? selector.keys().size() : -1,
+ selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
+ }
+
+ private class DumpKeys implements Runnable
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final List<Object> _dumps;
+
+ private DumpKeys(List<Object> dumps)
+ {
+ this._dumps = dumps;
+ }
+
+ @Override
+ public void run()
+ {
+ Selector selector = _selector;
+ if (selector != null && selector.isOpen())
+ {
+ Set<SelectionKey> keys = selector.keys();
+ _dumps.add(selector + " keys=" + keys.size());
+ for (SelectionKey key : keys)
+ {
+ try
+ {
+ _dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
+ }
+ catch (Throwable x)
+ {
+ LOG.ignore(x);
+ }
+ }
+ }
+ latch.countDown();
+ }
+
+ public boolean await(long timeout, TimeUnit unit)
+ {
+ try
+ {
+ return latch.await(timeout, unit);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+
+ class Acceptor implements Runnable
+ {
+ private final ServerSocketChannel _channel;
+
+ public Acceptor(ServerSocketChannel channel)
+ {
+ this._channel = channel;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} acceptor={}", this, key);
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(_channel);
+ LOG.warn(x);
+ }
+ }
+ }
+
+ class Accept implements Runnable
+ {
+ private final SocketChannel channel;
+ private final Object attachment;
+
+ Accept(SocketChannel channel, Object attachment)
+ {
+ this.channel = channel;
+ this.attachment = attachment;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ final SelectionKey key = channel.register(_selector, 0, attachment);
+ submit(new CreateEndPoint(channel, key));
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(channel);
+ LOG.debug(x);
+ }
+ }
+ }
+
+ private class CreateEndPoint implements Product
+ {
+ private final SocketChannel channel;
+ private final SelectionKey key;
+
+ public CreateEndPoint(SocketChannel channel, SelectionKey key)
+ {
+ this.channel = channel;
+ this.key = key;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ createEndPoint(channel, key);
+ }
+ catch (Throwable x)
+ {
+ LOG.debug(x);
+ failed(x);
+ }
+ }
+
+ protected void failed(Throwable failure)
+ {
+ closeNoExceptions(channel);
+ LOG.debug(failure);
+ }
+ }
+
+ class Connect implements Runnable
+ {
+ private final AtomicBoolean failed = new AtomicBoolean();
+ private final SocketChannel channel;
+ private final Object attachment;
+ private final Scheduler.Task timeout;
+
+ Connect(SocketChannel channel, Object attachment)
+ {
+ this.channel = channel;
+ this.attachment = attachment;
+ this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ channel.register(_selector, SelectionKey.OP_CONNECT, this);
+ }
+ catch (Throwable x)
+ {
+ failed(x);
+ }
+ }
+
+ private void failed(Throwable failure)
+ {
+ if (failed.compareAndSet(false, true))
+ {
+ timeout.cancel();
+ closeNoExceptions(channel);
+ ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
+ }
+ }
+ }
+
+ private class ConnectTimeout implements Runnable
+ {
+ private final Connect connect;
+
+ private ConnectTimeout(Connect connect)
+ {
+ this.connect = connect;
+ }
+
+ @Override
+ public void run()
+ {
+ SocketChannel channel = connect.channel;
+ if (channel.isConnectionPending())
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Channel {} timed out while connecting, closing it", channel);
+ connect.failed(new SocketTimeoutException("Connect Timeout"));
+ }
+ }
+ }
+
+ private class CloseEndPoints implements Runnable
+ {
+ private final CountDownLatch _latch = new CountDownLatch(1);
+ private CountDownLatch _allClosed;
+
+ @Override
+ public void run()
+ {
+ List<EndPoint> end_points = new ArrayList<>();
+ for (SelectionKey key : _selector.keys())
+ {
+ if (key.isValid())
+ {
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ end_points.add((EndPoint)attachment);
+ }
+ }
+
+ int size = end_points.size();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this);
+
+ _allClosed = new CountDownLatch(size);
+ _latch.countDown();
+
+ for (EndPoint endp : end_points)
+ submit(new EndPointCloser(endp, _allClosed));
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this);
+ }
+
+ public boolean await(long timeout)
+ {
+ try
+ {
+ return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
+ _allClosed.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+
+ private class EndPointCloser implements Product
+ {
+ private final EndPoint _endPoint;
+ private final CountDownLatch _latch;
+
+ private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
+ {
+ _endPoint = endPoint;
+ _latch = latch;
+ }
+
+ @Override
+ public void run()
+ {
+ closeNoExceptions(_endPoint.getConnection());
+ _latch.countDown();
+ }
+ }
+
+ private class CloseSelector implements Runnable
+ {
+ private CountDownLatch _latch = new CountDownLatch(1);
+
+ @Override
+ public void run()
+ {
+ Selector selector = _selector;
+ _selector = null;
+ closeNoExceptions(selector);
+ _latch.countDown();
+ }
+
+ public boolean await(long timeout)
+ {
+ try
+ {
+ return _latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+}

Back to the top