Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2012-07-30 21:51:34 +0000
committerJoakim Erdfelt2012-07-30 21:51:34 +0000
commit7d2d40dc2b8dc1b858152e0dc795206cd3214049 (patch)
tree0186007c5bfb634446f8fb741fa8a1b9ae8b83b3
parente75e0e9a04d733f1b0134d5b509aa052ba2cbd8f (diff)
downloadorg.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.tar.gz
org.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.tar.xz
org.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.zip
Fixing PING vs CLOSE ordering issues with outgoing queue
-rw-r--r--jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java23
-rw-r--r--jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java476
-rw-r--r--jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java6
-rw-r--r--jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties4
4 files changed, 500 insertions, 9 deletions
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java
index aaf178d641..ab9e7d79d0 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java
@@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
import org.eclipse.jetty.websocket.protocol.Generator;
+import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@@ -234,18 +235,31 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("output({}, {}, {})",context,callback,frame);
+ }
+
synchronized (queue)
{
+ FrameBytes<C> bytes = null;
+
if (frame.getOpCode().isControlFrame())
{
- ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
- scheduleTimeout(bytes);
+ bytes = new ControlFrameBytes<C>(this,callback,context,frame);
+ }
+ else
+ {
+ bytes = new DataFrameBytes<C>(this,callback,context,frame);
+ }
+
+ scheduleTimeout(bytes);
+ if (frame.getOpCode() == OpCode.PING)
+ {
queue.prepend(bytes);
}
else
{
- DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
- scheduleTimeout(bytes);
queue.append(bytes);
}
}
@@ -266,6 +280,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
}
else if (filled < 0)
{
+ LOG.debug("read - EOF Reached");
disconnect(false);
return -1;
}
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java
new file mode 100644
index 0000000000..98d9a9b528
--- /dev/null
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java
@@ -0,0 +1,476 @@
+package org.eclipse.jetty.websocket.server.ab;
+
+import static org.hamcrest.Matchers.*;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.protocol.CloseInfo;
+import org.eclipse.jetty.websocket.protocol.Generator;
+import org.eclipse.jetty.websocket.protocol.OpCode;
+import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
+import org.eclipse.jetty.websocket.server.ByteBufferAssert;
+import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
+import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestABCase2 extends AbstractABCase
+{
+ private void assertPingFrame(byte[] payload) throws Exception
+ {
+ boolean hasPayload = ((payload != null) && (payload.length > 0));
+
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ int len = 0;
+ if (hasPayload)
+ {
+ len = payload.length;
+ }
+
+ ByteBuffer buf = ByteBuffer.allocate(len + Generator.OVERHEAD);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare PING Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PING.getCode()));
+ putPayloadLength(buf,len);
+ putMask(buf);
+ if (hasPayload)
+ {
+ buf.put(masked(payload));
+ }
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ client.writeRaw(buf);
+ client.flush();
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
+
+ // Validate echo'd frame
+ WebSocketFrame frame = capture.getFrames().get(0);
+ Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG));
+ if (hasPayload)
+ {
+ Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(payload.length));
+ ByteBufferAssert.assertEquals("PONG.payload",payload,frame.getPayload());
+ }
+ else
+ {
+ Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(0));
+ }
+
+ // Validate close
+ frame = capture.getFrames().get(1);
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+ }
+
+ private void assertProtocolError(byte[] payload) throws Exception
+ {
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ ByteBuffer buf = ByteBuffer.allocate(payload.length + Generator.OVERHEAD);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare PING Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PING.getCode()));
+ putPayloadLength(buf,payload.length);
+ putMask(buf);
+ buf.put(masked(payload));
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ client.writeRaw(buf);
+ client.flush();
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
+
+ // Validate close w/ Protocol Error
+ WebSocketFrame frame = capture.getFrames().pop();
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.PROTOCOL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+ }
+
+ /**
+ * Send a ping frame as separate segments, in an inefficient way.
+ *
+ * @param payload
+ * the payload
+ * @param segmentSize
+ * the segment size for each inefficient segment (flush between)
+ */
+ private void assertSegmentedPingFrame(byte[] payload, int segmentSize) throws Exception
+ {
+ Assert.assertThat("payload exists for segmented send",payload,notNullValue());
+ Assert.assertThat("payload exists for segmented send",payload.length,greaterThan(0));
+
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ ByteBuffer buf = ByteBuffer.allocate(payload.length + Generator.OVERHEAD);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare PING Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PING.getCode()));
+ putPayloadLength(buf,payload.length);
+ putMask(buf);
+ buf.put(masked(payload));
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ int origLimit = buf.limit();
+ int limit = buf.limit();
+ int len;
+ int pos = buf.position();
+ int overallLeft = buf.remaining();
+ while (overallLeft > 0)
+ {
+ buf.position(pos);
+ limit = Math.min(origLimit,pos + segmentSize);
+ buf.limit(limit);
+ len = buf.remaining();
+ overallLeft -= len;
+ pos += len;
+ client.writeRaw(buf);
+ client.flush();
+ }
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
+
+ // Validate echo'd frame
+ WebSocketFrame frame = capture.getFrames().get(0);
+ Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG));
+ Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(payload.length));
+ ByteBufferAssert.assertEquals("PONG.payload",payload,frame.getPayload());
+
+ // Validate close
+ frame = capture.getFrames().get(1);
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+ }
+
+ /**
+ * Ping without payload
+ */
+ @Test
+ public void testCase2_1() throws Exception
+ {
+ byte payload[] = new byte[0];
+ assertPingFrame(payload);
+ }
+
+ /**
+ * 10 pings
+ */
+ @Test
+ public void testCase2_10() throws Exception
+ {
+ // send 10 pings each with unique payload
+ // send close
+ // expect 10 pongs with OUR payload
+ // expect close
+ }
+
+ /**
+ * 10 pings, sent slowly
+ */
+ @Test
+ public void testCase2_11() throws Exception
+ {
+ // send 10 pings (slowly) each with unique payload
+ // send close
+ // expect 10 pongs with OUR payload
+ // expect close
+ }
+
+ /**
+ * Ping with small text payload
+ */
+ @Test
+ public void testCase2_2() throws Exception
+ {
+ byte payload[] = StringUtil.getUtf8Bytes("Hello world");
+ assertPingFrame(payload);
+ }
+
+ /**
+ * Ping with small binary (non-utf8) payload
+ */
+ @Test
+ public void testCase2_3() throws Exception
+ {
+ byte payload[] = new byte[]
+ { 0x00, (byte)0xFF, (byte)0xFE, (byte)0xFD, (byte)0xFC, (byte)0xFB, 0x00, (byte)0xFF };
+ assertPingFrame(payload);
+ }
+
+ /**
+ * Ping with 125 byte binary payload
+ */
+ @Test
+ public void testCase2_4() throws Exception
+ {
+ byte payload[] = new byte[125];
+ Arrays.fill(payload,(byte)0xFE);
+ assertPingFrame(payload);
+ }
+
+ /**
+ * Ping with 126 byte binary payload
+ */
+ @Test
+ public void testCase2_5() throws Exception
+ {
+ byte payload[] = new byte[126];
+ Arrays.fill(payload,(byte)0xFE);
+ assertProtocolError(payload);
+ }
+
+ /**
+ * Ping with 125 byte binary payload (slow send)
+ */
+ @Test
+ public void testCase2_6() throws Exception
+ {
+ byte payload[] = new byte[125];
+ Arrays.fill(payload,(byte)0xFE);
+ assertSegmentedPingFrame(payload,1);
+ }
+
+ /**
+ * Unsolicited pong frame without payload
+ */
+ @Test
+ public void testCase2_7() throws Exception
+ {
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ byte payload[] = new byte[0];
+
+ ByteBuffer buf = ByteBuffer.allocate(256);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare Unsolicited PONG Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode()));
+ putPayloadLength(buf,payload.length);
+ putMask(buf);
+ // buf.put(masked(payload));
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ client.writeRaw(buf);
+ client.flush();
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
+
+ // Validate close
+ WebSocketFrame frame = capture.getFrames().pop();
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+
+ }
+
+ /**
+ * Unsolicited pong frame with basic payload
+ */
+ @Test
+ public void testCase2_8() throws Exception
+ {
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ byte payload[] = StringUtil.getUtf8Bytes("unsolicited");
+
+ ByteBuffer buf = ByteBuffer.allocate(256);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare Unsolicited PONG Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode()));
+ putPayloadLength(buf,payload.length);
+ putMask(buf);
+ buf.put(masked(payload));
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ client.writeRaw(buf);
+ client.flush();
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
+
+ // Validate close
+ WebSocketFrame frame = capture.getFrames().pop();
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+ }
+
+ /**
+ * Unsolicited pong frame, then ping with basic payload
+ */
+ @Test
+ public void testCase2_9() throws Exception
+ {
+ // send unsolicited pong with payload.
+ // send OUR ping with payload
+ // send close
+ // expect pong with OUR payload
+ // expect close
+
+ BlockheadClient client = new BlockheadClient(server.getServerUri());
+ try
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ byte pongPayload[] = StringUtil.getUtf8Bytes("unsolicited");
+
+ ByteBuffer buf = ByteBuffer.allocate(512);
+ BufferUtil.clearToFill(buf);
+
+ // Prepare Unsolicited PONG Frame
+ buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode()));
+ putPayloadLength(buf,pongPayload.length);
+ putMask(buf);
+ buf.put(masked(pongPayload));
+
+ // Prepare our PING with payload
+ byte pingPayload[] = StringUtil.getUtf8Bytes("ping me");
+ buf.put((byte)(0x00 | FIN | OpCode.PING.getCode()));
+ putPayloadLength(buf,pingPayload.length);
+ putMask(buf);
+ buf.put(masked(pingPayload));
+
+ // Prepare CLOSE Frame
+ buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode()));
+ putPayloadLength(buf,2);
+ putMask(buf);
+ buf.put(masked(new byte[]
+ { 0x03, (byte)0xE8 }));
+
+ // Write Data Frame
+ BufferUtil.flipToFlush(buf,0);
+ client.writeRaw(buf);
+ client.flush();
+
+ // Read frames
+ IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
+
+ // Validate PONG
+ WebSocketFrame frame = capture.getFrames().pop();
+ Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG));
+ Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(pingPayload.length));
+ ByteBufferAssert.assertEquals("PONG.payload",pingPayload,frame.getPayload());
+
+ // Validate close
+ frame = capture.getFrames().pop();
+ Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
+ }
+ finally
+ {
+ client.disconnect();
+ }
+
+ }
+
+}
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java
index fff894c5d3..facf6b4565 100644
--- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java
@@ -71,12 +71,12 @@ public class IncomingFramesCapture implements IncomingFrames
public void dump()
{
- System.out.printf("Captured %d incoming frames%n",frames.size());
+ System.err.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
WebSocketFrame frame = frames.get(i);
- System.out.printf("[%3d] %s%n",i,frame);
- System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
+ System.err.printf("[%3d] %s%n",i,frame);
+ System.err.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
diff --git a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties
index 9a2364b67d..887eeadef1 100644
--- a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties
+++ b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties
@@ -4,12 +4,12 @@ org.eclipse.jetty.server.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN
org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# See the read/write traffic
-# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
+org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
-# org.eclipse.jetty.websocket.LEVEL=DEBUG
+org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO

Back to the top