diff options
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java')
-rw-r--r-- | jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java | 754 |
1 files changed, 754 insertions, 0 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java new file mode 100644 index 0000000000..d2a33bff57 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -0,0 +1,754 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Locker; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p> + * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events + * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated + * with the channel.</p> + */ +public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable +{ + private static final Logger LOG = Log.getLogger(ManagedSelector.class); + + private final Locker _locker = new Locker(); + private boolean _selecting = false; + private final Queue<Runnable> _actions = new ArrayDeque<>(); + private final SelectorManager _selectorManager; + private final int _id; + private final ExecutionStrategy _strategy; + private Selector _selector; + + public ManagedSelector(SelectorManager selectorManager, int id) + { + _selectorManager = selectorManager; + _id = id; + _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor()); + setStopTimeout(5000); + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + _selector = newSelector(); + } + + protected Selector newSelector() throws IOException + { + return Selector.open(); + } + + public int size() + { + Selector s = _selector; + if (s == null) + return 0; + return s.keys().size(); + } + + @Override + protected void doStop() throws Exception + { + if (LOG.isDebugEnabled()) + LOG.debug("Stopping {}", this); + CloseEndPoints close_endps = new CloseEndPoints(); + submit(close_endps); + close_endps.await(getStopTimeout()); + super.doStop(); + CloseSelector close_selector = new CloseSelector(); + submit(close_selector); + close_selector.await(getStopTimeout()); + + if (LOG.isDebugEnabled()) + LOG.debug("Stopped {}", this); + } + + public void submit(Runnable change) + { + if (LOG.isDebugEnabled()) + LOG.debug("Queued change {} on {}", change, this); + + Selector selector = null; + try (Locker.Lock lock = _locker.lock()) + { + _actions.offer(change); + if (_selecting) + { + selector = _selector; + // To avoid the extra select wakeup. + _selecting = false; + } + } + if (selector != null) + selector.wakeup(); + } + + @Override + public void run() + { + _strategy.execute(); + } + + /** + * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be + * notified of non-blocking events by the {@link ManagedSelector}. + */ + public interface SelectableEndPoint extends EndPoint + { + /** + * Callback method invoked when a read or write events has been + * detected by the {@link ManagedSelector} for this endpoint. + * + * @return a job that may block or null + */ + Runnable onSelected(); + + /** + * Callback method invoked when all the keys selected by the + * {@link ManagedSelector} for this endpoint have been processed. + */ + void updateKey(); + } + + private class SelectorProducer implements ExecutionStrategy.Producer + { + private Set<SelectionKey> _keys = Collections.emptySet(); + private Iterator<SelectionKey> _cursor = Collections.emptyIterator(); + + @Override + public Runnable produce() + { + while (true) + { + Runnable task = processSelected(); + if (task != null) + return task; + + Runnable action = runActions(); + if (action != null) + return action; + + update(); + + if (!select()) + return null; + } + } + + private Runnable runActions() + { + while (true) + { + Runnable action; + try (Locker.Lock lock = _locker.lock()) + { + action = _actions.poll(); + if (action == null) + { + // No more actions, so we need to select + _selecting = true; + return null; + } + } + + if (action instanceof Product) + return action; + + // Running the change may queue another action. + runChange(action); + } + } + + private void runChange(Runnable change) + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("Running change {}", change); + change.run(); + } + catch (Throwable x) + { + LOG.debug("Could not run change " + change, x); + } + } + + private boolean select() + { + try + { + Selector selector = _selector; + if (selector != null && selector.isOpen()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Selector loop waiting on select"); + int selected = selector.select(); + if (LOG.isDebugEnabled()) + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size()); + + try (Locker.Lock lock = _locker.lock()) + { + // finished selecting + _selecting = false; + } + + _keys = selector.selectedKeys(); + _cursor = _keys.iterator(); + + return true; + } + } + catch (Throwable x) + { + closeNoExceptions(_selector); + if (isRunning()) + LOG.warn(x); + else + LOG.debug(x); + } + return false; + } + + private Runnable processSelected() + { + while (_cursor.hasNext()) + { + SelectionKey key = _cursor.next(); + if (key.isValid()) + { + Object attachment = key.attachment(); + try + { + if (attachment instanceof SelectableEndPoint) + { + // Try to produce a task + Runnable task = ((SelectableEndPoint)attachment).onSelected(); + if (task != null) + return task; + } + else if (key.isConnectable()) + { + Runnable task = processConnect(key, (Connect)attachment); + if (task != null) + return task; + } + else if (key.isAcceptable()) + { + processAccept(key); + } + else + { + throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps()); + } + } + catch (CancelledKeyException x) + { + LOG.debug("Ignoring cancelled key for channel {}", key.channel()); + if (attachment instanceof EndPoint) + closeNoExceptions((EndPoint)attachment); + } + catch (Throwable x) + { + LOG.warn("Could not process key for channel " + key.channel(), x); + if (attachment instanceof EndPoint) + closeNoExceptions((EndPoint)attachment); + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); + Object attachment = key.attachment(); + if (attachment instanceof EndPoint) + closeNoExceptions((EndPoint)attachment); + } + } + return null; + } + + private void update() + { + for (SelectionKey key : _keys) + updateKey(key); + _keys.clear(); + } + + private void updateKey(SelectionKey key) + { + Object attachment = key.attachment(); + if (attachment instanceof SelectableEndPoint) + ((SelectableEndPoint)attachment).updateKey(); + } + } + + private interface Product extends Runnable + { + } + + private Runnable processConnect(SelectionKey key, final Connect connect) + { + SocketChannel channel = (SocketChannel)key.channel(); + try + { + key.attach(connect.attachment); + boolean connected = _selectorManager.finishConnect(channel); + if (LOG.isDebugEnabled()) + LOG.debug("Connected {} {}", connected, channel); + if (connected) + { + if (connect.timeout.cancel()) + { + key.interestOps(0); + return new CreateEndPoint(channel, key) + { + @Override + protected void failed(Throwable failure) + { + super.failed(failure); + connect.failed(failure); + } + }; + } + else + { + throw new SocketTimeoutException("Concurrent Connect Timeout"); + } + } + else + { + throw new ConnectException(); + } + } + catch (Throwable x) + { + connect.failed(x); + return null; + } + } + + private void processAccept(SelectionKey key) + { + ServerSocketChannel server = (ServerSocketChannel)key.channel(); + SocketChannel channel = null; + try + { + while ((channel = server.accept()) != null) + { + _selectorManager.accepted(channel); + } + } + catch (Throwable x) + { + closeNoExceptions(channel); + LOG.warn("Accept failed for channel " + channel, x); + } + } + + private void closeNoExceptions(Closeable closeable) + { + try + { + if (closeable != null) + closeable.close(); + } + catch (Throwable x) + { + LOG.ignore(x); + } + } + + private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException + { + EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); + _selectorManager.endPointOpened(endPoint); + Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); + endPoint.setConnection(connection); + selectionKey.attach(endPoint); + _selectorManager.connectionOpened(connection); + if (LOG.isDebugEnabled()) + LOG.debug("Created {}", endPoint); + return endPoint; + } + + public void destroyEndPoint(final EndPoint endPoint) + { + final Connection connection = endPoint.getConnection(); + submit(new Product() + { + @Override + public void run() + { + if (LOG.isDebugEnabled()) + LOG.debug("Destroyed {}", endPoint); + if (connection != null) + _selectorManager.connectionClosed(connection); + _selectorManager.endPointClosed(endPoint); + } + }); + } + + @Override + public String dump() + { + return ContainerLifeCycle.dump(this); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator()); + + Selector selector = _selector; + if (selector != null && selector.isOpen()) + { + final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2); + + DumpKeys dumpKeys = new DumpKeys(dump); + submit(dumpKeys); + dumpKeys.await(5, TimeUnit.SECONDS); + + ContainerLifeCycle.dump(out, indent, dump); + } + } + + @Override + public String toString() + { + Selector selector = _selector; + return String.format("%s id=%s keys=%d selected=%d", + super.toString(), + _id, + selector != null && selector.isOpen() ? selector.keys().size() : -1, + selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); + } + + private class DumpKeys implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + private final List<Object> _dumps; + + private DumpKeys(List<Object> dumps) + { + this._dumps = dumps; + } + + @Override + public void run() + { + Selector selector = _selector; + if (selector != null && selector.isOpen()) + { + Set<SelectionKey> keys = selector.keys(); + _dumps.add(selector + " keys=" + keys.size()); + for (SelectionKey key : keys) + { + try + { + _dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment())); + } + catch (Throwable x) + { + LOG.ignore(x); + } + } + } + latch.countDown(); + } + + public boolean await(long timeout, TimeUnit unit) + { + try + { + return latch.await(timeout, unit); + } + catch (InterruptedException x) + { + return false; + } + } + } + + class Acceptor implements Runnable + { + private final ServerSocketChannel _channel; + + public Acceptor(ServerSocketChannel channel) + { + this._channel = channel; + } + + @Override + public void run() + { + try + { + SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null); + if (LOG.isDebugEnabled()) + LOG.debug("{} acceptor={}", this, key); + } + catch (Throwable x) + { + closeNoExceptions(_channel); + LOG.warn(x); + } + } + } + + class Accept implements Runnable + { + private final SocketChannel channel; + private final Object attachment; + + Accept(SocketChannel channel, Object attachment) + { + this.channel = channel; + this.attachment = attachment; + } + + @Override + public void run() + { + try + { + final SelectionKey key = channel.register(_selector, 0, attachment); + submit(new CreateEndPoint(channel, key)); + } + catch (Throwable x) + { + closeNoExceptions(channel); + LOG.debug(x); + } + } + } + + private class CreateEndPoint implements Product + { + private final SocketChannel channel; + private final SelectionKey key; + + public CreateEndPoint(SocketChannel channel, SelectionKey key) + { + this.channel = channel; + this.key = key; + } + + @Override + public void run() + { + try + { + createEndPoint(channel, key); + } + catch (Throwable x) + { + LOG.debug(x); + failed(x); + } + } + + protected void failed(Throwable failure) + { + closeNoExceptions(channel); + LOG.debug(failure); + } + } + + class Connect implements Runnable + { + private final AtomicBoolean failed = new AtomicBoolean(); + private final SocketChannel channel; + private final Object attachment; + private final Scheduler.Task timeout; + + Connect(SocketChannel channel, Object attachment) + { + this.channel = channel; + this.attachment = attachment; + this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + public void run() + { + try + { + channel.register(_selector, SelectionKey.OP_CONNECT, this); + } + catch (Throwable x) + { + failed(x); + } + } + + private void failed(Throwable failure) + { + if (failed.compareAndSet(false, true)) + { + timeout.cancel(); + closeNoExceptions(channel); + ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment); + } + } + } + + private class ConnectTimeout implements Runnable + { + private final Connect connect; + + private ConnectTimeout(Connect connect) + { + this.connect = connect; + } + + @Override + public void run() + { + SocketChannel channel = connect.channel; + if (channel.isConnectionPending()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Channel {} timed out while connecting, closing it", channel); + connect.failed(new SocketTimeoutException("Connect Timeout")); + } + } + } + + private class CloseEndPoints implements Runnable + { + private final CountDownLatch _latch = new CountDownLatch(1); + private CountDownLatch _allClosed; + + @Override + public void run() + { + List<EndPoint> end_points = new ArrayList<>(); + for (SelectionKey key : _selector.keys()) + { + if (key.isValid()) + { + Object attachment = key.attachment(); + if (attachment instanceof EndPoint) + end_points.add((EndPoint)attachment); + } + } + + int size = end_points.size(); + if (LOG.isDebugEnabled()) + LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this); + + _allClosed = new CountDownLatch(size); + _latch.countDown(); + + for (EndPoint endp : end_points) + submit(new EndPointCloser(endp, _allClosed)); + + if (LOG.isDebugEnabled()) + LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this); + } + + public boolean await(long timeout) + { + try + { + return _latch.await(timeout, TimeUnit.MILLISECONDS) && + _allClosed.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException x) + { + return false; + } + } + } + + private class EndPointCloser implements Product + { + private final EndPoint _endPoint; + private final CountDownLatch _latch; + + private EndPointCloser(EndPoint endPoint, CountDownLatch latch) + { + _endPoint = endPoint; + _latch = latch; + } + + @Override + public void run() + { + closeNoExceptions(_endPoint.getConnection()); + _latch.countDown(); + } + } + + private class CloseSelector implements Runnable + { + private CountDownLatch _latch = new CountDownLatch(1); + + @Override + public void run() + { + Selector selector = _selector; + _selector = null; + closeNoExceptions(selector); + _latch.countDown(); + } + + public boolean await(long timeout) + { + try + { + return _latch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException x) + { + return false; + } + } + } +} |