Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java')
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java212
1 files changed, 203 insertions, 9 deletions
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
index 29764e69c2..66993eb8c4 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java
@@ -20,30 +20,224 @@ package org.eclipse.jetty.websocket.common.message;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
-import org.eclipse.jetty.websocket.common.LogicalConnection;
+import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
+import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
+/**
+ * Support for writing a single WebSocket BINARY message via a {@link OutputStream}
+ */
public class MessageOutputStream extends OutputStream
{
- private final LogicalConnection connection;
+ private static final Logger LOG = Log.getLogger(MessageOutputStream.class);
private final OutgoingFrames outgoing;
+ private final ByteBufferPool bufferPool;
+ private long frameCount = 0;
+ private BinaryFrame frame;
+ private ByteBuffer buffer;
+ private FutureWriteCallback blocker;
+ private WriteCallback callback;
+ private boolean closed = false;
- public MessageOutputStream(LogicalConnection connection, OutgoingFrames outgoing)
+ public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool)
{
- this.connection = connection;
this.outgoing = outgoing;
+ this.bufferPool = bufferPool;
+ this.buffer = bufferPool.acquire(bufferSize,true);
+ BufferUtil.flipToFill(buffer);
+ this.frame = new BinaryFrame();
}
- public boolean isClosed()
+ public MessageOutputStream(WebSocketSession session)
{
- // TODO Auto-generated method stub
- return false;
+ this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool());
+ }
+
+ private void assertNotClosed() throws IOException
+ {
+ if (closed)
+ {
+ IOException e = new IOException("Stream is closed");
+ notifyFailure(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException
+ {
+ assertNotClosed();
+ LOG.debug("close()");
+
+ // finish sending whatever in the buffer with FIN=true
+ flush(true);
+
+ // close stream
+ LOG.debug("Sent Frame Count: {}",frameCount);
+ closed = true;
+ try
+ {
+ if (callback != null)
+ {
+ callback.writeSuccess();
+ }
+ super.close();
+ bufferPool.release(buffer);
+ LOG.debug("closed");
+ }
+ catch (IOException e)
+ {
+ notifyFailure(e);
+ throw e;
+ }
}
@Override
- public void write(int b) throws IOException
+ public synchronized void flush() throws IOException
+ {
+ LOG.debug("flush()");
+ assertNotClosed();
+
+ // flush whatever is in the buffer with FIN=false
+ flush(false);
+ try
+ {
+ super.flush();
+ LOG.debug("flushed");
+ }
+ catch (IOException e)
+ {
+ notifyFailure(e);
+ throw e;
+ }
+ }
+
+ /**
+ * Flush whatever is in the buffer.
+ *
+ * @param fin
+ * fin flag
+ * @throws IOException
+ */
+ private synchronized void flush(boolean fin) throws IOException
+ {
+ BufferUtil.flipToFlush(buffer,0);
+ LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer));
+ frame.setPayload(buffer);
+ frame.setFin(fin);
+
+ try
+ {
+ blocker = new FutureWriteCallback();
+ outgoing.outgoingFrame(frame,blocker);
+ try
+ {
+ // block on write
+ blocker.get();
+ // block success
+ frameCount++;
+ frame.setIsContinuation();
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause != null)
+ {
+ if (cause instanceof IOException)
+ {
+ throw (IOException)cause;
+ }
+ else
+ {
+ throw new IOException(cause);
+ }
+ }
+ throw new IOException("Failed to flush",e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Failed to flush",e);
+ }
+ }
+ catch (IOException e)
+ {
+ notifyFailure(e);
+ throw e;
+ }
+ }
+
+ private void notifyFailure(IOException e)
{
- // TODO Auto-generated method stub
+ if (callback != null)
+ {
+ callback.writeFailed(e);
+ }
+ }
+
+ public void setCallback(WriteCallback callback)
+ {
+ this.callback = callback;
+ }
+
+ @Override
+ public synchronized void write(byte[] b) throws IOException
+ {
+ try
+ {
+ this.write(b,0,b.length);
+ }
+ catch (IOException e)
+ {
+ notifyFailure(e);
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int len) throws IOException
+ {
+ LOG.debug("write(byte[{}], {}, {})",b.length,off,len);
+ int left = len; // bytes left to write
+ int offset = off; // offset within provided array
+ while (left > 0)
+ {
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer));
+ }
+ int space = buffer.remaining();
+ assert (space > 0);
+ int size = Math.min(space,left);
+ buffer.put(b,offset,size);
+ assert (size > 0);
+ left -= size; // decrement bytes left
+ if (left > 0)
+ {
+ flush(false);
+ }
+ offset += size; // increment offset
+ }
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException
+ {
+ assertNotClosed();
+
+ // buffer up to limit, flush once buffer reached.
+ buffer.put((byte)b);
+ if (buffer.remaining() <= 0)
+ {
+ flush(false);
+ }
}
}

Back to the top