Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2013-11-21 04:11:56 +0000
committerGreg Wilkins2013-11-21 04:11:56 +0000
commit0a52c64d161068e9722f7c9c00cb0014170505fc (patch)
treeba02d2f822005c32f0bab7ea9a9f2c8f5c248276 /jetty-spdy/spdy-core/src
parent1eb2997efdd3a255d4df0b09667624f337ee2c77 (diff)
downloadorg.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.tar.gz
org.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.tar.xz
org.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.zip
421697 - IteratingCallback improvements
Use the iteratingcallback for websocket use gather writes for websocket always write entire websocket payload
Diffstat (limited to 'jetty-spdy/spdy-core/src')
-rw-r--r--jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java53
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java2
2 files changed, 24 insertions, 31 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
index 3981aeb2aa..78a72935af 100644
--- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
+++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java
@@ -38,7 +38,7 @@ public class Flusher
private static final Logger LOG = Log.getLogger(Flusher.class);
private static final int MAX_GATHER = 10;
- private final IteratingCallback iteratingCallback = new SessionIteratingCallback();
+ private final FlusherCB flusherCB = new FlusherCB();
private final Controller controller;
private final Object lock = new Object();
private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock);
@@ -124,7 +124,7 @@ public class Flusher
void flush()
{
- iteratingCallback.iterate();
+ flusherCB.iterate();
}
public int getQueueSize()
@@ -135,28 +135,25 @@ public class Flusher
}
}
- private class SessionIteratingCallback extends IteratingCallback
+ private class FlusherCB extends IteratingCallback
{
- private final List<StandardSession.FrameBytes> active = new ArrayList<>();
+ // TODO should active and succeeded be local?
+ private final List<StandardSession.FrameBytes> active = new ArrayList<>(MAX_GATHER);
+ private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(MAX_GATHER);
private final Set<IStream> stalled = new HashSet<>();
@Override
protected State process() throws Exception
{
- StandardSession.FrameBytes frameBytes = null;
synchronized (lock)
{
- if (active.size()>0)
- throw new IllegalStateException();
+ succeeded.clear();
- if (queue.isEmpty())
- return State.IDLE;
-
// Scan queue for data to write from first non stalled stream.
int qs=queue.size();
for (int i = 0; i < qs && active.size()<MAX_GATHER;)
{
- frameBytes = queue.getUnsafe(i);
+ StandardSession.FrameBytes frameBytes = queue.getUnsafe(i);
IStream stream = frameBytes.getStream();
// Continue if this is stalled stream
@@ -205,38 +202,36 @@ public class Flusher
buffers[i]=active.get(i).getByteBuffer();
if (controller != null)
- controller.write(iteratingCallback, buffers);
+ controller.write(flusherCB, buffers);
return State.SCHEDULED;
}
@Override
protected void completed()
{
- // will never be called as doProcess always returns WAITING or IDLE
+ // will never be called as process always returns SCHEDULED or IDLE
throw new IllegalStateException();
}
@Override
public void succeeded()
{
- if (LOG.isDebugEnabled())
+ synchronized (lock)
{
- synchronized (lock)
- {
- LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size());
- }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Succeeded write of {}, q={}", active, queue.size());
+ succeeded.addAll(active);
+ active.clear();
}
- for (FrameBytes frame: active)
- frame.succeeded();
- active.clear();
+ for (FrameBytes frame: succeeded)
+ frame.succeeded(); // TODO should we try catch?
super.succeeded();
}
@Override
public void failed(Throwable x)
{
- List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>();
-
+ List<StandardSession.FrameBytes> failed = new ArrayList<>();
synchronized (lock)
{
failure = x;
@@ -245,15 +240,13 @@ public class Flusher
String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
LOG.debug(logMessage, x);
}
- frameBytesToFail.addAll(queue);
+ failed.addAll(active);
+ active.clear();
+ failed.addAll(queue);
queue.clear();
}
-
- for (FrameBytes frame: active)
- frame.failed(x);
- active.clear();
- for (StandardSession.FrameBytes fb : frameBytesToFail)
- fb.failed(x);
+ for (StandardSession.FrameBytes fb : failed)
+ fb.failed(x); // TODO should we try catch?
super.failed(x);
}
}
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
index dc66b59a81..fd5d580075 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -124,7 +124,7 @@ public class StandardSessionTest
public Object answer(InvocationOnMock invocation)
{
Object[] args = invocation.getArguments();
- Callback callback = (Callback)args[1];
+ Callback callback = (Callback)args[0];
if (fail)
callback.failed(new ClosedChannelException());
else

Back to the top