Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java')
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java91
1 files changed, 76 insertions, 15 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
index 0e6fd889da..83b4dcecb1 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
@@ -21,6 +21,8 @@ package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
@@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@@ -65,6 +68,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private final ManagedSelector[] _selectors;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long _selectorIndex;
+ private int _priorityDelta;
protected SelectorManager(Executor executor, Scheduler scheduler)
{
@@ -110,6 +114,42 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_connectTimeout = milliseconds;
}
+
+ @ManagedAttribute("The priority delta to apply to selector threads")
+ public int getSelectorPriorityDelta()
+ {
+ return _priorityDelta;
+ }
+
+ /**
+ * Sets the selector thread priority delta to the given amount.
+ * <p>This allows the selector threads to run at a different priority.
+ * Typically this would be used to lower the priority to give preference
+ * to handling previously accepted connections rather than accepting
+ * new connections.</p>
+ *
+ * @param selectorPriorityDelta the amount to change the thread priority
+ * delta to (may be negative)
+ * @see Thread#getPriority()
+ */
+ public void setSelectorPriorityDelta(int selectorPriorityDelta)
+ {
+ int oldDelta = _priorityDelta;
+ _priorityDelta = selectorPriorityDelta;
+ if (oldDelta != selectorPriorityDelta && isStarted())
+ {
+ for (ManagedSelector selector : _selectors)
+ {
+ Thread thread = selector._thread;
+ if (thread != null)
+ {
+ int deltaDiff = selectorPriorityDelta - oldDelta;
+ thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
+ }
+ }
+ }
+ }
+
/**
* Executes the given task in a different thread.
*
@@ -140,11 +180,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
/**
* <p>Registers a channel to perform a non-blocking connect.</p>
- * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
- * must be called prior to calling this method.</p>
+ * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
+ * must be called prior to calling this method, and the connect operation must not be completed
+ * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
*
* @param channel the channel to register
* @param attachment the attachment object
+ * @see #accept(SocketChannel, Object)
*/
public void connect(SocketChannel channel, Object attachment)
{
@@ -153,16 +195,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/**
+ * @see #accept(SocketChannel, Object)
+ */
+ public void accept(SocketChannel channel)
+ {
+ accept(channel, null);
+ }
+
+ /**
* <p>Registers a channel to perform non-blocking read/write operations.</p>
* <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
- * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
+ * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
+ * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
+ * successfully.</p>
*
* @param channel the channel to register
+ * @param attachment the attachment object
*/
- public void accept(final SocketChannel channel)
+ public void accept(SocketChannel channel, Object attachment)
{
final ManagedSelector selector = chooseSelector();
- selector.submit(selector.new Accept(channel));
+ selector.submit(selector.new Accept(channel, attachment));
}
/**
@@ -173,7 +226,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param server the server channel to register
*/
- public void acceptor(final ServerSocketChannel server)
+ public void acceptor(ServerSocketChannel server)
{
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Acceptor(server));
@@ -479,9 +532,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
_thread = Thread.currentThread();
String name = _thread.getName();
+ int priority = _thread.getPriority();
try
{
- _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
+ if (_priorityDelta != 0)
+ _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
+
+ _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
if (LOG.isDebugEnabled())
LOG.debug("Starting {} on {}", _thread, this);
while (isRunning())
@@ -494,6 +551,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (LOG.isDebugEnabled())
LOG.debug("Stopped {} on {}", _thread, this);
_thread.setName(name);
+ if (_priorityDelta != 0)
+ _thread.setPriority(priority);
}
}
@@ -812,11 +871,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private class Accept implements Runnable
{
- private final SocketChannel _channel;
+ private final SocketChannel channel;
+ private final Object attachment;
- public Accept(SocketChannel channel)
+ private Accept(SocketChannel channel, Object attachment)
{
- this._channel = channel;
+ this.channel = channel;
+ this.attachment = attachment;
}
@Override
@@ -824,13 +885,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
try
{
- SelectionKey key = _channel.register(_selector, 0, null);
- EndPoint endpoint = createEndPoint(_channel, key);
+ SelectionKey key = channel.register(_selector, 0, attachment);
+ EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
catch (Throwable x)
{
- closeNoExceptions(_channel);
+ closeNoExceptions(channel);
LOG.debug(x);
}
}
@@ -843,7 +904,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private final Object attachment;
private final Scheduler.Task timeout;
- public Connect(SocketChannel channel, Object attachment)
+ private Connect(SocketChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
@@ -863,7 +924,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
- protected void failed(Throwable failure)
+ private void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{

Back to the top