diff options
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java')
-rw-r--r-- | jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java | 91 |
1 files changed, 76 insertions, 15 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 0e6fd889da..83b4dcecb1 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.io; import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; @@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -65,6 +68,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private final ManagedSelector[] _selectors; private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long _selectorIndex; + private int _priorityDelta; protected SelectorManager(Executor executor, Scheduler scheduler) { @@ -110,6 +114,42 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _connectTimeout = milliseconds; } + + @ManagedAttribute("The priority delta to apply to selector threads") + public int getSelectorPriorityDelta() + { + return _priorityDelta; + } + + /** + * Sets the selector thread priority delta to the given amount. + * <p>This allows the selector threads to run at a different priority. + * Typically this would be used to lower the priority to give preference + * to handling previously accepted connections rather than accepting + * new connections.</p> + * + * @param selectorPriorityDelta the amount to change the thread priority + * delta to (may be negative) + * @see Thread#getPriority() + */ + public void setSelectorPriorityDelta(int selectorPriorityDelta) + { + int oldDelta = _priorityDelta; + _priorityDelta = selectorPriorityDelta; + if (oldDelta != selectorPriorityDelta && isStarted()) + { + for (ManagedSelector selector : _selectors) + { + Thread thread = selector._thread; + if (thread != null) + { + int deltaDiff = selectorPriorityDelta - oldDelta; + thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff))); + } + } + } + } + /** * Executes the given task in a different thread. * @@ -140,11 +180,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa /** * <p>Registers a channel to perform a non-blocking connect.</p> - * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)} - * must be called prior to calling this method.</p> + * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)} + * must be called prior to calling this method, and the connect operation must not be completed + * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p> * * @param channel the channel to register * @param attachment the attachment object + * @see #accept(SocketChannel, Object) */ public void connect(SocketChannel channel, Object attachment) { @@ -153,16 +195,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } /** + * @see #accept(SocketChannel, Object) + */ + public void accept(SocketChannel channel) + { + accept(channel, null); + } + + /** * <p>Registers a channel to perform non-blocking read/write operations.</p> * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()}, - * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p> + * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or + * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed + * successfully.</p> * * @param channel the channel to register + * @param attachment the attachment object */ - public void accept(final SocketChannel channel) + public void accept(SocketChannel channel, Object attachment) { final ManagedSelector selector = chooseSelector(); - selector.submit(selector.new Accept(channel)); + selector.submit(selector.new Accept(channel, attachment)); } /** @@ -173,7 +226,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param server the server channel to register */ - public void acceptor(final ServerSocketChannel server) + public void acceptor(ServerSocketChannel server) { final ManagedSelector selector = chooseSelector(); selector.submit(selector.new Acceptor(server)); @@ -479,9 +532,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { _thread = Thread.currentThread(); String name = _thread.getName(); + int priority = _thread.getPriority(); try { - _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id); + if (_priorityDelta != 0) + _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta))); + + _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id)); if (LOG.isDebugEnabled()) LOG.debug("Starting {} on {}", _thread, this); while (isRunning()) @@ -494,6 +551,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (LOG.isDebugEnabled()) LOG.debug("Stopped {} on {}", _thread, this); _thread.setName(name); + if (_priorityDelta != 0) + _thread.setPriority(priority); } } @@ -812,11 +871,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private class Accept implements Runnable { - private final SocketChannel _channel; + private final SocketChannel channel; + private final Object attachment; - public Accept(SocketChannel channel) + private Accept(SocketChannel channel, Object attachment) { - this._channel = channel; + this.channel = channel; + this.attachment = attachment; } @Override @@ -824,13 +885,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { try { - SelectionKey key = _channel.register(_selector, 0, null); - EndPoint endpoint = createEndPoint(_channel, key); + SelectionKey key = channel.register(_selector, 0, attachment); + EndPoint endpoint = createEndPoint(channel, key); key.attach(endpoint); } catch (Throwable x) { - closeNoExceptions(_channel); + closeNoExceptions(channel); LOG.debug(x); } } @@ -843,7 +904,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private final Object attachment; private final Scheduler.Task timeout; - public Connect(SocketChannel channel, Object attachment) + private Connect(SocketChannel channel, Object attachment) { this.channel = channel; this.attachment = attachment; @@ -863,7 +924,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } - protected void failed(Throwable failure) + private void failed(Throwable failure) { if (failed.compareAndSet(false, true)) { |