diff options
author | Greg Wilkins | 2013-12-13 22:14:12 +0000 |
---|---|---|
committer | Greg Wilkins | 2013-12-13 22:14:12 +0000 |
commit | 714148335639d2592b9376dec722d3c7e06887c9 (patch) | |
tree | 84c6f69fe7150450195a336c4f78018e51bfa9e5 | |
parent | 46ef022cf47c19eaea72ccb336ae436e5eaebeff (diff) | |
download | org.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.tar.gz org.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.tar.xz org.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.zip |
424043 - IteratingCallback Idle race
8 files changed, 133 insertions, 99 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 311c9b1068..fc3b900418 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -674,11 +674,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener private class ContentCallback extends IteratingCallback { @Override - protected State process() throws Exception + protected Next process() throws Exception { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return State.IDLE; + return Next.IDLE; Request request = exchange.getRequest(); HttpContent content = HttpSender.this.content; @@ -687,21 +687,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (contentBuffer != null) { if (!someToContent(request, contentBuffer)) - return State.IDLE; + return Next.IDLE; } if (content.advance()) { // There is more content to send sendContent(exchange, content, this); - return State.SCHEDULED; + return Next.SCHEDULED; } if (content.isConsumed()) { sendContent(exchange, content, lastCallback); - return State.SCHEDULED; + return Next.SCHEDULED; } while (true) @@ -714,7 +714,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (updateSenderState(current, SenderState.IDLE)) { LOG.debug("Waiting for deferred content for {}", request); - return State.IDLE; + return Next.IDLE; } break; } @@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener LOG.debug("Deferred content available for {}", request); // TODO: this case is not covered by tests sendContent(exchange, content, this); - return State.SCHEDULED; + return Next.SCHEDULED; } break; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 4c41803fdc..c3f42f479e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -489,7 +489,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } @Override - public State process() throws Exception + public Next process() throws Exception { ByteBuffer chunk = _chunk; while (true) @@ -569,7 +569,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } else continue; - return State.SCHEDULED; + return Next.SCHEDULED; } case SHUTDOWN_OUT: { @@ -584,7 +584,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array()) _bufferPool.release(_header); } - return State.SUCCEEDED; + return Next.SUCCEEDED; } case CONTINUE: { @@ -641,7 +641,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } @Override - public State process() throws Exception + public Next process() throws Exception { ByteBuffer chunk = _chunk; while (true) @@ -686,7 +686,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } else continue; - return State.SCHEDULED; + return Next.SCHEDULED; } case SHUTDOWN_OUT: { @@ -695,7 +695,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } case DONE: { - return State.SUCCEEDED; + return Next.SUCCEEDED; } case CONTINUE: { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 9d2e6faeb8..44c5cd206d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -758,23 +758,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() + protected Next process() { if (BufferUtil.hasContent(_aggregate)) { _flushed=true; write(_aggregate, false, this); - return State.SCHEDULED; + return Next.SCHEDULED; } if (!_flushed) { _flushed=true; write(BufferUtil.EMPTY_BUFFER,false,this); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } } @@ -807,21 +807,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() + protected Next process() { // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { _completed=_len==0; write(_aggregate, _complete && _completed, this); - return State.SCHEDULED; + return Next.SCHEDULED; } // Can we just aggregate the remainder? if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize) { BufferUtil.put(_buffer,_aggregate); - return State.SUCCEEDED; + return Next.SUCCEEDED; } // Is there data left to write? @@ -832,7 +832,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { _completed=true; write(_buffer, _complete, this); - return State.SCHEDULED; + return Next.SCHEDULED; } // otherwise take a slice @@ -844,7 +844,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _slice.position(p); _completed=!_buffer.hasRemaining(); write(_slice, _complete && _completed, this); - return State.SCHEDULED; + return Next.SCHEDULED; } // all content written, but if we have not yet signal completion, we @@ -855,12 +855,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable { _completed=true; write(BufferUtil.EMPTY_BUFFER, _complete, this); - return State.SCHEDULED; + return Next.SCHEDULED; } closed(); } - return State.SUCCEEDED; + return Next.SUCCEEDED; } } @@ -887,7 +887,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() throws Exception + protected Next process() throws Exception { // Only return if EOF has previously been read and thus // a write done with EOF=true @@ -897,7 +897,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _in.close(); closed(); _channel.getByteBufferPool().release(_buffer); - return State.SUCCEEDED; + return Next.SUCCEEDED; } // Read until buffer full or EOF @@ -915,7 +915,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _buffer.position(0); _buffer.limit(len); write(_buffer,_eof,this); - return State.SCHEDULED; + return Next.SCHEDULED; } @Override @@ -958,7 +958,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable } @Override - protected State process() throws Exception + protected Next process() throws Exception { // Only return if EOF has previously been read and thus // a write done with EOF=true @@ -967,7 +967,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _in.close(); closed(); _channel.getByteBufferPool().release(_buffer); - return State.SUCCEEDED; + return Next.SUCCEEDED; } // Read from stream until buffer full or EOF @@ -979,7 +979,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _buffer.flip(); write(_buffer,_eof,this); - return State.SCHEDULED; + return Next.SCHEDULED; } @Override diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java index 2464924cc1..430c4f4791 100644 --- a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java +++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java @@ -297,7 +297,7 @@ public class GzipHttpOutput extends HttpOutput } @Override - protected State process() throws Exception + protected Next process() throws Exception { if (_deflater.needsInput()) { @@ -307,11 +307,11 @@ public class GzipHttpOutput extends HttpOutput _deflater=null; getHttpChannel().getByteBufferPool().release(_buffer); _buffer=null; - return State.SUCCEEDED; + return Next.SUCCEEDED; } if (!_complete) - return State.SUCCEEDED; + return Next.SUCCEEDED; } BufferUtil.compact(_buffer); @@ -324,7 +324,7 @@ public class GzipHttpOutput extends HttpOutput addTrailer(); superWrite(_buffer,complete,this); - return State.SCHEDULED; + return Next.SCHEDULED; } } @@ -342,7 +342,7 @@ public class GzipHttpOutput extends HttpOutput } @Override - protected State process() throws Exception + protected Next process() throws Exception { if (_deflater.needsInput()) { @@ -354,11 +354,11 @@ public class GzipHttpOutput extends HttpOutput _deflater=null; getHttpChannel().getByteBufferPool().release(_buffer); _buffer=null; - return State.SUCCEEDED; + return Next.SUCCEEDED; } if (!_complete) - return State.SUCCEEDED; + return Next.SUCCEEDED; } else { @@ -389,7 +389,7 @@ public class GzipHttpOutput extends HttpOutput addTrailer(); superWrite(_buffer,complete,this); - return State.SCHEDULED; + return Next.SCHEDULED; } } diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java index 0be5f941d1..79cc0faefc 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java @@ -143,7 +143,7 @@ public class Flusher private final Set<IStream> stalled = new HashSet<>(); @Override - protected State process() throws Exception + protected Next process() throws Exception { synchronized (lock) { @@ -194,7 +194,7 @@ public class Flusher } if (active.size() == 0) - return State.IDLE; + return Next.IDLE; // Get the bytes to write ByteBuffer[] buffers = new ByteBuffer[active.size()]; @@ -213,7 +213,7 @@ public class Flusher // MAX_GATHER parameter, and/or autotune the buffer returned // by FrameBytes.getByteBuffer() (see also comment there). - return State.SCHEDULED; + return Next.SCHEDULED; } @Override diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index b89e4b2f4f..591257b095 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -44,7 +44,8 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class IteratingCallback implements Callback { - protected enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED }; + protected enum Next { IDLE, SCHEDULED, SUCCEEDED, FAILED }; + protected enum State { IDLE, SCHEDULED, ITERATING, ITERATE_AGAIN, SUCCEEDED, FAILED }; private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE); public IteratingCallback() @@ -66,7 +67,7 @@ public abstract class IteratingCallback implements Callback * * @throws Exception */ - abstract protected State process() throws Exception; + abstract protected Next process() throws Exception; /* ------------------------------------------------------------ */ @@ -81,42 +82,65 @@ public abstract class IteratingCallback implements Callback { try { - // Keep iterating as long as succeeded() is called during process() - // If we are in WAITING state, either this is the first iteration or - // succeeded()/failed() were called already. - while(_state.compareAndSet(State.IDLE,State.ITERATING)) + while(true) { - State next = process(); - switch (next) + switch (_state.get()) { - case SUCCEEDED: - // The task has complete, there should have been no callbacks - if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)) - throw new IllegalStateException("state="+_state.get()); - completed(); - return; - - case SCHEDULED: - // This callback has been scheduled, so it may or may not have - // already been called back. Let's find out - if (_state.compareAndSet(State.ITERATING,State.SCHEDULED)) - // not called back yet, so lets wait for it - return; - // call back must have happened, so lets iterate - continue; - case IDLE: - // No more progress can be made. Wait for another call to iterate - if (!_state.compareAndSet(State.ITERATING,State.IDLE)) - throw new IllegalStateException("state="+_state.get()); - return; - - case FAILED: - _state.set(State.FAILED); - return; + // Keep iterating as long as succeeded() is called during process() + // If we are in WAITING state, either this is the first iteration or + // succeeded()/failed() were called already. + while(_state.compareAndSet(State.IDLE,State.ITERATING)) + { + Next next = process(); + switch (next) + { + case SUCCEEDED: + // The task has complete, there should have been no callbacks + // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE + if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)&& + !_state.compareAndSet(State.ITERATE_AGAIN,State.SUCCEEDED)) + throw new IllegalStateException("state="+_state.get()); + completed(); + return; + + case SCHEDULED: + // This callback has been scheduled, so it may or may not have + // already been called back. Let's find out + // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE + if (_state.compareAndSet(State.ITERATING,State.SCHEDULED) || + _state.compareAndSet(State.ITERATE_AGAIN,State.SCHEDULED)) + // not called back yet, so lets wait for it + return; + // call back must have happened, so lets iterate + continue; + + case IDLE: + // No more progress can be made by this call to iterate + if (_state.compareAndSet(State.ITERATING,State.IDLE)) + return; + // was iterate called again since we already decided to go IDLE? + if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE)) + continue; // Try another iteration as more work may have been added while previous process was returning + throw new IllegalStateException("state="+_state.get()); + + case FAILED: + _state.set(State.FAILED); + return; + + default: + throw new IllegalStateException("state="+_state.get()+" next="+next); + } + } + break; + + case ITERATING: + if (_state.compareAndSet(State.ITERATING,State.ITERATE_AGAIN)) + return; + break; default: - throw new IllegalStateException("state="+_state.get()+" next="+next); + return; } } } @@ -140,6 +164,11 @@ public abstract class IteratingCallback implements Callback { switch(_state.get()) { + case ITERATE_AGAIN: + if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE)) + break loop; + continue; + case ITERATING: if (_state.compareAndSet(State.ITERATING,State.IDLE)) break loop; @@ -174,6 +203,11 @@ public abstract class IteratingCallback implements Callback { switch(_state.get()) { + case ITERATE_AGAIN: + if (_state.compareAndSet(State.ITERATE_AGAIN,State.FAILED)) + break loop; + continue; + case ITERATING: if (_state.compareAndSet(State.ITERATING,State.FAILED)) break loop; diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index 1896d8a0d9..fe3319e818 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -54,15 +54,15 @@ public class IteratingCallbackTest int i=10; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { succeeded(); // fake a completed IO operation - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -81,15 +81,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -108,15 +108,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -145,7 +145,7 @@ public class IteratingCallbackTest int i=10; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) @@ -154,9 +154,9 @@ public class IteratingCallbackTest succeeded(); // fake a completed IO operation else failed(new Exception("testing")); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -173,15 +173,15 @@ public class IteratingCallbackTest int i=4; @Override - protected State process() throws Exception + protected Next process() throws Exception { processed++; if (i-->1) { scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; } - return State.SUCCEEDED; + return Next.SUCCEEDED; } }; @@ -202,7 +202,7 @@ public class IteratingCallbackTest int i=5; @Override - protected State process() + protected Next process() { processed++; @@ -210,11 +210,11 @@ public class IteratingCallbackTest { case 5: succeeded(); - return State.SCHEDULED; + return Next.SCHEDULED; case 4: scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; case 3: scheduler.schedule(new Runnable() @@ -225,18 +225,18 @@ public class IteratingCallbackTest idle.countDown(); } },5,TimeUnit.MILLISECONDS); - return State.IDLE; + return Next.IDLE; case 2: succeeded(); - return State.SCHEDULED; + return Next.SCHEDULED; case 1: scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS); - return State.SCHEDULED; + return Next.SCHEDULED; case 0: - return State.SUCCEEDED; + return Next.SUCCEEDED; default: throw new IllegalStateException(); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index f8609da2b0..3f40792cc8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -217,7 +217,7 @@ public class FrameFlusher } @Override - protected State process() throws Exception + protected Next process() throws Exception { synchronized (lock) { @@ -241,11 +241,11 @@ public class FrameFlusher } if (buffers.size()==0) - return State.IDLE; + return Next.IDLE; endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()])); buffers.clear(); - return State.SCHEDULED; + return Next.SCHEDULED; } @Override |