Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2014-03-13 06:12:55 -0400
committerGreg Wilkins2014-03-13 06:12:55 -0400
commit9c30e7a303f07784dd686da04ab693c2d3dfbcc0 (patch)
tree3c20a95f2c55b9c433dc4c42edfdadad851e3639
parentb405ad8c4f0a290dc283906b9411fe493437a2fb (diff)
downloadorg.eclipse.jetty.project-9c30e7a303f07784dd686da04ab693c2d3dfbcc0.tar.gz
org.eclipse.jetty.project-9c30e7a303f07784dd686da04ab693c2d3dfbcc0.tar.xz
org.eclipse.jetty.project-9c30e7a303f07784dd686da04ab693c2d3dfbcc0.zip
430242 - added SharedBlockingCallback to support threadsafe blocking
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java9
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java10
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java48
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java25
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java271
-rw-r--r--jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java123
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java36
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java15
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java8
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java8
10 files changed, 297 insertions, 256 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
index b86363591b..0718b23c2d 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
@@ -53,6 +53,7 @@ import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.URIUtil;
+import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -728,10 +729,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException
{
- BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback();
- boolean committing=sendResponse(info,content,complete,writeBlock);
- writeBlock.block();
- return committing;
+ try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback())
+ {
+ return sendResponse(info,content,complete,blocker);
+ }
}
public boolean isCommitted()
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java
index b799b0c83e..35943376b5 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
+import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -57,10 +58,11 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
{
while(true)
{
- _readBlocker.acquire();
- _httpConnection.fillInterested(_readBlocker);
- LOG.debug("{} block readable on {}",this,_readBlocker);
- _readBlocker.block();
+ try (Blocker blocker=_readBlocker.acquire())
+ {
+ _httpConnection.fillInterested(blocker);
+ LOG.debug("{} block readable on {}",this,blocker);
+ }
Object content=getNextContent();
if (content!=null || isFinished())
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index 77ca11f5b1..8d36745ca0 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
+import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@@ -116,17 +117,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
return _channel.getResponse().isAllContentWritten(_written);
}
- protected BlockingCallback acquireWriteBlockingCallback() throws IOException
+ protected Blocker acquireWriteBlockingCallback() throws IOException
{
- _writeblock.acquire();
- return _writeblock;
+ return _writeblock.acquire();
}
protected void write(ByteBuffer content, boolean complete) throws IOException
{
- _writeblock.acquire();
- write(content,complete,_writeblock);
- _writeblock.block();
+ try (Blocker blocker=_writeblock.acquire())
+ {
+ write(content,complete,blocker);
+ }
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
@@ -439,9 +440,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
- _writeblock.acquire();
- write(_aggregate, complete, _writeblock);
- _writeblock.block();
+ try(Blocker blocker=_writeblock.acquire())
+ {
+ write(_aggregate, complete, blocker);
+ }
if (complete)
closed();
}
@@ -497,9 +499,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ByteBuffer content) throws IOException
{
- _writeblock.acquire();
- write(content,true,_writeblock);
- _writeblock.block();
+ try(Blocker blocker=_writeblock.acquire())
+ {
+ write(content,true,blocker);
+ }
}
/* ------------------------------------------------------------ */
@@ -509,9 +512,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(InputStream in) throws IOException
{
- _writeblock.acquire();
- new InputStreamWritingCB(in,_writeblock).iterate();
- _writeblock.block();
+ try(Blocker blocker=_writeblock.acquire())
+ {
+ new InputStreamWritingCB(in,blocker).iterate();
+ }
}
/* ------------------------------------------------------------ */
@@ -521,9 +525,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
- _writeblock.acquire();
- new ReadableByteChannelWritingCB(in,_writeblock).iterate();
- _writeblock.block();
+ try(Blocker blocker=_writeblock.acquire())
+ {
+ new ReadableByteChannelWritingCB(in,blocker).iterate();
+ }
}
@@ -534,9 +539,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable
*/
public void sendContent(HttpContent content) throws IOException
{
- _writeblock.acquire();
- sendContent(content,_writeblock);
- _writeblock.block();
+ try(Blocker blocker=_writeblock.acquire())
+ {
+ sendContent(content,blocker);
+ }
}
/* ------------------------------------------------------------ */
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java
index d55836d330..dfb4faea30 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java
@@ -26,30 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/**
- * A Callback for simple reusable conversion of an
- * asynchronous API to blocking.
- * <p>
- * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
- * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
- * and only a single callback per phase is allowed.
- * <p>
- * A typical usage pattern is:
- * <pre>
- * public class MyClass
- * {
- * BlockingCallback cb = new BlockingCallback();
- *
- * public void blockingMethod(Object args) throws Exception
- * {
- * asyncMethod(args,cb);
- * cb.block();
- * }
- *
- * public <C>void asyncMethod(Object args, Callback callback)
- * {
- * ...
- * }
- * }
+ * TODO
*/
public class BlockingCallback implements Callback
{
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
index 0ad55c2406..02bb82b1d6 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java
@@ -18,174 +18,203 @@
package org.eclipse.jetty.util;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+
/* ------------------------------------------------------------ */
-/**
- * A Callback for simple reusable conversion of an
- * asynchronous API to blocking.
- * <p>
- * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from
- * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated
- * and only a single callback per phase is allowed.
- * <p>
+/** Provides a reusable BlockingCallback.
* A typical usage pattern is:
* <pre>
- * public class MyClass
+ * void someBlockingCall(Object... args) throws IOException
* {
- * BlockingCallback cb = new BlockingCallback();
- *
- * public void blockingMethod(Object args) throws Exception
- * {
- * asyncMethod(args,cb);
- * cb.block();
- * }
- *
- * public <C>void asyncMethod(Object args, Callback callback)
- * {
- * ...
- * }
- * }
+ * try(Blocker blocker=sharedBlockingCallback.acquire())
+ * {
+ * someAsyncCall(args,blocker);
+ * }
+ * catch(Throwable e)
+ * {
+ * blocker.fail(e);
+ * }
+ * }
+ * </pre>
*/
-public class SharedBlockingCallback extends BlockingCallback
+public class SharedBlockingCallback
{
- private static Throwable IDLE=new Throwable()
+ private static Throwable IDLE = new Throwable()
{
@Override
- public String toString() { return "IDLE"; }
+ public String toString()
+ {
+ return "IDLE";
+ }
};
-
- private static Throwable SUCCEEDED=new Throwable()
+
+ private static Throwable SUCCEEDED = new Throwable()
{
@Override
- public String toString() { return "SUCCEEDED"; }
+ public String toString()
+ {
+ return "SUCCEEDED";
+ }
};
-
- final ReentrantLock _lock = new ReentrantLock();
- Condition _idle = _lock.newCondition();
- Condition _complete = _lock.newCondition();
- Throwable _state = IDLE;
-
+ final Blocker _blocker;
public SharedBlockingCallback()
- {}
-
- public void acquire() throws IOException
{
- _lock.lock();
- try
- {
- while (_state!=IDLE)
- _idle.await();
- _state=null;
- }
- catch (final InterruptedException e)
- {
- throw new InterruptedIOException(){{initCause(e);}};
- }
- finally
- {
- _lock.unlock();
- }
+ this(new Blocker());
}
- @Override
- public void succeeded()
+ protected SharedBlockingCallback(Blocker blocker)
{
- _lock.lock();
- try
- {
- if (_state==null)
- {
- _state=SUCCEEDED;
- _complete.signalAll();
- }
- else if (_state==IDLE)
- throw new IllegalStateException("IDLE");
- }
- finally
- {
- _lock.unlock();
- }
+ _blocker=blocker;
}
-
- @Override
- public void failed(Throwable cause)
+
+ public Blocker acquire() throws IOException
{
- _lock.lock();
+ _blocker._lock.lock();
try
{
- if (_state==null)
+ while (_blocker._state != IDLE)
+ _blocker._idle.await();
+ _blocker._state = null;
+ }
+ catch (final InterruptedException e)
+ {
+ throw new InterruptedIOException()
{
- _state=cause;
- _complete.signalAll();
- }
- else if (_state==IDLE)
- throw new IllegalStateException("IDLE");
+ {
+ initCause(e);
+ }
+ };
}
finally
{
- _lock.unlock();
+ _blocker._lock.unlock();
}
+ return _blocker;
}
- /** Block until the Callback has succeeded or failed and
- * after the return leave in the state to allow reuse.
- * This is useful for code that wants to repeatable use a FutureCallback to convert
- * an asynchronous API to a blocking API.
- * @throws IOException if exception was caught during blocking, or callback was cancelled
+
+ /* ------------------------------------------------------------ */
+ /** A Closeable Callback.
+ * Uses the auto close mechanism to block until the collback is complete.
*/
- @Override
- public void block() throws IOException
+ public static class Blocker implements Callback, Closeable
{
- _lock.lock();
- try
+ final ReentrantLock _lock = new ReentrantLock();
+ final Condition _idle = _lock.newCondition();
+ final Condition _complete = _lock.newCondition();
+ Throwable _state = IDLE;
+
+ public Blocker()
{
- while (_state==null)
- _complete.await();
-
- if (_state==SUCCEEDED)
- return;
- if (_state==IDLE)
- throw new IllegalStateException("IDLE");
- if (_state instanceof IOException)
- throw (IOException) _state;
- if (_state instanceof CancellationException)
- throw (CancellationException) _state;
- throw new IOException(_state);
}
- catch (final InterruptedException e)
+
+ @Override
+ public void succeeded()
{
- throw new InterruptedIOException(){{initCause(e);}};
+ _lock.lock();
+ try
+ {
+ if (_state == null)
+ {
+ _state = SUCCEEDED;
+ _complete.signalAll();
+ }
+ else if (_state == IDLE)
+ throw new IllegalStateException("IDLE");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- finally
+
+ @Override
+ public void failed(Throwable cause)
{
- _state=IDLE;
- _idle.signalAll();
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ if (_state == null)
+ {
+ _state = cause;
+ _complete.signalAll();
+ }
+ else if (_state == IDLE)
+ throw new IllegalStateException("IDLE");
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
- }
-
-
- @Override
- public String toString()
- {
- _lock.lock();
- try
+
+ /**
+ * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to
+ * repeatable use a FutureCallback to convert an asynchronous API to a blocking API.
+ *
+ * @throws IOException
+ * if exception was caught during blocking, or callback was cancelled
+ */
+ @Override
+ public void close() throws IOException
{
- return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
+ _lock.lock();
+ try
+ {
+ while (_state == null)
+ _complete.await();
+
+ if (_state == SUCCEEDED)
+ return;
+ if (_state == IDLE)
+ throw new IllegalStateException("IDLE");
+ if (_state instanceof IOException)
+ throw (IOException)_state;
+ if (_state instanceof CancellationException)
+ throw (CancellationException)_state;
+ if (_state instanceof RuntimeException)
+ throw (RuntimeException)_state;
+ if (_state instanceof Error)
+ throw (Error)_state;
+ throw new IOException(_state);
+ }
+ catch (final InterruptedException e)
+ {
+ throw new InterruptedIOException()
+ {
+ {
+ initCause(e);
+ }
+ };
+ }
+ finally
+ {
+ _state = IDLE;
+ _idle.signalAll();
+ _lock.unlock();
+ }
}
- finally
+
+ @Override
+ public String toString()
{
- _lock.unlock();
+ _lock.lock();
+ try
+ {
+ return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
}
-
}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
index 43f9a693c6..1d4894ad92 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java
@@ -19,22 +19,17 @@
package org.eclipse.jetty.util;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
public class SharedBlockingCallbackTest
{
- final SharedBlockingCallback fcb= new SharedBlockingCallback();
+ final SharedBlockingCallback sbcb= new SharedBlockingCallback();
public SharedBlockingCallbackTest()
{
@@ -43,34 +38,38 @@ public class SharedBlockingCallbackTest
@Test
public void testDone() throws Exception
- {
- fcb.acquire();
- fcb.succeeded();
- long start=System.currentTimeMillis();
- fcb.block();
+ {
+ long start;
+ try (Blocker blocker=sbcb.acquire())
+ {
+ blocker.succeeded();
+ start=System.currentTimeMillis();
+ }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
}
@Test
public void testGetDone() throws Exception
{
- fcb.acquire();
- final CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(new Runnable()
+ long start;
+ try (final Blocker blocker=sbcb.acquire())
{
- @Override
- public void run()
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ new Thread(new Runnable()
{
- latch.countDown();
- try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
- fcb.succeeded();
- }
- }).start();
-
- latch.await();
- long start=System.currentTimeMillis();
- fcb.block();
+ @Override
+ public void run()
+ {
+ latch.countDown();
+ try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
+ blocker.succeeded();
+ }
+ }).start();
+
+ latch.await();
+ start=System.currentTimeMillis();
+ }
Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L));
}
@@ -78,18 +77,19 @@ public class SharedBlockingCallbackTest
@Test
public void testFailed() throws Exception
{
- fcb.acquire();
- Exception ex=new Exception("FAILED");
- fcb.failed(ex);
-
- long start=System.currentTimeMillis();
+ final Exception ex = new Exception("FAILED");
+ long start=Long.MIN_VALUE;
try
{
- fcb.block();
+ try (final Blocker blocker=sbcb.acquire())
+ {
+ blocker.failed(ex);
+ }
Assert.fail();
}
catch(IOException ee)
{
+ start=System.currentTimeMillis();
Assert.assertEquals(ex,ee.getCause());
}
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L));
@@ -98,26 +98,29 @@ public class SharedBlockingCallbackTest
@Test
public void testGetFailed() throws Exception
{
- fcb.acquire();
- final Exception ex=new Exception("FAILED");
+ final Exception ex = new Exception("FAILED");
+ long start=Long.MIN_VALUE;
final CountDownLatch latch = new CountDownLatch(1);
-
- new Thread(new Runnable()
+
+ try
{
- @Override
- public void run()
+ try (final Blocker blocker=sbcb.acquire())
{
- latch.countDown();
- try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
- fcb.failed(ex);
+
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ latch.countDown();
+ try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();}
+ blocker.failed(ex);
+ }
+ }).start();
+
+ latch.await();
+ start=System.currentTimeMillis();
}
- }).start();
-
- latch.await();
- long start=System.currentTimeMillis();
- try
- {
- fcb.block();
Assert.fail();
}
catch(IOException ee)
@@ -141,11 +144,12 @@ public class SharedBlockingCallbackTest
{
try
{
- fcb.acquire();
- latch.countDown();
- TimeUnit.MILLISECONDS.sleep(100);
- fcb.succeeded();
- fcb.block();
+ try (Blocker blocker=sbcb.acquire())
+ {
+ latch.countDown();
+ TimeUnit.MILLISECONDS.sleep(100);
+ blocker.succeeded();
+ }
}
catch(Exception e)
{
@@ -157,12 +161,13 @@ public class SharedBlockingCallbackTest
latch.await();
long start=System.currentTimeMillis();
- fcb.acquire();
- Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
- Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
+ try (Blocker blocker=sbcb.acquire())
+ {
+ Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L));
+ Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L));
- fcb.succeeded();
- fcb.block();
+ blocker.succeeded();
+ };
Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L));
}
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java
index efda63020e..4fdc68072f 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java
@@ -18,23 +18,39 @@
package org.eclipse.jetty.websocket.common;
+import java.io.IOException;
+
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
-public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback
+
+/* ------------------------------------------------------------ */
+/** extend a SharedlBlockingCallback to an websocket WriteCallback
+ */
+public class BlockingWriteCallback extends SharedBlockingCallback
{
public BlockingWriteCallback()
- {}
-
- @Override
- public void writeFailed(Throwable x)
{
- failed(x);
+ super(new WriteBlocker());
}
-
- @Override
- public void writeSuccess()
+
+ public WriteBlocker acquireWriteBlocker() throws IOException
{
- succeeded();
+ return (WriteBlocker)acquire();
+ }
+
+ public static class WriteBlocker extends Blocker implements WriteCallback
+ {
+ @Override
+ public void writeFailed(Throwable x)
+ {
+ failed(x);
+ }
+
+ @Override
+ public void writeSuccess()
+ {
+ succeeded();
+ }
}
}
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java
index 146641bb4e..e0193a8986 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java
@@ -32,6 +32,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
+import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.DataFrame;
@@ -100,9 +101,10 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException
{
- blocker.acquire();
- uncheckedSendFrame(frame, blocker);
- blocker.block();
+ try(WriteBlocker b=blocker.acquireWriteBlocker())
+ {
+ uncheckedSendFrame(frame, b);
+ }
}
private boolean lockMsg(MsgType type)
@@ -441,14 +443,13 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
this.batchMode = batchMode;
}
+ @Override
public void flush() throws IOException
{
lockMsg(MsgType.ASYNC);
- try
+ try (WriteBlocker b = blocker.acquireWriteBlocker())
{
- blocker.acquire();
- uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker);
- blocker.block();
+ uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b);
}
finally
{
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
index e1bb6b1490..b9249cfe39 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
@@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
/**
@@ -142,9 +143,10 @@ public class MessageOutputStream extends OutputStream
frame.setPayload(buffer);
frame.setFin(fin);
- blocker.acquire();
- outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
- blocker.block();
+ try(WriteBlocker b=blocker.acquireWriteBlocker())
+ {
+ outgoing.outgoingFrame(frame, b, BatchMode.OFF);
+ }
++frameCount;
// Any flush after the first will be a CONTINUATION frame.
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java
index 8d691217d3..655bf386cb 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java
@@ -31,6 +31,7 @@ import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.BlockingWriteCallback;
import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
/**
@@ -146,9 +147,10 @@ public class MessageWriter extends Writer
frame.setPayload(data);
frame.setFin(fin);
- blocker.acquire();
- outgoing.outgoingFrame(frame, blocker, BatchMode.OFF);
- blocker.block();
+ try (WriteBlocker b = blocker.acquireWriteBlocker())
+ {
+ outgoing.outgoingFrame(frame, b, BatchMode.OFF);
+ }
++frameCount;
// Any flush after the first will be a CONTINUATION frame.

Back to the top