Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2013-12-13 22:14:12 +0000
committerGreg Wilkins2013-12-13 22:14:12 +0000
commit714148335639d2592b9376dec722d3c7e06887c9 (patch)
tree84c6f69fe7150450195a336c4f78018e51bfa9e5
parent46ef022cf47c19eaea72ccb336ae436e5eaebeff (diff)
downloadorg.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.tar.gz
org.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.tar.xz
org.eclipse.jetty.project-714148335639d2592b9376dec722d3c7e06887c9.zip
424043 - IteratingCallback Idle race
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java14
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java12
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java34
-rw-r--r--jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java16
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java6
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java100
-rw-r--r--jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java44
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java6
8 files changed, 133 insertions, 99 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 311c9b1068..fc3b900418 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -674,11 +674,11 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class ContentCallback extends IteratingCallback
{
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
- return State.IDLE;
+ return Next.IDLE;
Request request = exchange.getRequest();
HttpContent content = HttpSender.this.content;
@@ -687,21 +687,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentBuffer != null)
{
if (!someToContent(request, contentBuffer))
- return State.IDLE;
+ return Next.IDLE;
}
if (content.advance())
{
// There is more content to send
sendContent(exchange, content, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
if (content.isConsumed())
{
sendContent(exchange, content, lastCallback);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
while (true)
@@ -714,7 +714,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (updateSenderState(current, SenderState.IDLE))
{
LOG.debug("Waiting for deferred content for {}", request);
- return State.IDLE;
+ return Next.IDLE;
}
break;
}
@@ -725,7 +725,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
LOG.debug("Deferred content available for {}", request);
// TODO: this case is not covered by tests
sendContent(exchange, content, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
break;
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
index 4c41803fdc..c3f42f479e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
@@ -489,7 +489,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
- public State process() throws Exception
+ public Next process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@@ -569,7 +569,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
case SHUTDOWN_OUT:
{
@@ -584,7 +584,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
_bufferPool.release(_header);
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
case CONTINUE:
{
@@ -641,7 +641,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
@Override
- public State process() throws Exception
+ public Next process() throws Exception
{
ByteBuffer chunk = _chunk;
while (true)
@@ -686,7 +686,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
else
continue;
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
case SHUTDOWN_OUT:
{
@@ -695,7 +695,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
case DONE:
{
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
case CONTINUE:
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index 9d2e6faeb8..44c5cd206d 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -758,23 +758,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
- protected State process()
+ protected Next process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
write(_aggregate, false, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
if (!_flushed)
{
_flushed=true;
write(BufferUtil.EMPTY_BUFFER,false,this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
}
@@ -807,21 +807,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
- protected State process()
+ protected Next process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_completed=_len==0;
write(_aggregate, _complete && _completed, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
// Can we just aggregate the remainder?
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{
BufferUtil.put(_buffer,_aggregate);
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
// Is there data left to write?
@@ -832,7 +832,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_completed=true;
write(_buffer, _complete, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
// otherwise take a slice
@@ -844,7 +844,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_slice.position(p);
_completed=!_buffer.hasRemaining();
write(_slice, _complete && _completed, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
// all content written, but if we have not yet signal completion, we
@@ -855,12 +855,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
{
_completed=true;
write(BufferUtil.EMPTY_BUFFER, _complete, this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
closed();
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
}
@@ -887,7 +887,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@@ -897,7 +897,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
// Read until buffer full or EOF
@@ -915,7 +915,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.position(0);
_buffer.limit(len);
write(_buffer,_eof,this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
@Override
@@ -958,7 +958,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
@@ -967,7 +967,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
// Read from stream until buffer full or EOF
@@ -979,7 +979,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_buffer.flip();
write(_buffer,_eof,this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
@Override
diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java
index 2464924cc1..430c4f4791 100644
--- a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java
+++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/gzip/GzipHttpOutput.java
@@ -297,7 +297,7 @@ public class GzipHttpOutput extends HttpOutput
}
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
if (_deflater.needsInput())
{
@@ -307,11 +307,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null;
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
if (!_complete)
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
BufferUtil.compact(_buffer);
@@ -324,7 +324,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer();
superWrite(_buffer,complete,this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
}
@@ -342,7 +342,7 @@ public class GzipHttpOutput extends HttpOutput
}
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
if (_deflater.needsInput())
{
@@ -354,11 +354,11 @@ public class GzipHttpOutput extends HttpOutput
_deflater=null;
getHttpChannel().getByteBufferPool().release(_buffer);
_buffer=null;
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
if (!_complete)
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
else
{
@@ -389,7 +389,7 @@ public class GzipHttpOutput extends HttpOutput
addTrailer();
superWrite(_buffer,complete,this);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
}
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
index 0be5f941d1..79cc0faefc 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
@@ -143,7 +143,7 @@ public class Flusher
private final Set<IStream> stalled = new HashSet<>();
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
synchronized (lock)
{
@@ -194,7 +194,7 @@ public class Flusher
}
if (active.size() == 0)
- return State.IDLE;
+ return Next.IDLE;
// Get the bytes to write
ByteBuffer[] buffers = new ByteBuffer[active.size()];
@@ -213,7 +213,7 @@ public class Flusher
// MAX_GATHER parameter, and/or autotune the buffer returned
// by FrameBytes.getByteBuffer() (see also comment there).
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
@Override
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
index b89e4b2f4f..591257b095 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java
@@ -44,7 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public abstract class IteratingCallback implements Callback
{
- protected enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED };
+ protected enum Next { IDLE, SCHEDULED, SUCCEEDED, FAILED };
+ protected enum State { IDLE, SCHEDULED, ITERATING, ITERATE_AGAIN, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.IDLE);
public IteratingCallback()
@@ -66,7 +67,7 @@ public abstract class IteratingCallback implements Callback
*
* @throws Exception
*/
- abstract protected State process() throws Exception;
+ abstract protected Next process() throws Exception;
/* ------------------------------------------------------------ */
@@ -81,42 +82,65 @@ public abstract class IteratingCallback implements Callback
{
try
{
- // Keep iterating as long as succeeded() is called during process()
- // If we are in WAITING state, either this is the first iteration or
- // succeeded()/failed() were called already.
- while(_state.compareAndSet(State.IDLE,State.ITERATING))
+ while(true)
{
- State next = process();
- switch (next)
+ switch (_state.get())
{
- case SUCCEEDED:
- // The task has complete, there should have been no callbacks
- if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
- throw new IllegalStateException("state="+_state.get());
- completed();
- return;
-
- case SCHEDULED:
- // This callback has been scheduled, so it may or may not have
- // already been called back. Let's find out
- if (_state.compareAndSet(State.ITERATING,State.SCHEDULED))
- // not called back yet, so lets wait for it
- return;
- // call back must have happened, so lets iterate
- continue;
-
case IDLE:
- // No more progress can be made. Wait for another call to iterate
- if (!_state.compareAndSet(State.ITERATING,State.IDLE))
- throw new IllegalStateException("state="+_state.get());
- return;
-
- case FAILED:
- _state.set(State.FAILED);
- return;
+ // Keep iterating as long as succeeded() is called during process()
+ // If we are in WAITING state, either this is the first iteration or
+ // succeeded()/failed() were called already.
+ while(_state.compareAndSet(State.IDLE,State.ITERATING))
+ {
+ Next next = process();
+ switch (next)
+ {
+ case SUCCEEDED:
+ // The task has complete, there should have been no callbacks
+ // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
+ if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED)&&
+ !_state.compareAndSet(State.ITERATE_AGAIN,State.SUCCEEDED))
+ throw new IllegalStateException("state="+_state.get());
+ completed();
+ return;
+
+ case SCHEDULED:
+ // This callback has been scheduled, so it may or may not have
+ // already been called back. Let's find out
+ // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE
+ if (_state.compareAndSet(State.ITERATING,State.SCHEDULED) ||
+ _state.compareAndSet(State.ITERATE_AGAIN,State.SCHEDULED))
+ // not called back yet, so lets wait for it
+ return;
+ // call back must have happened, so lets iterate
+ continue;
+
+ case IDLE:
+ // No more progress can be made by this call to iterate
+ if (_state.compareAndSet(State.ITERATING,State.IDLE))
+ return;
+ // was iterate called again since we already decided to go IDLE?
+ if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
+ continue; // Try another iteration as more work may have been added while previous process was returning
+ throw new IllegalStateException("state="+_state.get());
+
+ case FAILED:
+ _state.set(State.FAILED);
+ return;
+
+ default:
+ throw new IllegalStateException("state="+_state.get()+" next="+next);
+ }
+ }
+ break;
+
+ case ITERATING:
+ if (_state.compareAndSet(State.ITERATING,State.ITERATE_AGAIN))
+ return;
+ break;
default:
- throw new IllegalStateException("state="+_state.get()+" next="+next);
+ return;
}
}
}
@@ -140,6 +164,11 @@ public abstract class IteratingCallback implements Callback
{
switch(_state.get())
{
+ case ITERATE_AGAIN:
+ if (_state.compareAndSet(State.ITERATE_AGAIN,State.IDLE))
+ break loop;
+ continue;
+
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.IDLE))
break loop;
@@ -174,6 +203,11 @@ public abstract class IteratingCallback implements Callback
{
switch(_state.get())
{
+ case ITERATE_AGAIN:
+ if (_state.compareAndSet(State.ITERATE_AGAIN,State.FAILED))
+ break loop;
+ continue;
+
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.FAILED))
break loop;
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
index 1896d8a0d9..fe3319e818 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java
@@ -54,15 +54,15 @@ public class IteratingCallbackTest
int i=10;
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
processed++;
if (i-->1)
{
succeeded(); // fake a completed IO operation
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
};
@@ -81,15 +81,15 @@ public class IteratingCallbackTest
int i=4;
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
};
@@ -108,15 +108,15 @@ public class IteratingCallbackTest
int i=4;
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(successTask,50,TimeUnit.MILLISECONDS);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
};
@@ -145,7 +145,7 @@ public class IteratingCallbackTest
int i=10;
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
processed++;
if (i-->1)
@@ -154,9 +154,9 @@ public class IteratingCallbackTest
succeeded(); // fake a completed IO operation
else
failed(new Exception("testing"));
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
};
@@ -173,15 +173,15 @@ public class IteratingCallbackTest
int i=4;
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
processed++;
if (i-->1)
{
scheduler.schedule(i>2?successTask:failTask,50,TimeUnit.MILLISECONDS);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
}
};
@@ -202,7 +202,7 @@ public class IteratingCallbackTest
int i=5;
@Override
- protected State process()
+ protected Next process()
{
processed++;
@@ -210,11 +210,11 @@ public class IteratingCallbackTest
{
case 5:
succeeded();
- return State.SCHEDULED;
+ return Next.SCHEDULED;
case 4:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
case 3:
scheduler.schedule(new Runnable()
@@ -225,18 +225,18 @@ public class IteratingCallbackTest
idle.countDown();
}
},5,TimeUnit.MILLISECONDS);
- return State.IDLE;
+ return Next.IDLE;
case 2:
succeeded();
- return State.SCHEDULED;
+ return Next.SCHEDULED;
case 1:
scheduler.schedule(successTask,5,TimeUnit.MILLISECONDS);
- return State.SCHEDULED;
+ return Next.SCHEDULED;
case 0:
- return State.SUCCEEDED;
+ return Next.SUCCEEDED;
default:
throw new IllegalStateException();
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
index f8609da2b0..3f40792cc8 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java
@@ -217,7 +217,7 @@ public class FrameFlusher
}
@Override
- protected State process() throws Exception
+ protected Next process() throws Exception
{
synchronized (lock)
{
@@ -241,11 +241,11 @@ public class FrameFlusher
}
if (buffers.size()==0)
- return State.IDLE;
+ return Next.IDLE;
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
- return State.SCHEDULED;
+ return Next.SCHEDULED;
}
@Override

Back to the top