Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2012-08-08 15:30:12 -0400
committerJoakim Erdfelt2012-08-08 15:30:12 -0400
commit0abb7511ffed79299294f6bf7381f5faa64093e3 (patch)
tree5280850a51d7d29abacd15b78b009d419eb772fc /jetty-websocket
parent1a62b2a780c883460d91790389be26a598a62b75 (diff)
downloadorg.eclipse.jetty.project-0abb7511ffed79299294f6bf7381f5faa64093e3.tar.gz
org.eclipse.jetty.project-0abb7511ffed79299294f6bf7381f5faa64093e3.tar.xz
org.eclipse.jetty.project-0abb7511ffed79299294f6bf7381f5faa64093e3.zip
Refactoring websocket.io.RawConnection to websocket.api.BaseConnection.
+ Introducing BaseConnection.SuspendToken and suspend/resume logic for working with buffer suspended read concerns.
Diffstat (limited to 'jetty-websocket')
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/BaseConnection.java73
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java44
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java7
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/AbstractWebSocketConnection.java53
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java37
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java44
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java18
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java26
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java59
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java2
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java2
-rw-r--r--jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java49
12 files changed, 279 insertions, 135 deletions
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/BaseConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/BaseConnection.java
new file mode 100644
index 0000000000..d1a08e740f
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/BaseConnection.java
@@ -0,0 +1,73 @@
+package org.eclipse.jetty.websocket.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public interface BaseConnection
+{
+ /**
+ * Connection suspend token
+ */
+ public static interface SuspendToken
+ {
+ /**
+ * Resume a previously suspended connection.
+ */
+ void resume();
+ }
+
+ /**
+ * Terminate connection, {@link StatusCode#NORMAL}, without a reason.
+ * <p>
+ * Basic usage: results in an non-blocking async write, then connection close.
+ *
+ * @throws IOException
+ * if unable to send the close frame, or close the connection successfully.
+ * @see StatusCode
+ * @see #close(int, String)
+ */
+ void close() throws IOException;
+
+ /**
+ * Terminate connection, with status code.
+ * <p>
+ * Advanced usage: results in an non-blocking async write, then connection close.
+ *
+ * @param statusCode
+ * the status code
+ * @param reason
+ * the (optional) reason. (can be null for no reason)
+ * @throws IOException
+ * if unable to send the close frame, or close the connection successfully.
+ * @see StatusCode
+ */
+ void close(int statusCode, String reason) throws IOException;
+
+ /**
+ * Get the remote Address in use for this connection.
+ *
+ * @return the remote address if available. (situations like mux extension and proxying makes this information unreliable)
+ */
+ InetSocketAddress getRemoteAddress();
+
+ /**
+ * Simple test to see if connection is open (and not closed)
+ *
+ * @return true if connection still open
+ */
+ boolean isOpen();
+
+ /**
+ * Tests if the connection is actively reading.
+ *
+ * @return true if connection is actively attempting to read.
+ */
+ boolean isReading();
+
+ /**
+ * Suspend a the incoming read events on the connection.
+ *
+ * @return
+ */
+ SuspendToken suspend();
+}
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java
index db51ea8c6e..3d0cf1166a 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnection.java
@@ -16,7 +16,6 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
@@ -24,36 +23,9 @@ import org.eclipse.jetty.util.Callback;
/**
* Connection interface for WebSocket protocol <a href="https://tools.ietf.org/html/rfc6455">RFC-6455</a>.
*/
-public interface WebSocketConnection
+public interface WebSocketConnection extends BaseConnection
{
/**
- * Terminate connection, {@link StatusCode#NORMAL}, without a reason.
- * <p>
- * Basic usage: results in an non-blocking async write, then connection close.
- *
- * @throws IOException
- * if unable to send the close frame, or close the connection successfully.
- * @see StatusCode
- * @see #close(int, String)
- */
- void close() throws IOException;
-
- /**
- * Terminate connection, with status code.
- * <p>
- * Advanced usage: results in an non-blocking async write, then connection close.
- *
- * @param statusCode
- * the status code
- * @param reason
- * the (optional) reason. (can be null for no reason)
- * @throws IOException
- * if unable to send the close frame, or close the connection successfully.
- * @see StatusCode
- */
- void close(int statusCode, String reason) throws IOException;
-
- /**
* Access the (now read-only) {@link WebSocketPolicy} in use for this connection.
*
* @return the policy in use
@@ -61,13 +33,6 @@ public interface WebSocketConnection
WebSocketPolicy getPolicy();
/**
- * Get the remote Address in use for this connection.
- *
- * @return the remote address if available. (situations like mux extension and proxying makes this information unreliable)
- */
- InetSocketAddress getRemoteAddress();
-
- /**
* Get the SubProtocol in use for this connection.
*
* @return the negotiated sub protocol name in use for this connection, can be null if there is no sub-protocol negotiated.
@@ -75,13 +40,6 @@ public interface WebSocketConnection
String getSubProtocol();
/**
- * Simple test to see if connection is open (and not closed)
- *
- * @return true if connection still open
- */
- boolean isOpen();
-
- /**
* Send a single ping messages.
* <p>
* NIO style with callbacks, allows for knowledge of successful ping send.
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java
index 4862b94776..56bada51b3 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/driver/WebSocketEventDriver.java
@@ -229,12 +229,7 @@ public class WebSocketEventDriver implements IncomingFrames
{
if (events.onText.isStreaming())
{
- // Allocate directly, not via ByteBufferPool, as this buffer
- // is ultimately controlled by the end user, and we can't know
- // when they are done using the stream in order to release any
- // buffer allocated from the ByteBufferPool.
- ByteBuffer buf = ByteBuffer.allocate(policy.getBufferSize());
- this.activeMessage = new MessageReader(buf);
+ activeMessage = new MessageReader(websocket,events.onBinary,session,policy);
}
else
{
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/AbstractWebSocketConnection.java
index 3944be0eb3..6549c33cf7 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/AbstractWebSocketConnection.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/AbstractWebSocketConnection.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@@ -33,6 +34,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
@@ -47,7 +49,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/
-public abstract class AbstractWebSocketConnection extends AbstractConnection implements RawConnection, OutgoingFrames
+public abstract class AbstractWebSocketConnection extends AbstractConnection implements BaseConnection, BaseConnection.SuspendToken, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");
@@ -58,9 +60,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final Parser parser;
private final WebSocketPolicy policy;
private final FrameQueue queue;
+ private final AtomicBoolean suspendToken;
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean flushing;
+ private boolean isFilling;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
@@ -72,6 +76,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.queue = new FrameQueue();
+ this.suspendToken = new AtomicBoolean(false);
}
@Override
@@ -98,7 +103,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
- @Override
public void disconnect(boolean onlyOutput)
{
EndPoint endPoint = getEndPoint();
@@ -146,7 +150,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
}
- write(buffer,this,frameBytes);
+ write(buffer,frameBytes);
}
public ByteBufferPool getBufferPool()
@@ -154,16 +158,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return bufferPool;
}
- public Executor getExecutor()
- {
- return getExecutor();
- }
-
/**
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
- *
+ *
* @return the list of negotiated extensions in use.
*/
public List<ExtensionConfig> getExtensions()
@@ -214,6 +213,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
@Override
+ public boolean isReading()
+ {
+ return isFilling;
+ }
+
+ @Override
public void onFillable()
{
ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false);
@@ -221,16 +226,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
boolean readMore = false;
try
{
+ isFilling = true;
readMore = (read(buffer) != -1);
}
finally
{
bufferPool.release(buffer);
}
- if (readMore)
+
+ if (readMore && (suspendToken.get() == false))
{
fillInterested();
}
+ else
+ {
+ isFilling = false;
+ }
}
@Override
@@ -296,7 +307,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
else if (filled < 0)
{
LOG.debug("read - EOF Reached");
- // disconnect(false); // FIXME Simone says this is bad
return -1;
}
else
@@ -323,6 +333,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
+ @Override
+ public void resume()
+ {
+ if(suspendToken.getAndSet(false)) {
+ fillInterested();
+ }
+ }
+
private <C> void scheduleTimeout(FrameBytes<C> bytes)
{
if (policy.getIdleTimeout() > 0)
@@ -335,7 +353,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
- *
+ *
* @param extensions
* the list of negotiated extensions in use.
*/
@@ -349,9 +367,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.session = session;
}
+ @Override
+ public SuspendToken suspend()
+ {
+ suspendToken.set(true);
+ return this;
+ }
+
/**
* For terminating connections forcefully.
- *
+ *
* @param statusCode
* the WebSocket status code.
* @param reason
@@ -373,7 +398,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
}
- private <C> void write(ByteBuffer buffer, AbstractWebSocketConnection webSocketConnection, FrameBytes<C> frameBytes)
+ private <C> void write(ByteBuffer buffer, FrameBytes<C> frameBytes)
{
EndPoint endpoint = getEndPoint();
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java
deleted file mode 100644
index 590d3193e8..0000000000
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/RawConnection.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// ========================================================================
-// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-//========================================================================
-package org.eclipse.jetty.websocket.io;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * Interface for working with connections in a raw way.
- * <p>
- * This is abstracted out to allow for common access to connection internals regardless of physical vs virtual connections.
- */
-public interface RawConnection extends OutgoingFrames
-{
- void close() throws IOException;
-
- void close(int statusCode, String reason) throws IOException;
-
- void disconnect(boolean onlyOutput);
-
- InetSocketAddress getRemoteAddress();
-
- boolean isOpen();
-}
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java
index 4df0ef235a..7ed264af69 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketSession.java
@@ -8,6 +8,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@@ -18,17 +19,22 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
- private final RawConnection connection;
+ /**
+ * The reference to the base connection.
+ * <p>
+ * This will be the {@link AbstractWebSocketConnection} on normal websocket use, and be a MuxConnection when MUX is in the picture.
+ */
+ private final BaseConnection baseConnection;
private final WebSocketPolicy policy;
private final String subprotocol;
private final WebSocketEventDriver websocket;
private OutgoingFrames outgoing;
- public WebSocketSession(WebSocketEventDriver websocket, RawConnection connection, WebSocketPolicy policy, String subprotocol)
+ public WebSocketSession(WebSocketEventDriver websocket, BaseConnection connection, WebSocketPolicy policy, String subprotocol)
{
super();
this.websocket = websocket;
- this.connection = connection;
+ this.baseConnection = connection;
this.policy = policy;
this.subprotocol = subprotocol;
}
@@ -36,13 +42,13 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public void close() throws IOException
{
- connection.close();
+ baseConnection.close();
}
@Override
public void close(int statusCode, String reason) throws IOException
{
- connection.close(statusCode,reason);
+ baseConnection.close(statusCode,reason);
}
public IncomingFrames getIncoming()
@@ -64,7 +70,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public InetSocketAddress getRemoteAddress()
{
- return connection.getRemoteAddress();
+ return baseConnection.getRemoteAddress();
}
@Override
@@ -90,7 +96,13 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public boolean isOpen()
{
- return connection.isOpen();
+ return baseConnection.isOpen();
+ }
+
+ @Override
+ public boolean isReading()
+ {
+ return baseConnection.isReading();
}
public void onConnect()
@@ -117,6 +129,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
+ // Delegate the application called ping to the OutgoingFrames interface to allow
+ // extensions to process the frame appropriately.
WebSocketFrame frame = new WebSocketFrame(OpCode.PING).setPayload(payload);
frame.setFin(true);
output(context,callback,frame);
@@ -128,13 +142,19 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
}
@Override
+ public SuspendToken suspend()
+ {
+ return baseConnection.suspend();
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("WebSocketSession[websocket=");
builder.append(websocket);
- builder.append(",connection=");
- builder.append(connection);
+ builder.append(",baseConnection=");
+ builder.append(baseConnection);
builder.append(",subprotocol=");
builder.append(subprotocol);
builder.append(",outgoing=");
@@ -153,6 +173,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},byte[],{},{})",callback,offset,len);
}
+ // Delegate the application called write to the OutgoingFrames interface to allow
+ // extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buf,offset,len);
frame.setFin(true);
output(context,callback,frame);
@@ -168,6 +190,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},ByteBuffer->{})",callback,BufferUtil.toDetailString(buffer));
}
+ // Delegate the application called write to the OutgoingFrames interface to allow
+ // extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.binary().setPayload(buffer);
frame.setFin(true);
output(context,callback,frame);
@@ -183,6 +207,8 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
{
LOG.debug("write(context,{},message.length:{})",callback,message.length());
}
+ // Delegate the application called ping to the OutgoingFrames interface to allow
+ // extensions to process the frame appropriately.
WebSocketFrame frame = WebSocketFrame.text(message);
frame.setFin(true);
output(context,callback,frame);
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java
index d500f888a4..1188267821 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageAppender.java
@@ -3,9 +3,23 @@ package org.eclipse.jetty.websocket.io.message;
import java.io.IOException;
import java.nio.ByteBuffer;
+/**
+ * Appender for messages (used for multiple fragments with continuations, and also to allow for streaming APIs)
+ */
public interface MessageAppender
{
- abstract void appendMessage(ByteBuffer byteBuffer) throws IOException;
+ /**
+ * Append the payload to the message.
+ *
+ * @param payload
+ * the payload to append.
+ * @throws IOException
+ * if unable to append the payload
+ */
+ abstract void appendMessage(ByteBuffer payload) throws IOException;
- abstract void messageComplete() throws IOException;
+ /**
+ * Notification that message is to be considered complete.
+ */
+ abstract void messageComplete();
}
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java
index 08f22071d8..4930a8be60 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageInputStream.java
@@ -30,6 +30,10 @@ import org.eclipse.jetty.websocket.io.WebSocketSession;
*/
public class MessageInputStream extends InputStream implements MessageAppender
{
+ /**
+ * Threshold (of bytes) to perform compaction at
+ */
+ private static final int COMPACT_THRESHOLD = 5;
private final Object websocket;
private final EventMethod onEvent;
private final WebSocketSession session;
@@ -38,7 +42,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
private final ByteBuffer buf;
private int size;
private boolean finished;
- private boolean needsNotification = true;
+ private boolean needsNotification;
+ private int readPosition;
public MessageInputStream(Object websocket, EventMethod onEvent, WebSocketSession session, ByteBufferPool bufferPool, WebSocketPolicy policy)
{
@@ -49,7 +54,10 @@ public class MessageInputStream extends InputStream implements MessageAppender
this.policy = policy;
this.buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(this.buf);
+ size = 0;
+ readPosition = this.buf.position();
finished = false;
+ needsNotification = true;
}
@Override
@@ -72,6 +80,8 @@ public class MessageInputStream extends InputStream implements MessageAppender
synchronized (buf)
{
// TODO: grow buffer till max binary message size?
+ // TODO: compact this buffer to fit incoming buffer?
+ // TODO: tell connection to suspend if buffer too full?
BufferUtil.put(payload,buf);
}
@@ -85,12 +95,13 @@ public class MessageInputStream extends InputStream implements MessageAppender
@Override
public void close() throws IOException
{
+ finished = true;
super.close();
this.bufferPool.release(this.buf);
}
@Override
- public void messageComplete() throws IOException
+ public void messageComplete()
{
finished = true;
}
@@ -100,13 +111,14 @@ public class MessageInputStream extends InputStream implements MessageAppender
{
synchronized (buf)
{
- // FIXME: HACKITY HACK HACK HACK
- // Should really use its own tracking of position, to avoid flipping the
- // buffer between read/write
- byte b = buf.get();
- if (buf.limit() <= (buf.capacity() - 5))
+ byte b = buf.get(readPosition);
+ readPosition++;
+ if (readPosition <= (buf.limit() - COMPACT_THRESHOLD))
{
+ int curPos = buf.position();
buf.compact();
+ int offsetPos = buf.position() - curPos;
+ readPosition += offsetPos;
}
return b;
}
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java
index 1a49c5d7fa..0c3a6f8a20 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/MessageReader.java
@@ -19,7 +19,10 @@ import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Utf8StringBuilder;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.driver.EventMethod;
+import org.eclipse.jetty.websocket.io.WebSocketSession;
/**
* Support class for reading text message data as an Reader.
@@ -28,32 +31,66 @@ import org.eclipse.jetty.util.BufferUtil;
*/
public class MessageReader extends Reader implements MessageAppender
{
- private ByteBuffer buffer;
+ private final Object websocket;
+ private final EventMethod onEvent;
+ private final WebSocketSession session;
+ private final WebSocketPolicy policy;
+ private final Utf8StringBuilder utf;
+ private int size;
+ private boolean finished;
+ private boolean needsNotification;
- public MessageReader(ByteBuffer buf)
+ public MessageReader(Object websocket, EventMethod onEvent, WebSocketSession session, WebSocketPolicy policy)
{
- BufferUtil.clearToFill(buf);
- this.buffer = buf;
+ this.websocket = websocket;
+ this.onEvent = onEvent;
+ this.session = session;
+ this.policy = policy;
+ this.utf = new Utf8StringBuilder();
+ size = 0;
+ finished = false;
+ needsNotification = true;
}
@Override
- public void appendMessage(ByteBuffer byteBuffer) throws IOException
+ public void appendMessage(ByteBuffer payload) throws IOException
{
- // TODO Auto-generated method stub
+ if (finished)
+ {
+ throw new IOException("Cannot append to finished buffer");
+ }
+
+ if (payload == null)
+ {
+ // empty payload is valid
+ return;
+ }
+
+ policy.assertValidTextMessageSize(size + payload.remaining());
+ size += payload.remaining();
+
+ synchronized (utf)
+ {
+ utf.append(payload);
+ }
+ if (needsNotification)
+ {
+ needsNotification = true;
+ this.onEvent.call(websocket,session,this);
+ }
}
@Override
public void close() throws IOException
{
- // TODO Auto-generated method stub
+ finished = true;
}
@Override
- public void messageComplete() throws IOException
+ public void messageComplete()
{
- // TODO Auto-generated method stub
-
+ finished = true;
}
@Override
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java
index e9ad1ddf0a..ae99401da5 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleBinaryMessage.java
@@ -54,7 +54,7 @@ public class SimpleBinaryMessage implements MessageAppender
}
@Override
- public void messageComplete() throws IOException
+ public void messageComplete()
{
BufferUtil.flipToFlush(this.buf,0);
finished = true;
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java
index b0a1dc81be..ee9cc5bd8e 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/message/SimpleTextMessage.java
@@ -51,7 +51,7 @@ public class SimpleTextMessage implements MessageAppender
}
@Override
- public void messageComplete() throws IOException
+ public void messageComplete()
{
finished = true;
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java
index 45b3340a7e..4e1c07feb4 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/io/LocalWebSocketConnection.java
@@ -15,13 +15,16 @@
//========================================================================
package org.eclipse.jetty.websocket.io;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
+import org.eclipse.jetty.websocket.api.WebSocketConnection;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.junit.rules.TestName;
-public class LocalWebSocketConnection implements RawConnection
+public class LocalWebSocketConnection implements WebSocketConnection
{
private final String id;
@@ -51,8 +54,9 @@ public class LocalWebSocketConnection implements RawConnection
}
@Override
- public void disconnect(boolean onlyOutput)
+ public WebSocketPolicy getPolicy()
{
+ return null;
}
@Override
@@ -62,14 +66,33 @@ public class LocalWebSocketConnection implements RawConnection
}
@Override
+ public String getSubProtocol()
+ {
+ return null;
+ }
+
+ @Override
public boolean isOpen()
{
return false;
}
@Override
- public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
+ public boolean isReading()
+ {
+ return false;
+ }
+
+ @Override
+ public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
+ {
+
+ }
+
+ @Override
+ public SuspendToken suspend()
{
+ return null;
}
@Override
@@ -77,4 +100,22 @@ public class LocalWebSocketConnection implements RawConnection
{
return String.format("%s[%s]",LocalWebSocketConnection.class.getSimpleName(),id);
}
+
+ @Override
+ public <C> void write(C context, Callback<C> callback, byte[] buf, int offset, int len) throws IOException
+ {
+
+ }
+
+ @Override
+ public <C> void write(C context, Callback<C> callback, ByteBuffer buffer) throws IOException
+ {
+
+ }
+
+ @Override
+ public <C> void write(C context, Callback<C> callback, String message) throws IOException
+ {
+
+ }
}

Back to the top