diff options
author | Greg Wilkins | 2013-11-21 04:11:56 +0000 |
---|---|---|
committer | Greg Wilkins | 2013-11-21 04:11:56 +0000 |
commit | 0a52c64d161068e9722f7c9c00cb0014170505fc (patch) | |
tree | ba02d2f822005c32f0bab7ea9a9f2c8f5c248276 /jetty-spdy/spdy-core/src | |
parent | 1eb2997efdd3a255d4df0b09667624f337ee2c77 (diff) | |
download | org.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.tar.gz org.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.tar.xz org.eclipse.jetty.project-0a52c64d161068e9722f7c9c00cb0014170505fc.zip |
421697 - IteratingCallback improvements
Use the iteratingcallback for websocket
use gather writes for websocket
always write entire websocket payload
Diffstat (limited to 'jetty-spdy/spdy-core/src')
-rw-r--r-- | jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java | 53 | ||||
-rw-r--r-- | jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java | 2 |
2 files changed, 24 insertions, 31 deletions
diff --git a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java index 3981aeb2aa..78a72935af 100644 --- a/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java +++ b/jetty-spdy/spdy-core/src/main/java/org/eclipse/jetty/spdy/Flusher.java @@ -38,7 +38,7 @@ public class Flusher private static final Logger LOG = Log.getLogger(Flusher.class); private static final int MAX_GATHER = 10; - private final IteratingCallback iteratingCallback = new SessionIteratingCallback(); + private final FlusherCB flusherCB = new FlusherCB(); private final Controller controller; private final Object lock = new Object(); private final ArrayQueue<StandardSession.FrameBytes> queue = new ArrayQueue<>(lock); @@ -124,7 +124,7 @@ public class Flusher void flush() { - iteratingCallback.iterate(); + flusherCB.iterate(); } public int getQueueSize() @@ -135,28 +135,25 @@ public class Flusher } } - private class SessionIteratingCallback extends IteratingCallback + private class FlusherCB extends IteratingCallback { - private final List<StandardSession.FrameBytes> active = new ArrayList<>(); + // TODO should active and succeeded be local? + private final List<StandardSession.FrameBytes> active = new ArrayList<>(MAX_GATHER); + private final List<StandardSession.FrameBytes> succeeded = new ArrayList<>(MAX_GATHER); private final Set<IStream> stalled = new HashSet<>(); @Override protected State process() throws Exception { - StandardSession.FrameBytes frameBytes = null; synchronized (lock) { - if (active.size()>0) - throw new IllegalStateException(); + succeeded.clear(); - if (queue.isEmpty()) - return State.IDLE; - // Scan queue for data to write from first non stalled stream. int qs=queue.size(); for (int i = 0; i < qs && active.size()<MAX_GATHER;) { - frameBytes = queue.getUnsafe(i); + StandardSession.FrameBytes frameBytes = queue.getUnsafe(i); IStream stream = frameBytes.getStream(); // Continue if this is stalled stream @@ -205,38 +202,36 @@ public class Flusher buffers[i]=active.get(i).getByteBuffer(); if (controller != null) - controller.write(iteratingCallback, buffers); + controller.write(flusherCB, buffers); return State.SCHEDULED; } @Override protected void completed() { - // will never be called as doProcess always returns WAITING or IDLE + // will never be called as process always returns SCHEDULED or IDLE throw new IllegalStateException(); } @Override public void succeeded() { - if (LOG.isDebugEnabled()) + synchronized (lock) { - synchronized (lock) - { - LOG.debug("Completed write of {}, {} frame(s) in queue", active, queue.size()); - } + if (LOG.isDebugEnabled()) + LOG.debug("Succeeded write of {}, q={}", active, queue.size()); + succeeded.addAll(active); + active.clear(); } - for (FrameBytes frame: active) - frame.succeeded(); - active.clear(); + for (FrameBytes frame: succeeded) + frame.succeeded(); // TODO should we try catch? super.succeeded(); } @Override public void failed(Throwable x) { - List<StandardSession.FrameBytes> frameBytesToFail = new ArrayList<>(); - + List<StandardSession.FrameBytes> failed = new ArrayList<>(); synchronized (lock) { failure = x; @@ -245,15 +240,13 @@ public class Flusher String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size()); LOG.debug(logMessage, x); } - frameBytesToFail.addAll(queue); + failed.addAll(active); + active.clear(); + failed.addAll(queue); queue.clear(); } - - for (FrameBytes frame: active) - frame.failed(x); - active.clear(); - for (StandardSession.FrameBytes fb : frameBytesToFail) - fb.failed(x); + for (StandardSession.FrameBytes fb : failed) + fb.failed(x); // TODO should we try catch? super.failed(x); } } diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index dc66b59a81..fd5d580075 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -124,7 +124,7 @@ public class StandardSessionTest public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); - Callback callback = (Callback)args[1]; + Callback callback = (Callback)args[0]; if (fail) callback.failed(new ClosedChannelException()); else |