Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2014-09-10 14:34:25 +0000
committerSimone Bordet2014-09-10 19:45:27 +0000
commitfd1c9dd8d2c298ba66ae1b3b98bc85719cdefb41 (patch)
treed70a9dec6e7ac00265497831957a9606b778c0b5
parent1ee11138a7c40741195f1709a000569018e3b39c (diff)
downloadorg.eclipse.jetty.project-fd1c9dd8d2c298ba66ae1b3b98bc85719cdefb41.tar.gz
org.eclipse.jetty.project-fd1c9dd8d2c298ba66ae1b3b98bc85719cdefb41.tar.xz
org.eclipse.jetty.project-fd1c9dd8d2c298ba66ae1b3b98bc85719cdefb41.zip
443713 - Reduce number of SelectionKey.setInterestOps() calls.
Introduced a state machine to handle the various scenarios (ST = selector thread, Tx = pooled thread): ST: call to SCEP.onSelected() moves from SELECTING -> PENDING. ST: call to SCEP.updateKey() moves from PENDING -> UPDATING -> SELECTING T1: call to SCEP.changeInterests() moves (SELECTING | PENDING) -> CHANGING -> SELECTING The race between ST and T1 to move from PENDING to either UPDATING or CHANGING will be won by one thread only, which will then perform the call to SelectionKey.setInterestOps(). Preferably, this will be done by ST during an updateKey() call. If updateKey() has already been invoked, then changeInterests() will perform the call to SelectionKey.setInterestOps(). However, if T1 loses, it still has to perform the key update, so it will spin until ST moves back to SELECTING.
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java208
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java49
2 files changed, 201 insertions, 56 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index e60c9fa706..a4dfd83bc2 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -22,7 +22,7 @@ import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.log.Log;
@@ -43,27 +43,21 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
{
try
{
- if (getChannel().isOpen())
- {
- int oldInterestOps = _key.interestOps();
- int newInterestOps = _interestOps.get();
- if (newInterestOps != oldInterestOps)
- setKeyInterests(oldInterestOps, newInterestOps);
- }
+ setKeyInterests();
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
- catch (Exception x)
+ catch (Throwable x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
};
-
+ private final AtomicReference<State> _interestState = new AtomicReference<>(State.SELECTING);
/**
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
@@ -73,11 +67,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
- private final AtomicInteger _interestOps = new AtomicInteger();
+ private int _interestOps;
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{
- super(scheduler,channel);
+ super(scheduler, channel);
_selector = selector;
_key = key;
setIdleTimeout(idleTimeout);
@@ -86,78 +80,183 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
protected boolean needsFill()
{
- updateLocalInterests(SelectionKey.OP_READ, true);
- return false;
+ return !changeInterests(SelectionKey.OP_READ, true);
}
@Override
protected void onIncompleteFlush()
{
- updateLocalInterests(SelectionKey.OP_WRITE, true);
+ changeInterests(SelectionKey.OP_WRITE, true);
}
@Override
public void onSelected()
{
+ /**
+ * This method never runs concurrently with other
+ * methods that update _interestState.
+ */
+
assert _selector.isSelectorThread();
- int oldInterestOps = _key.interestOps();
+
+ // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
int readyOps = _key.readyOps();
+ int oldInterestOps = _interestOps;
int newInterestOps = oldInterestOps & ~readyOps;
- setKeyInterests(oldInterestOps, newInterestOps);
- updateLocalInterests(readyOps, false);
- if (_key.isReadable())
+ _interestOps = newInterestOps;
+
+ if (!_interestState.compareAndSet(State.SELECTING, State.PENDING))
+ throw new IllegalStateException("Invalid state: " + _interestState);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("onSelected {}->{} for {}", oldInterestOps, newInterestOps, this);
+
+ if ((readyOps & SelectionKey.OP_READ) != 0)
getFillInterest().fillable();
- if (_key.isWritable())
+ if ((readyOps & SelectionKey.OP_WRITE) != 0)
getWriteFlusher().completeWrite();
}
-
- private void updateLocalInterests(int operation, boolean add)
+ @Override
+ public void updateKey()
{
+ /**
+ * This method may run concurrently with {@link #changeInterests(int, boolean)}.
+ */
+
+ assert _selector.isSelectorThread();
+
while (true)
{
- int oldInterestOps = _interestOps.get();
- int newInterestOps;
- if (add)
- newInterestOps = oldInterestOps | operation;
- else
- newInterestOps = oldInterestOps & ~operation;
-
- if (isInputShutdown())
- newInterestOps &= ~SelectionKey.OP_READ;
- if (isOutputShutdown())
- newInterestOps &= ~SelectionKey.OP_WRITE;
-
- if (newInterestOps != oldInterestOps)
+ State current = _interestState.get();
+ switch (current)
{
- if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
+ case SELECTING:
{
- if (LOG.isDebugEnabled())
- LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
- _selector.updateKey(_updateTask);
+ // When a whole cycle triggered by changeInterests()
+ // happens, we finish the job by updating the key.
+ setKeyInterests();
+ return;
}
- else
+ case PENDING:
{
- if (LOG.isDebugEnabled())
- LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
- continue;
+ if (!_interestState.compareAndSet(current, State.UPDATING))
+ continue;
+ break;
+ }
+ case UPDATING:
+ {
+ // Set the key interest as expected.
+ setKeyInterests();
+ if (!_interestState.compareAndSet(current, State.SELECTING))
+ throw new IllegalStateException();
+ return;
+ }
+ case CHANGING:
+ {
+ // We lost the race to update _interestOps,
+ // let changeInterests() perform the update.
+ return;
+ }
+ default:
+ {
+ throw new IllegalStateException();
}
}
- else
+ }
+ }
+
+ private boolean changeInterests(int operation, boolean add)
+ {
+ /**
+ * This method may run concurrently with {@link #updateKey()}.
+ */
+
+ boolean pending = false;
+ boolean changed = true;
+ while (true)
+ {
+ State current = _interestState.get();
+ switch (current)
{
- if (LOG.isDebugEnabled())
- LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
+ case SELECTING:
+ case PENDING:
+ {
+ if (!_interestState.compareAndSet(current, State.CHANGING))
+ continue;
+ pending = current == State.PENDING;
+ break;
+ }
+ case UPDATING:
+ {
+ // We lost the race to update _interestOps, but we
+ // must update it nonetheless, so yield and spin,
+ // waiting for the state to be SELECTING again.
+ Thread.yield();
+ break;
+ }
+ case CHANGING:
+ {
+ int oldInterestOps = _interestOps;
+ int newInterestOps;
+ if (add)
+ newInterestOps = oldInterestOps | operation;
+ else
+ newInterestOps = oldInterestOps & ~operation;
+
+ if (isInputShutdown())
+ {
+ newInterestOps &= ~SelectionKey.OP_READ;
+ if (add && (operation & SelectionKey.OP_READ) != 0)
+ changed = false;
+ }
+
+ if (isOutputShutdown())
+ {
+ newInterestOps &= ~SelectionKey.OP_WRITE;
+ if (add && (operation & SelectionKey.OP_WRITE) != 0)
+ changed = false;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("changeInterests pending={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
+
+ if (newInterestOps != oldInterestOps)
+ _interestOps = newInterestOps;
+
+ if (!_interestState.compareAndSet(current, State.SELECTING))
+ throw new IllegalStateException("Invalid state: " + current);
+
+ // We only update the key if updateKey() does not do it for us,
+ // because doing it from the selector thread is less expensive.
+ // This must be done after CASing the state above, otherwise the
+ // selector may select and call onSelected() concurrently.
+ submitKeyUpdate(!pending);
+
+ return changed;
+ }
+ default:
+ {
+ throw new IllegalStateException();
+ }
}
- break;
}
}
+ protected void submitKeyUpdate(boolean submit)
+ {
+ if (submit)
+ _selector.updateKey(_updateTask);
+ }
- private void setKeyInterests(int oldInterestOps, int newInterestOps)
+ private void setKeyInterests()
{
+ int oldInterestOps = _key.interestOps();
+ int newInterestOps = _interestOps;
if (LOG.isDebugEnabled())
- LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
- _key.interestOps(newInterestOps);
+ LOG.debug("Key interests update {} -> {}", oldInterestOps, newInterestOps);
+ if (oldInterestOps != newInterestOps)
+ _key.interestOps(newInterestOps);
}
@Override
@@ -199,13 +298,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
int keyReadiness = valid ? _key.readyOps() : -1;
return String.format("%s{io=%d,kio=%d,kro=%d}",
super.toString(),
- _interestOps.get(),
+ _interestOps,
keyInterests,
keyReadiness);
}
catch (CancelledKeyException x)
{
- return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
+ return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps);
}
}
+
+ private enum State
+ {
+ SELECTING, PENDING, UPDATING, CHANGING
+ }
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
index 29e2cd5e8e..c457c35a4e 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
@@ -650,6 +650,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
((EndPoint)attachment).close();
}
}
+
+ // Allow any dispatched tasks to run.
+ Thread.yield();
+
+ // Update the keys.
+ for (SelectionKey key : selectedKeys)
+ {
+ if (key.isValid())
+ updateKey(key);
+ }
+
selectedKeys.clear();
}
catch (Throwable x)
@@ -697,6 +708,30 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
+ private void updateKey(SelectionKey key)
+ {
+ Object attachment = key.attachment();
+ try
+ {
+ if (attachment instanceof SelectableEndPoint)
+ {
+ ((SelectableEndPoint)attachment).updateKey();
+ }
+ }
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring cancelled key for channel {}", key.channel());
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ catch (Throwable x)
+ {
+ LOG.warn("Could not process key for channel " + key.channel(), x);
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+
private void processConnect(SelectionKey key, Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
@@ -1075,15 +1110,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
/**
- * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
- * non-blocking events by the {@link ManagedSelector}.
+ * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
+ * notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
{
/**
- * <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}
- * for this endpoint.</p>
+ * Callback method invoked when a read or write events has been
+ * detected by the {@link ManagedSelector} for this endpoint.
*/
void onSelected();
+
+ /**
+ * Callback method invoked when all the keys selected by the
+ * {@link ManagedSelector} for this endpoint have been processed.
+ */
+ void updateKey();
}
}

Back to the top