diff options
author | Greg Wilkins | 2014-03-21 06:55:48 +0000 |
---|---|---|
committer | Greg Wilkins | 2014-03-21 06:55:48 +0000 |
commit | 138dfba56067a99d64ced232222756c329e1324b (patch) | |
tree | 5a286d792cedf9ba3dbcd57e5808581179a70c47 | |
parent | bbfb5c723771d530a559a17890c19d0f8edfeb08 (diff) | |
download | org.eclipse.jetty.project-138dfba56067a99d64ced232222756c329e1324b.tar.gz org.eclipse.jetty.project-138dfba56067a99d64ced232222756c329e1324b.tar.xz org.eclipse.jetty.project-138dfba56067a99d64ced232222756c329e1324b.zip |
Revert "430242 - added SharedBlockingCallback to support threadsafe blocking"
This reverts commit 9c30e7a303f07784dd686da04ab693c2d3dfbcc0.
10 files changed, 256 insertions, 297 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index fbd34cebad..82db0178f3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -53,7 +53,6 @@ import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.URIUtil; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; @@ -729,10 +728,10 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable protected boolean sendResponse(ResponseInfo info, ByteBuffer content, boolean complete) throws IOException { - try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback()) - { - return sendResponse(info,content,complete,blocker); - } + BlockingCallback writeBlock = _response.getHttpOutput().acquireWriteBlockingCallback(); + boolean committing=sendResponse(info,content,complete,writeBlock); + writeBlock.block(); + return committing; } public boolean isCommitted() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 35943376b5..b799b0c83e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.SharedBlockingCallback; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -58,11 +57,10 @@ public class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback { while(true) { - try (Blocker blocker=_readBlocker.acquire()) - { - _httpConnection.fillInterested(blocker); - LOG.debug("{} block readable on {}",this,blocker); - } + _readBlocker.acquire(); + _httpConnection.fillInterested(_readBlocker); + LOG.debug("{} block readable on {}",this,_readBlocker); + _readBlocker.block(); Object content=getNextContent(); if (content!=null || isFinished()) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 8d36745ca0..77ca11f5b1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -117,17 +116,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable return _channel.getResponse().isAllContentWritten(_written); } - protected Blocker acquireWriteBlockingCallback() throws IOException + protected BlockingCallback acquireWriteBlockingCallback() throws IOException { - return _writeblock.acquire(); + _writeblock.acquire(); + return _writeblock; } protected void write(ByteBuffer content, boolean complete) throws IOException { - try (Blocker blocker=_writeblock.acquire()) - { - write(content,complete,blocker); - } + _writeblock.acquire(); + write(content,complete,_writeblock); + _writeblock.block(); } protected void write(ByteBuffer content, boolean complete, Callback callback) @@ -440,10 +439,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Check if all written or full if (complete || BufferUtil.isFull(_aggregate)) { - try(Blocker blocker=_writeblock.acquire()) - { - write(_aggregate, complete, blocker); - } + _writeblock.acquire(); + write(_aggregate, complete, _writeblock); + _writeblock.block(); if (complete) closed(); } @@ -499,10 +497,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ByteBuffer content) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - write(content,true,blocker); - } + _writeblock.acquire(); + write(content,true,_writeblock); + _writeblock.block(); } /* ------------------------------------------------------------ */ @@ -512,10 +509,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(InputStream in) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - new InputStreamWritingCB(in,blocker).iterate(); - } + _writeblock.acquire(); + new InputStreamWritingCB(in,_writeblock).iterate(); + _writeblock.block(); } /* ------------------------------------------------------------ */ @@ -525,10 +521,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ReadableByteChannel in) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - new ReadableByteChannelWritingCB(in,blocker).iterate(); - } + _writeblock.acquire(); + new ReadableByteChannelWritingCB(in,_writeblock).iterate(); + _writeblock.block(); } @@ -539,10 +534,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(HttpContent content) throws IOException { - try(Blocker blocker=_writeblock.acquire()) - { - sendContent(content,blocker); - } + _writeblock.acquire(); + sendContent(content,_writeblock); + _writeblock.block(); } /* ------------------------------------------------------------ */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java index dfb4faea30..d55836d330 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingCallback.java @@ -26,7 +26,30 @@ import java.util.concurrent.atomic.AtomicReference; /* ------------------------------------------------------------ */ /** - * TODO + * A Callback for simple reusable conversion of an + * asynchronous API to blocking. + * <p> + * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from + * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated + * and only a single callback per phase is allowed. + * <p> + * A typical usage pattern is: + * <pre> + * public class MyClass + * { + * BlockingCallback cb = new BlockingCallback(); + * + * public void blockingMethod(Object args) throws Exception + * { + * asyncMethod(args,cb); + * cb.block(); + * } + * + * public <C>void asyncMethod(Object args, Callback callback) + * { + * ... + * } + * } */ public class BlockingCallback implements Callback { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index 02bb82b1d6..0ad55c2406 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -18,203 +18,174 @@ package org.eclipse.jetty.util; -import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.CancellationException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - /* ------------------------------------------------------------ */ -/** Provides a reusable BlockingCallback. +/** + * A Callback for simple reusable conversion of an + * asynchronous API to blocking. + * <p> + * To avoid late redundant calls to {@link #succeeded()} or {@link #failed(Throwable)} from + * interfering with later reuses of this class, the callback context is used to hold pass a phase indicated + * and only a single callback per phase is allowed. + * <p> * A typical usage pattern is: * <pre> - * void someBlockingCall(Object... args) throws IOException + * public class MyClass * { - * try(Blocker blocker=sharedBlockingCallback.acquire()) - * { - * someAsyncCall(args,blocker); - * } - * catch(Throwable e) - * { - * blocker.fail(e); - * } - * } - * </pre> + * BlockingCallback cb = new BlockingCallback(); + * + * public void blockingMethod(Object args) throws Exception + * { + * asyncMethod(args,cb); + * cb.block(); + * } + * + * public <C>void asyncMethod(Object args, Callback callback) + * { + * ... + * } + * } */ -public class SharedBlockingCallback +public class SharedBlockingCallback extends BlockingCallback { - private static Throwable IDLE = new Throwable() + private static Throwable IDLE=new Throwable() { @Override - public String toString() - { - return "IDLE"; - } + public String toString() { return "IDLE"; } }; - - private static Throwable SUCCEEDED = new Throwable() + + private static Throwable SUCCEEDED=new Throwable() { @Override - public String toString() - { - return "SUCCEEDED"; - } + public String toString() { return "SUCCEEDED"; } }; - - final Blocker _blocker; - public SharedBlockingCallback() - { - this(new Blocker()); - } + + final ReentrantLock _lock = new ReentrantLock(); + Condition _idle = _lock.newCondition(); + Condition _complete = _lock.newCondition(); + Throwable _state = IDLE; - protected SharedBlockingCallback(Blocker blocker) - { - _blocker=blocker; - } - public Blocker acquire() throws IOException + public SharedBlockingCallback() + {} + + public void acquire() throws IOException { - _blocker._lock.lock(); + _lock.lock(); try { - while (_blocker._state != IDLE) - _blocker._idle.await(); - _blocker._state = null; + while (_state!=IDLE) + _idle.await(); + _state=null; } catch (final InterruptedException e) { - throw new InterruptedIOException() - { - { - initCause(e); - } - }; + throw new InterruptedIOException(){{initCause(e);}}; } finally { - _blocker._lock.unlock(); + _lock.unlock(); } - return _blocker; } - - /* ------------------------------------------------------------ */ - /** A Closeable Callback. - * Uses the auto close mechanism to block until the collback is complete. - */ - public static class Blocker implements Callback, Closeable + @Override + public void succeeded() { - final ReentrantLock _lock = new ReentrantLock(); - final Condition _idle = _lock.newCondition(); - final Condition _complete = _lock.newCondition(); - Throwable _state = IDLE; - - public Blocker() - { - } - - @Override - public void succeeded() + _lock.lock(); + try { - _lock.lock(); - try - { - if (_state == null) - { - _state = SUCCEEDED; - _complete.signalAll(); - } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); - } - finally + if (_state==null) { - _lock.unlock(); + _state=SUCCEEDED; + _complete.signalAll(); } + else if (_state==IDLE) + throw new IllegalStateException("IDLE"); } - - @Override - public void failed(Throwable cause) + finally { - _lock.lock(); - try - { - if (_state == null) - { - _state = cause; - _complete.signalAll(); - } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); - } - finally - { - _lock.unlock(); - } + _lock.unlock(); } + } - /** - * Block until the Callback has succeeded or failed and after the return leave in the state to allow reuse. This is useful for code that wants to - * repeatable use a FutureCallback to convert an asynchronous API to a blocking API. - * - * @throws IOException - * if exception was caught during blocking, or callback was cancelled - */ - @Override - public void close() throws IOException + @Override + public void failed(Throwable cause) + { + _lock.lock(); + try { - _lock.lock(); - try - { - while (_state == null) - _complete.await(); - - if (_state == SUCCEEDED) - return; - if (_state == IDLE) - throw new IllegalStateException("IDLE"); - if (_state instanceof IOException) - throw (IOException)_state; - if (_state instanceof CancellationException) - throw (CancellationException)_state; - if (_state instanceof RuntimeException) - throw (RuntimeException)_state; - if (_state instanceof Error) - throw (Error)_state; - throw new IOException(_state); - } - catch (final InterruptedException e) + if (_state==null) { - throw new InterruptedIOException() - { - { - initCause(e); - } - }; - } - finally - { - _state = IDLE; - _idle.signalAll(); - _lock.unlock(); + _state=cause; + _complete.signalAll(); } + else if (_state==IDLE) + throw new IllegalStateException("IDLE"); + } + finally + { + _lock.unlock(); } + } - @Override - public String toString() + /** Block until the Callback has succeeded or failed and + * after the return leave in the state to allow reuse. + * This is useful for code that wants to repeatable use a FutureCallback to convert + * an asynchronous API to a blocking API. + * @throws IOException if exception was caught during blocking, or callback was cancelled + */ + @Override + public void block() throws IOException + { + _lock.lock(); + try { - _lock.lock(); - try - { - return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); - } - finally - { - _lock.unlock(); - } + while (_state==null) + _complete.await(); + + if (_state==SUCCEEDED) + return; + if (_state==IDLE) + throw new IllegalStateException("IDLE"); + if (_state instanceof IOException) + throw (IOException) _state; + if (_state instanceof CancellationException) + throw (CancellationException) _state; + throw new IOException(_state); + } + catch (final InterruptedException e) + { + throw new InterruptedIOException(){{initCause(e);}}; + } + finally + { + _state=IDLE; + _idle.signalAll(); + _lock.unlock(); + } + } + + + @Override + public String toString() + { + _lock.lock(); + try + { + return String.format("%s@%x{%s}",SharedBlockingCallback.class.getSimpleName(),hashCode(),_state); + } + finally + { + _lock.unlock(); } } + } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 1d4894ad92..43f9a693c6 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -19,17 +19,22 @@ package org.eclipse.jetty.util; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; public class SharedBlockingCallbackTest { - final SharedBlockingCallback sbcb= new SharedBlockingCallback(); + final SharedBlockingCallback fcb= new SharedBlockingCallback(); public SharedBlockingCallbackTest() { @@ -38,38 +43,34 @@ public class SharedBlockingCallbackTest @Test public void testDone() throws Exception - { - long start; - try (Blocker blocker=sbcb.acquire()) - { - blocker.succeeded(); - start=System.currentTimeMillis(); - } + { + fcb.acquire(); + fcb.succeeded(); + long start=System.currentTimeMillis(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); } @Test public void testGetDone() throws Exception { - long start; - try (final Blocker blocker=sbcb.acquire()) + fcb.acquire(); + final CountDownLatch latch = new CountDownLatch(1); + + new Thread(new Runnable() { - final CountDownLatch latch = new CountDownLatch(1); - - new Thread(new Runnable() + @Override + public void run() { - @Override - public void run() - { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - blocker.succeeded(); - } - }).start(); - - latch.await(); - start=System.currentTimeMillis(); - } + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.succeeded(); + } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); } @@ -77,19 +78,18 @@ public class SharedBlockingCallbackTest @Test public void testFailed() throws Exception { - final Exception ex = new Exception("FAILED"); - long start=Long.MIN_VALUE; + fcb.acquire(); + Exception ex=new Exception("FAILED"); + fcb.failed(ex); + + long start=System.currentTimeMillis(); try { - try (final Blocker blocker=sbcb.acquire()) - { - blocker.failed(ex); - } + fcb.block(); Assert.fail(); } catch(IOException ee) { - start=System.currentTimeMillis(); Assert.assertEquals(ex,ee.getCause()); } Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); @@ -98,29 +98,26 @@ public class SharedBlockingCallbackTest @Test public void testGetFailed() throws Exception { - final Exception ex = new Exception("FAILED"); - long start=Long.MIN_VALUE; + fcb.acquire(); + final Exception ex=new Exception("FAILED"); final CountDownLatch latch = new CountDownLatch(1); - - try + + new Thread(new Runnable() { - try (final Blocker blocker=sbcb.acquire()) + @Override + public void run() { - - new Thread(new Runnable() - { - @Override - public void run() - { - latch.countDown(); - try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} - blocker.failed(ex); - } - }).start(); - - latch.await(); - start=System.currentTimeMillis(); + latch.countDown(); + try{TimeUnit.MILLISECONDS.sleep(100);}catch(Exception e){e.printStackTrace();} + fcb.failed(ex); } + }).start(); + + latch.await(); + long start=System.currentTimeMillis(); + try + { + fcb.block(); Assert.fail(); } catch(IOException ee) @@ -144,12 +141,11 @@ public class SharedBlockingCallbackTest { try { - try (Blocker blocker=sbcb.acquire()) - { - latch.countDown(); - TimeUnit.MILLISECONDS.sleep(100); - blocker.succeeded(); - } + fcb.acquire(); + latch.countDown(); + TimeUnit.MILLISECONDS.sleep(100); + fcb.succeeded(); + fcb.block(); } catch(Exception e) { @@ -161,13 +157,12 @@ public class SharedBlockingCallbackTest latch.await(); long start=System.currentTimeMillis(); - try (Blocker blocker=sbcb.acquire()) - { - Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + fcb.acquire(); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); - blocker.succeeded(); - }; + fcb.succeeded(); + fcb.block(); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java index 4fdc68072f..efda63020e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/BlockingWriteCallback.java @@ -18,39 +18,23 @@ package org.eclipse.jetty.websocket.common; -import java.io.IOException; - import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.websocket.api.WriteCallback; - -/* ------------------------------------------------------------ */ -/** extend a SharedlBlockingCallback to an websocket WriteCallback - */ -public class BlockingWriteCallback extends SharedBlockingCallback +public class BlockingWriteCallback extends SharedBlockingCallback implements WriteCallback { public BlockingWriteCallback() - { - super(new WriteBlocker()); - } - - public WriteBlocker acquireWriteBlocker() throws IOException - { - return (WriteBlocker)acquire(); - } + {} - public static class WriteBlocker extends Blocker implements WriteCallback + @Override + public void writeFailed(Throwable x) { - @Override - public void writeFailed(Throwable x) - { - failed(x); - } + failed(x); + } - @Override - public void writeSuccess() - { - succeeded(); - } + @Override + public void writeSuccess() + { + succeeded(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java index e0193a8986..146641bb4e 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketRemoteEndpoint.java @@ -32,7 +32,6 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; import org.eclipse.jetty.websocket.common.frames.ContinuationFrame; import org.eclipse.jetty.websocket.common.frames.DataFrame; @@ -101,10 +100,9 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint private void blockingWrite(WebSocketFrame frame) throws IOException { - try(WriteBlocker b=blocker.acquireWriteBlocker()) - { - uncheckedSendFrame(frame, b); - } + blocker.acquire(); + uncheckedSendFrame(frame, blocker); + blocker.block(); } private boolean lockMsg(MsgType type) @@ -443,13 +441,14 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint this.batchMode = batchMode; } - @Override public void flush() throws IOException { lockMsg(MsgType.ASYNC); - try (WriteBlocker b = blocker.acquireWriteBlocker()) + try { - uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, b); + blocker.acquire(); + uncheckedSendFrame(FrameFlusher.FLUSH_FRAME, blocker); + blocker.block(); } finally { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index b9249cfe39..e1bb6b1490 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.BinaryFrame; /** @@ -143,10 +142,9 @@ public class MessageOutputStream extends OutputStream frame.setPayload(buffer); frame.setFin(fin); - try(WriteBlocker b=blocker.acquireWriteBlocker()) - { - outgoing.outgoingFrame(frame, b, BatchMode.OFF); - } + blocker.acquire(); + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame. diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java index 655bf386cb..8d691217d3 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageWriter.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; import org.eclipse.jetty.websocket.common.BlockingWriteCallback; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker; import org.eclipse.jetty.websocket.common.frames.TextFrame; /** @@ -147,10 +146,9 @@ public class MessageWriter extends Writer frame.setPayload(data); frame.setFin(fin); - try (WriteBlocker b = blocker.acquireWriteBlocker()) - { - outgoing.outgoingFrame(frame, b, BatchMode.OFF); - } + blocker.acquire(); + outgoing.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); ++frameCount; // Any flush after the first will be a CONTINUATION frame. |