diff options
Diffstat (limited to 'jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java')
-rw-r--r-- | jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java | 212 |
1 files changed, 203 insertions, 9 deletions
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java index 29764e69c2..66993eb8c4 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/message/MessageOutputStream.java @@ -20,30 +20,224 @@ package org.eclipse.jetty.websocket.common.message; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; -import org.eclipse.jetty.websocket.common.LogicalConnection; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.eclipse.jetty.websocket.common.frames.BinaryFrame; +import org.eclipse.jetty.websocket.common.io.FutureWriteCallback; +/** + * Support for writing a single WebSocket BINARY message via a {@link OutputStream} + */ public class MessageOutputStream extends OutputStream { - private final LogicalConnection connection; + private static final Logger LOG = Log.getLogger(MessageOutputStream.class); private final OutgoingFrames outgoing; + private final ByteBufferPool bufferPool; + private long frameCount = 0; + private BinaryFrame frame; + private ByteBuffer buffer; + private FutureWriteCallback blocker; + private WriteCallback callback; + private boolean closed = false; - public MessageOutputStream(LogicalConnection connection, OutgoingFrames outgoing) + public MessageOutputStream(OutgoingFrames outgoing, int bufferSize, ByteBufferPool bufferPool) { - this.connection = connection; this.outgoing = outgoing; + this.bufferPool = bufferPool; + this.buffer = bufferPool.acquire(bufferSize,true); + BufferUtil.flipToFill(buffer); + this.frame = new BinaryFrame(); } - public boolean isClosed() + public MessageOutputStream(WebSocketSession session) { - // TODO Auto-generated method stub - return false; + this(session.getOutgoingHandler(),session.getPolicy().getMaxBinaryMessageBufferSize(),session.getBufferPool()); + } + + private void assertNotClosed() throws IOException + { + if (closed) + { + IOException e = new IOException("Stream is closed"); + notifyFailure(e); + throw e; + } + } + + @Override + public synchronized void close() throws IOException + { + assertNotClosed(); + LOG.debug("close()"); + + // finish sending whatever in the buffer with FIN=true + flush(true); + + // close stream + LOG.debug("Sent Frame Count: {}",frameCount); + closed = true; + try + { + if (callback != null) + { + callback.writeSuccess(); + } + super.close(); + bufferPool.release(buffer); + LOG.debug("closed"); + } + catch (IOException e) + { + notifyFailure(e); + throw e; + } } @Override - public void write(int b) throws IOException + public synchronized void flush() throws IOException + { + LOG.debug("flush()"); + assertNotClosed(); + + // flush whatever is in the buffer with FIN=false + flush(false); + try + { + super.flush(); + LOG.debug("flushed"); + } + catch (IOException e) + { + notifyFailure(e); + throw e; + } + } + + /** + * Flush whatever is in the buffer. + * + * @param fin + * fin flag + * @throws IOException + */ + private synchronized void flush(boolean fin) throws IOException + { + BufferUtil.flipToFlush(buffer,0); + LOG.debug("flush({}): {}",fin,BufferUtil.toDetailString(buffer)); + frame.setPayload(buffer); + frame.setFin(fin); + + try + { + blocker = new FutureWriteCallback(); + outgoing.outgoingFrame(frame,blocker); + try + { + // block on write + blocker.get(); + // block success + frameCount++; + frame.setIsContinuation(); + } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + if (cause != null) + { + if (cause instanceof IOException) + { + throw (IOException)cause; + } + else + { + throw new IOException(cause); + } + } + throw new IOException("Failed to flush",e); + } + catch (InterruptedException e) + { + throw new IOException("Failed to flush",e); + } + } + catch (IOException e) + { + notifyFailure(e); + throw e; + } + } + + private void notifyFailure(IOException e) { - // TODO Auto-generated method stub + if (callback != null) + { + callback.writeFailed(e); + } + } + + public void setCallback(WriteCallback callback) + { + this.callback = callback; + } + + @Override + public synchronized void write(byte[] b) throws IOException + { + try + { + this.write(b,0,b.length); + } + catch (IOException e) + { + notifyFailure(e); + throw e; + } + } + + @Override + public synchronized void write(byte[] b, int off, int len) throws IOException + { + LOG.debug("write(byte[{}], {}, {})",b.length,off,len); + int left = len; // bytes left to write + int offset = off; // offset within provided array + while (left > 0) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("buffer: {}",BufferUtil.toDetailString(buffer)); + } + int space = buffer.remaining(); + assert (space > 0); + int size = Math.min(space,left); + buffer.put(b,offset,size); + assert (size > 0); + left -= size; // decrement bytes left + if (left > 0) + { + flush(false); + } + offset += size; // increment offset + } + } + + @Override + public synchronized void write(int b) throws IOException + { + assertNotClosed(); + + // buffer up to limit, flush once buffer reached. + buffer.put((byte)b); + if (buffer.remaining() <= 0) + { + flush(false); + } } } |