diff options
author | Joakim Erdfelt | 2012-07-30 21:51:34 +0000 |
---|---|---|
committer | Joakim Erdfelt | 2012-07-30 21:51:34 +0000 |
commit | 7d2d40dc2b8dc1b858152e0dc795206cd3214049 (patch) | |
tree | 0186007c5bfb634446f8fb741fa8a1b9ae8b83b3 | |
parent | e75e0e9a04d733f1b0134d5b509aa052ba2cbd8f (diff) | |
download | org.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.tar.gz org.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.tar.xz org.eclipse.jetty.project-7d2d40dc2b8dc1b858152e0dc795206cd3214049.zip |
Fixing PING vs CLOSE ordering issues with outgoing queue
4 files changed, 500 insertions, 9 deletions
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index aaf178d641..ab9e7d79d0 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.protocol.CloseInfo; import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.Generator; +import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.Parser; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; @@ -234,18 +235,31 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i @Override public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) { + if (LOG.isDebugEnabled()) + { + LOG.debug("output({}, {}, {})",context,callback,frame); + } + synchronized (queue) { + FrameBytes<C> bytes = null; + if (frame.getOpCode().isControlFrame()) { - ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame); - scheduleTimeout(bytes); + bytes = new ControlFrameBytes<C>(this,callback,context,frame); + } + else + { + bytes = new DataFrameBytes<C>(this,callback,context,frame); + } + + scheduleTimeout(bytes); + if (frame.getOpCode() == OpCode.PING) + { queue.prepend(bytes); } else { - DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame); - scheduleTimeout(bytes); queue.append(bytes); } } @@ -266,6 +280,7 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i } else if (filled < 0) { + LOG.debug("read - EOF Reached"); disconnect(false); return -1; } diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java new file mode 100644 index 0000000000..98d9a9b528 --- /dev/null +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java @@ -0,0 +1,476 @@ +package org.eclipse.jetty.websocket.server.ab; + +import static org.hamcrest.Matchers.*; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.protocol.CloseInfo; +import org.eclipse.jetty.websocket.protocol.Generator; +import org.eclipse.jetty.websocket.protocol.OpCode; +import org.eclipse.jetty.websocket.protocol.WebSocketFrame; +import org.eclipse.jetty.websocket.server.ByteBufferAssert; +import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; +import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture; +import org.junit.Assert; +import org.junit.Test; + +public class TestABCase2 extends AbstractABCase +{ + private void assertPingFrame(byte[] payload) throws Exception + { + boolean hasPayload = ((payload != null) && (payload.length > 0)); + + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + int len = 0; + if (hasPayload) + { + len = payload.length; + } + + ByteBuffer buf = ByteBuffer.allocate(len + Generator.OVERHEAD); + BufferUtil.clearToFill(buf); + + // Prepare PING Frame + buf.put((byte)(0x00 | FIN | OpCode.PING.getCode())); + putPayloadLength(buf,len); + putMask(buf); + if (hasPayload) + { + buf.put(masked(payload)); + } + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + client.writeRaw(buf); + client.flush(); + + // Read frames + IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500); + + // Validate echo'd frame + WebSocketFrame frame = capture.getFrames().get(0); + Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG)); + if (hasPayload) + { + Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(payload.length)); + ByteBufferAssert.assertEquals("PONG.payload",payload,frame.getPayload()); + } + else + { + Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(0)); + } + + // Validate close + frame = capture.getFrames().get(1); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); + } + finally + { + client.disconnect(); + } + } + + private void assertProtocolError(byte[] payload) throws Exception + { + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + ByteBuffer buf = ByteBuffer.allocate(payload.length + Generator.OVERHEAD); + BufferUtil.clearToFill(buf); + + // Prepare PING Frame + buf.put((byte)(0x00 | FIN | OpCode.PING.getCode())); + putPayloadLength(buf,payload.length); + putMask(buf); + buf.put(masked(payload)); + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + client.writeRaw(buf); + client.flush(); + + // Read frames + IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500); + + // Validate close w/ Protocol Error + WebSocketFrame frame = capture.getFrames().pop(); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.PROTOCOL)); + } + finally + { + client.disconnect(); + } + } + + /** + * Send a ping frame as separate segments, in an inefficient way. + * + * @param payload + * the payload + * @param segmentSize + * the segment size for each inefficient segment (flush between) + */ + private void assertSegmentedPingFrame(byte[] payload, int segmentSize) throws Exception + { + Assert.assertThat("payload exists for segmented send",payload,notNullValue()); + Assert.assertThat("payload exists for segmented send",payload.length,greaterThan(0)); + + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + ByteBuffer buf = ByteBuffer.allocate(payload.length + Generator.OVERHEAD); + BufferUtil.clearToFill(buf); + + // Prepare PING Frame + buf.put((byte)(0x00 | FIN | OpCode.PING.getCode())); + putPayloadLength(buf,payload.length); + putMask(buf); + buf.put(masked(payload)); + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + int origLimit = buf.limit(); + int limit = buf.limit(); + int len; + int pos = buf.position(); + int overallLeft = buf.remaining(); + while (overallLeft > 0) + { + buf.position(pos); + limit = Math.min(origLimit,pos + segmentSize); + buf.limit(limit); + len = buf.remaining(); + overallLeft -= len; + pos += len; + client.writeRaw(buf); + client.flush(); + } + + // Read frames + IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500); + + // Validate echo'd frame + WebSocketFrame frame = capture.getFrames().get(0); + Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG)); + Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(payload.length)); + ByteBufferAssert.assertEquals("PONG.payload",payload,frame.getPayload()); + + // Validate close + frame = capture.getFrames().get(1); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); + } + finally + { + client.disconnect(); + } + } + + /** + * Ping without payload + */ + @Test + public void testCase2_1() throws Exception + { + byte payload[] = new byte[0]; + assertPingFrame(payload); + } + + /** + * 10 pings + */ + @Test + public void testCase2_10() throws Exception + { + // send 10 pings each with unique payload + // send close + // expect 10 pongs with OUR payload + // expect close + } + + /** + * 10 pings, sent slowly + */ + @Test + public void testCase2_11() throws Exception + { + // send 10 pings (slowly) each with unique payload + // send close + // expect 10 pongs with OUR payload + // expect close + } + + /** + * Ping with small text payload + */ + @Test + public void testCase2_2() throws Exception + { + byte payload[] = StringUtil.getUtf8Bytes("Hello world"); + assertPingFrame(payload); + } + + /** + * Ping with small binary (non-utf8) payload + */ + @Test + public void testCase2_3() throws Exception + { + byte payload[] = new byte[] + { 0x00, (byte)0xFF, (byte)0xFE, (byte)0xFD, (byte)0xFC, (byte)0xFB, 0x00, (byte)0xFF }; + assertPingFrame(payload); + } + + /** + * Ping with 125 byte binary payload + */ + @Test + public void testCase2_4() throws Exception + { + byte payload[] = new byte[125]; + Arrays.fill(payload,(byte)0xFE); + assertPingFrame(payload); + } + + /** + * Ping with 126 byte binary payload + */ + @Test + public void testCase2_5() throws Exception + { + byte payload[] = new byte[126]; + Arrays.fill(payload,(byte)0xFE); + assertProtocolError(payload); + } + + /** + * Ping with 125 byte binary payload (slow send) + */ + @Test + public void testCase2_6() throws Exception + { + byte payload[] = new byte[125]; + Arrays.fill(payload,(byte)0xFE); + assertSegmentedPingFrame(payload,1); + } + + /** + * Unsolicited pong frame without payload + */ + @Test + public void testCase2_7() throws Exception + { + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + byte payload[] = new byte[0]; + + ByteBuffer buf = ByteBuffer.allocate(256); + BufferUtil.clearToFill(buf); + + // Prepare Unsolicited PONG Frame + buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode())); + putPayloadLength(buf,payload.length); + putMask(buf); + // buf.put(masked(payload)); + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + client.writeRaw(buf); + client.flush(); + + // Read frames + IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500); + + // Validate close + WebSocketFrame frame = capture.getFrames().pop(); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); + } + finally + { + client.disconnect(); + } + + } + + /** + * Unsolicited pong frame with basic payload + */ + @Test + public void testCase2_8() throws Exception + { + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + byte payload[] = StringUtil.getUtf8Bytes("unsolicited"); + + ByteBuffer buf = ByteBuffer.allocate(256); + BufferUtil.clearToFill(buf); + + // Prepare Unsolicited PONG Frame + buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode())); + putPayloadLength(buf,payload.length); + putMask(buf); + buf.put(masked(payload)); + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + client.writeRaw(buf); + client.flush(); + + // Read frames + IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500); + + // Validate close + WebSocketFrame frame = capture.getFrames().pop(); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); + } + finally + { + client.disconnect(); + } + } + + /** + * Unsolicited pong frame, then ping with basic payload + */ + @Test + public void testCase2_9() throws Exception + { + // send unsolicited pong with payload. + // send OUR ping with payload + // send close + // expect pong with OUR payload + // expect close + + BlockheadClient client = new BlockheadClient(server.getServerUri()); + try + { + client.connect(); + client.sendStandardRequest(); + client.expectUpgradeResponse(); + + byte pongPayload[] = StringUtil.getUtf8Bytes("unsolicited"); + + ByteBuffer buf = ByteBuffer.allocate(512); + BufferUtil.clearToFill(buf); + + // Prepare Unsolicited PONG Frame + buf.put((byte)(0x00 | FIN | OpCode.PONG.getCode())); + putPayloadLength(buf,pongPayload.length); + putMask(buf); + buf.put(masked(pongPayload)); + + // Prepare our PING with payload + byte pingPayload[] = StringUtil.getUtf8Bytes("ping me"); + buf.put((byte)(0x00 | FIN | OpCode.PING.getCode())); + putPayloadLength(buf,pingPayload.length); + putMask(buf); + buf.put(masked(pingPayload)); + + // Prepare CLOSE Frame + buf.put((byte)(0x00 | FIN | OpCode.CLOSE.getCode())); + putPayloadLength(buf,2); + putMask(buf); + buf.put(masked(new byte[] + { 0x03, (byte)0xE8 })); + + // Write Data Frame + BufferUtil.flipToFlush(buf,0); + client.writeRaw(buf); + client.flush(); + + // Read frames + IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500); + + // Validate PONG + WebSocketFrame frame = capture.getFrames().pop(); + Assert.assertThat("frame should be PONG frame",frame.getOpCode(),is(OpCode.PONG)); + Assert.assertThat("PONG.payloadLength",frame.getPayloadLength(),is(pingPayload.length)); + ByteBufferAssert.assertEquals("PONG.payload",pingPayload,frame.getPayload()); + + // Validate close + frame = capture.getFrames().pop(); + Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); + CloseInfo close = new CloseInfo(frame); + Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); + } + finally + { + client.disconnect(); + } + + } + +} diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java index fff894c5d3..facf6b4565 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/helper/IncomingFramesCapture.java @@ -71,12 +71,12 @@ public class IncomingFramesCapture implements IncomingFrames public void dump() { - System.out.printf("Captured %d incoming frames%n",frames.size()); + System.err.printf("Captured %d incoming frames%n",frames.size()); for (int i = 0; i < frames.size(); i++) { WebSocketFrame frame = frames.get(i); - System.out.printf("[%3d] %s%n",i,frame); - System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload())); + System.err.printf("[%3d] %s%n",i,frame); + System.err.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload())); } } diff --git a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties index 9a2364b67d..887eeadef1 100644 --- a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties @@ -4,12 +4,12 @@ org.eclipse.jetty.server.LEVEL=WARN # org.eclipse.jetty.websocket.LEVEL=WARN org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF # See the read/write traffic -# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG +org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG # org.eclipse.jetty.websocket.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG # org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG # org.eclipse.jetty.io.SelectorManager.LEVEL=INFO -# org.eclipse.jetty.websocket.LEVEL=DEBUG +org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG # org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG # org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO |