Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2015-03-11 04:37:25 +0000
committerGreg Wilkins2015-03-11 04:37:25 +0000
commit870e0ab0b303b37b3c3726e7f5c4954e3f72deac (patch)
treec6aeea7d4993f099f184978c23f4811de013a12c
parent5456de2160f28fef76b2ec29ea62a86ee05e4e35 (diff)
downloadorg.eclipse.jetty.project-870e0ab0b303b37b3c3726e7f5c4954e3f72deac.tar.gz
org.eclipse.jetty.project-870e0ab0b303b37b3c3726e7f5c4954e3f72deac.tar.xz
org.eclipse.jetty.project-870e0ab0b303b37b3c3726e7f5c4954e3f72deac.zip
Converted IteratingCallback to use SpinLock
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java266
1 files changed, 116 insertions, 150 deletions
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
index e8b82724ae..4faab7c029 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
@@ -21,6 +21,8 @@ package org.eclipse.jetty.util;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jetty.util.thread.SpinLock;
+
/**
* This specialized callback implements a pattern that allows
* a large job to be broken into smaller tasks using iteration
@@ -92,12 +94,7 @@ public abstract class IteratingCallback implements Callback
/**
* This callback has been closed and cannot be reset.
*/
- CLOSED,
-
- /**
- * State is locked while leaving processing state to check the iterate boolean
- */
- LOCKED
+ CLOSED
}
/**
@@ -125,18 +122,19 @@ public abstract class IteratingCallback implements Callback
SUCCEEDED
}
- private final AtomicReference<State> _state;
+ private SpinLock _lock = new SpinLock();
+ private State _state;
private boolean _iterate;
protected IteratingCallback()
{
- _state = new AtomicReference<>(State.IDLE);
+ _state = State.IDLE;
}
protected IteratingCallback(boolean needReset)
{
- _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
+ _state = needReset ? State.SUCCEEDED : State.IDLE;
}
/**
@@ -183,43 +181,40 @@ public abstract class IteratingCallback implements Callback
*/
public void iterate()
{
+ boolean process=false;
+
loop: while (true)
{
- State state=_state.get();
- switch (state)
+ try (SpinLock.Lock lock = _lock.lock())
{
- case PENDING:
- case CALLED:
- // process will be called when callback is handled
- break loop;
-
- case IDLE:
- if (!_state.compareAndSet(state,State.PROCESSING))
- continue;
- processing();
- break loop;
-
- case PROCESSING:
- if (!_state.compareAndSet(state,State.LOCKED))
- continue;
- // Tell the thread that is processing that it must iterate again
- _iterate=true;
- _state.set(State.PROCESSING);
- break loop;
-
- case LOCKED:
- Thread.yield();
- continue loop;
+ switch (_state)
+ {
+ case PENDING:
+ case CALLED:
+ // process will be called when callback is handled
+ break loop;
- case FAILED:
- case SUCCEEDED:
- break loop;
+ case IDLE:
+ _state=State.PROCESSING;
+ process=true;
+ break loop;
- case CLOSED:
- default:
- throw new IllegalStateException("state="+state);
+ case PROCESSING:
+ _iterate=true;
+ break loop;
+
+ case FAILED:
+ case SUCCEEDED:
+ break loop;
+
+ case CLOSED:
+ default:
+ throw new IllegalStateException("state="+_state);
+ }
}
}
+ if (process)
+ processing();
}
private void processing()
@@ -227,6 +222,8 @@ public abstract class IteratingCallback implements Callback
// This should only ever be called when in processing state, however a failed or close call
// may happen concurrently, so state is not assumed.
+ boolean on_complete_success=false;
+
// While we are processing
processing: while (true)
{
@@ -242,13 +239,10 @@ public abstract class IteratingCallback implements Callback
break processing;
}
- // loop until we have successfully acted on the action we have just received
- acting: while(true)
+ // acted on the action we have just received
+ try(SpinLock.Lock lock = _lock.lock())
{
- // action handling needs to know the state
- State state=_state.get();
-
- switch (state)
+ switch (_state)
{
case PROCESSING:
{
@@ -256,67 +250,56 @@ public abstract class IteratingCallback implements Callback
{
case IDLE:
{
- // lock the state
- if (!_state.compareAndSet(state,State.LOCKED))
- continue acting;
-
// Has iterate been called while we were processing?
if (_iterate)
{
// yes, so skip idle and keep processing
_iterate=false;
- _state.set(State.PROCESSING);
+ _state=State.PROCESSING;
continue processing;
}
// No, so we can go idle
- _state.set(State.IDLE);
+ _state=State.IDLE;
break processing;
}
-
+
case SCHEDULED:
- {
- if (!_state.compareAndSet(state, State.PENDING))
- continue acting;
+ {
// we won the race against the callback, so the callback has to process and we can break processing
+ _state=State.PENDING;
break processing;
}
-
+
case SUCCEEDED:
{
- if (!_state.compareAndSet(state, State.LOCKED))
- continue acting;
+ // we lost the race against the callback,
_iterate=false;
- _state.set(State.SUCCEEDED);
- onCompleteSuccess();
+ _state=State.SUCCEEDED;
+ on_complete_success=true;
break processing;
}
default:
- throw new IllegalStateException("state="+state+" action="+action);
+ throw new IllegalStateException("state="+_state+" action="+action);
}
}
-
+
case CALLED:
{
switch (action)
{
case SCHEDULED:
- {
- if (!_state.compareAndSet(state, State.PROCESSING))
- continue acting;
+ {
// we lost the race, so we have to keep processing
+ _state=State.PROCESSING;
continue processing;
}
default:
- throw new IllegalStateException("state="+state+" action="+action);
+ throw new IllegalStateException("state="+_state+" action="+action);
}
}
-
- case LOCKED:
- Thread.yield();
- continue acting;
case SUCCEEDED:
case FAILED:
@@ -326,12 +309,15 @@ public abstract class IteratingCallback implements Callback
case IDLE:
case PENDING:
default:
- throw new IllegalStateException("state="+state+" action="+action);
+ throw new IllegalStateException("state="+_state+" action="+action);
}
}
}
+
+ if (on_complete_success)
+ onCompleteSuccess();
}
-
+
/**
* Invoked when the sub task succeeds.
* Subclasses that override this method must always remember to call
@@ -340,41 +326,36 @@ public abstract class IteratingCallback implements Callback
@Override
public void succeeded()
{
- loop: while (true)
+ boolean process=false;
+ try(SpinLock.Lock lock = _lock.lock())
{
- State state = _state.get();
- switch (state)
+ switch (_state)
{
case PROCESSING:
{
- if (!_state.compareAndSet(state, State.CALLED))
- continue loop;
- break loop;
+ _state=State.CALLED;
+ break;
}
case PENDING:
{
- if (!_state.compareAndSet(state, State.PROCESSING))
- continue loop;
- processing();
- break loop;
+ _state=State.PROCESSING;
+ process=true;
+ break;
}
case CLOSED:
case FAILED:
{
// Too late!
- break loop;
- }
- case LOCKED:
- {
- Thread.yield();
- continue loop;
- }
+ break;
+ }
default:
{
- throw new IllegalStateException("state="+state);
+ throw new IllegalStateException("state="+_state);
}
}
}
+ if (process)
+ processing();
}
/**
@@ -385,73 +366,59 @@ public abstract class IteratingCallback implements Callback
@Override
public void failed(Throwable x)
{
- loop: while (true)
+ boolean failure=false;
+ try(SpinLock.Lock lock = _lock.lock())
{
- State state = _state.get();
- switch (state)
+ switch (_state)
{
case SUCCEEDED:
case FAILED:
case IDLE:
case CLOSED:
case CALLED:
- {
// too late!.
- break loop;
- }
- case LOCKED:
- {
- Thread.yield();
- continue loop;
- }
+ break;
+
case PENDING:
case PROCESSING:
{
- if (!_state.compareAndSet(state, State.FAILED))
- continue loop;
-
- onCompleteFailure(x);
- break loop;
+ _state=State.FAILED;
+ failure=true;
+ break;
}
default:
- throw new IllegalStateException("state="+state);
+ throw new IllegalStateException("state="+_state);
}
}
+ if (failure)
+ onCompleteFailure(x);
}
public void close()
{
- loop: while (true)
+ boolean failure=false;
+ try(SpinLock.Lock lock = _lock.lock())
{
- State state = _state.get();
- switch (state)
+ switch (_state)
{
case IDLE:
case SUCCEEDED:
case FAILED:
- {
- if (!_state.compareAndSet(state, State.CLOSED))
- continue loop;
- break loop;
- }
+ case PROCESSING:
+ _state=State.CLOSED;
+ break;
+
case CLOSED:
- {
- break loop;
- }
- case LOCKED:
- {
- Thread.yield();
- continue loop;
- }
+ break;
+
default:
- {
- if (!_state.compareAndSet(state, State.CLOSED))
- continue loop;
- onCompleteFailure(new ClosedChannelException());
- break loop;
- }
+ _state=State.CLOSED;
+ failure=true;
}
}
+
+ if(failure)
+ onCompleteFailure(new ClosedChannelException());
}
/*
@@ -460,12 +427,18 @@ public abstract class IteratingCallback implements Callback
*/
boolean isIdle()
{
- return _state.get() == State.IDLE;
+ try(SpinLock.Lock lock = _lock.lock())
+ {
+ return _state == State.IDLE;
+ }
}
public boolean isClosed()
{
- return _state.get() == State.CLOSED;
+ try(SpinLock.Lock lock = _lock.lock())
+ {
+ return _state == State.CLOSED;
+ }
}
/**
@@ -473,7 +446,10 @@ public abstract class IteratingCallback implements Callback
*/
public boolean isFailed()
{
- return _state.get() == State.FAILED;
+ try(SpinLock.Lock lock = _lock.lock())
+ {
+ return _state == State.FAILED;
+ }
}
/**
@@ -481,7 +457,10 @@ public abstract class IteratingCallback implements Callback
*/
public boolean isSucceeded()
{
- return _state.get() == State.SUCCEEDED;
+ try(SpinLock.Lock lock = _lock.lock())
+ {
+ return _state == State.SUCCEEDED;
+ }
}
/**
@@ -494,31 +473,18 @@ public abstract class IteratingCallback implements Callback
*/
public boolean reset()
{
- while (true)
+ try(SpinLock.Lock lock = _lock.lock())
{
- State state=_state.get();
- switch(state)
+ switch(_state)
{
case IDLE:
return true;
-
+
case SUCCEEDED:
- if (!_state.compareAndSet(state, State.LOCKED))
- continue;
- _iterate=false;
- _state.set(State.IDLE);
- return true;
-
case FAILED:
- if (!_state.compareAndSet(state, State.LOCKED))
- continue;
_iterate=false;
- _state.set(State.IDLE);
+ _state=State.IDLE;
return true;
-
- case LOCKED:
- Thread.yield();
- continue;
default:
return false;

Back to the top