Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2015-09-22 21:54:18 +0000
committerJoakim Erdfelt2015-09-22 21:54:18 +0000
commit3901fcbb6072451aab126663d1ea317f76400b5c (patch)
treeb440a2791aa89ca22771c91fb7a4e52c2896287b
parentda3699dc5596c93dbb71156cd9c7978232abff5f (diff)
downloadorg.eclipse.jetty.project-3901fcbb6072451aab126663d1ea317f76400b5c.tar.gz
org.eclipse.jetty.project-3901fcbb6072451aab126663d1ea317f76400b5c.tar.xz
org.eclipse.jetty.project-3901fcbb6072451aab126663d1ea317f76400b5c.zip
474936 - WebSocketSessions are not always cleaned out from openSessions
+ Adding additional testcase to verify behavior
-rw-r--r--jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java366
-rw-r--r--jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java3
2 files changed, 366 insertions, 3 deletions
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java
new file mode 100644
index 0000000000..0adf4a4b4e
--- /dev/null
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ManyConnectionsCleanupTest.java
@@ -0,0 +1,366 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.server;
+
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.toolchain.test.EventQueue;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.log.StacklessLogging;
+import org.eclipse.jetty.util.log.StdErrLog;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.common.CloseInfo;
+import org.eclipse.jetty.websocket.common.OpCode;
+import org.eclipse.jetty.websocket.common.WebSocketFrame;
+import org.eclipse.jetty.websocket.common.WebSocketSession;
+import org.eclipse.jetty.websocket.common.frames.TextFrame;
+import org.eclipse.jetty.websocket.common.test.BlockheadClient;
+import org.eclipse.jetty.websocket.server.helper.RFCSocket;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests various close scenarios that should result in Open Session cleanup
+ */
+public class ManyConnectionsCleanupTest
+{
+ static class AbstractCloseSocket extends WebSocketAdapter
+ {
+ public CountDownLatch closeLatch = new CountDownLatch(1);
+ public String closeReason = null;
+ public int closeStatusCode = -1;
+ public List<Throwable> errors = new ArrayList<>();
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason)
+ {
+ LOG.debug("onWebSocketClose({}, {})",statusCode,reason);
+ this.closeStatusCode = statusCode;
+ this.closeReason = reason;
+ closeLatch.countDown();
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause)
+ {
+ errors.add(cause);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class CloseServlet extends WebSocketServlet implements WebSocketCreator
+ {
+ private WebSocketServerFactory serverFactory;
+ private AtomicInteger calls = new AtomicInteger(0);
+
+ @Override
+ public void configure(WebSocketServletFactory factory)
+ {
+ factory.setCreator(this);
+ if (factory instanceof WebSocketServerFactory)
+ {
+ this.serverFactory = (WebSocketServerFactory)factory;
+ }
+ }
+
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
+ {
+ if (req.hasSubProtocol("fastclose"))
+ {
+ closeSocket = new FastCloseSocket(calls);
+ return closeSocket;
+ }
+
+ if (req.hasSubProtocol("fastfail"))
+ {
+ closeSocket = new FastFailSocket(calls);
+ return closeSocket;
+ }
+
+ if (req.hasSubProtocol("container"))
+ {
+ closeSocket = new ContainerSocket(serverFactory,calls);
+ return closeSocket;
+ }
+ return new RFCSocket();
+ }
+ }
+
+ /**
+ * On Message, return container information
+ */
+ public static class ContainerSocket extends AbstractCloseSocket
+ {
+ private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.ContainerSocket.class);
+ private final WebSocketServerFactory container;
+ private final AtomicInteger calls;
+ private Session session;
+
+ public ContainerSocket(WebSocketServerFactory container, AtomicInteger calls)
+ {
+ this.container = container;
+ this.calls = calls;
+ }
+
+ @Override
+ public void onWebSocketText(String message)
+ {
+ LOG.debug("onWebSocketText({})",message);
+ calls.incrementAndGet();
+ if (message.equalsIgnoreCase("openSessions"))
+ {
+ Set<WebSocketSession> sessions = container.getOpenSessions();
+
+ StringBuilder ret = new StringBuilder();
+ ret.append("openSessions.size=").append(sessions.size()).append('\n');
+ int idx = 0;
+ for (WebSocketSession sess : sessions)
+ {
+ ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n');
+ }
+ session.getRemote().sendStringByFuture(ret.toString());
+ session.close(StatusCode.NORMAL,"ContainerSocket");
+ } else if(message.equalsIgnoreCase("calls"))
+ {
+ session.getRemote().sendStringByFuture(String.format("calls=%,d",calls.get()));
+ }
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess)
+ {
+ LOG.debug("onWebSocketConnect({})",sess);
+ this.session = sess;
+ }
+ }
+
+ /**
+ * On Connect, close socket
+ */
+ public static class FastCloseSocket extends AbstractCloseSocket
+ {
+ private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastCloseSocket.class);
+ private final AtomicInteger calls;
+
+ public FastCloseSocket(AtomicInteger calls)
+ {
+ this.calls = calls;
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess)
+ {
+ LOG.debug("onWebSocketConnect({})",sess);
+ calls.incrementAndGet();
+ sess.close(StatusCode.NORMAL,"FastCloseServer");
+ }
+ }
+
+ /**
+ * On Connect, throw unhandled exception
+ */
+ public static class FastFailSocket extends AbstractCloseSocket
+ {
+ private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.FastFailSocket.class);
+ private final AtomicInteger calls;
+
+ public FastFailSocket(AtomicInteger calls)
+ {
+ this.calls = calls;
+ }
+
+ @Override
+ public void onWebSocketConnect(Session sess)
+ {
+ LOG.debug("onWebSocketConnect({})",sess);
+ calls.incrementAndGet();
+ // Test failure due to unhandled exception
+ // this should trigger a fast-fail closure during open/connect
+ throw new RuntimeException("Intentional FastFail");
+ }
+ }
+
+ private static final Logger LOG = Log.getLogger(ManyConnectionsCleanupTest.class);
+
+ private static SimpleServletServer server;
+ private static AbstractCloseSocket closeSocket;
+
+ @BeforeClass
+ public static void startServer() throws Exception
+ {
+ server = new SimpleServletServer(new CloseServlet());
+ server.start();
+ }
+
+ @AfterClass
+ public static void stopServer()
+ {
+ server.stop();
+ }
+
+ /**
+ * Test session open session cleanup (bug #474936)
+ *
+ * @throws Exception
+ * on test failure
+ */
+ @Test
+ public void testOpenSessionCleanup() throws Exception
+ {
+ int iterationCount = 100;
+
+ StdErrLog.getLogger(FastFailSocket.class).setLevel(StdErrLog.LEVEL_OFF);
+
+ StdErrLog sessLog = StdErrLog.getLogger(WebSocketSession.class);
+ int oldLevel = sessLog.getLevel();
+ sessLog.setLevel(StdErrLog.LEVEL_OFF);
+
+ for (int requests = 0; requests < iterationCount; requests++)
+ {
+ fastFail();
+ fastClose();
+ dropConnection();
+ }
+
+ sessLog.setLevel(oldLevel);
+
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
+ {
+ client.setProtocols("container");
+ client.setTimeout(1,TimeUnit.SECONDS);
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ client.write(new TextFrame().setPayload("calls"));
+ client.write(new TextFrame().setPayload("openSessions"));
+
+ EventQueue<WebSocketFrame> frames = client.readFrames(3,6,TimeUnit.SECONDS);
+ WebSocketFrame frame;
+ String resp;
+
+ frame = frames.poll();
+ assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
+ resp = frame.getPayloadAsUTF8();
+ assertThat("Should only have 1 open session",resp,containsString("calls=" + ((iterationCount * 2) + 1)));
+
+ frame = frames.poll();
+ assertThat("frames[1].opcode",frame.getOpCode(),is(OpCode.TEXT));
+ resp = frame.getPayloadAsUTF8();
+ assertThat("Should only have 1 open session",resp,containsString("openSessions.size=1\n"));
+
+ frame = frames.poll();
+ assertThat("frames[2].opcode",frame.getOpCode(),is(OpCode.CLOSE));
+ CloseInfo close = new CloseInfo(frame);
+ assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
+ client.write(close.asFrame()); // respond with close
+
+ // ensure server socket got close event
+ assertThat("Open Sessions Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
+ assertThat("Open Sessions.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
+ assertThat("Open Sessions.errors",closeSocket.errors.size(),is(0));
+ }
+ }
+
+ private void fastClose() throws Exception
+ {
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
+ {
+ client.setProtocols("fastclose");
+ client.setTimeout(1,TimeUnit.SECONDS);
+ try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ client.readFrames(1,1,TimeUnit.SECONDS);
+
+ CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
+ assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
+
+ // Notify server of close handshake
+ client.write(close.asFrame()); // respond with close
+
+ // ensure server socket got close event
+ assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
+ assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
+ }
+ }
+ }
+
+ private void fastFail() throws Exception
+ {
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
+ {
+ client.setProtocols("fastfail");
+ client.setTimeout(1,TimeUnit.SECONDS);
+ try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+
+ client.readFrames(1,1,TimeUnit.SECONDS);
+
+ CloseInfo close = new CloseInfo(StatusCode.NORMAL,"Normal");
+ client.write(close.asFrame()); // respond with close
+
+ // ensure server socket got close event
+ assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
+ assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR));
+ assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1));
+ }
+ }
+ }
+
+ private void dropConnection() throws Exception
+ {
+ try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
+ {
+ client.setProtocols("container");
+ client.setTimeout(1,TimeUnit.SECONDS);
+ try (StacklessLogging scope = new StacklessLogging(WebSocketSession.class))
+ {
+ client.connect();
+ client.sendStandardRequest();
+ client.expectUpgradeResponse();
+ client.disconnect();
+ }
+ }
+ }
+}
diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
index 392dba798a..940829def6 100644
--- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
+++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/WebSocketCloseTest.java
@@ -50,7 +50,6 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Ignore;
/**
* Tests various close scenarios
@@ -218,7 +217,6 @@ public class WebSocketCloseTest
* on test failure
*/
@Test
- @Ignore("RELEASE")
public void testFastClose() throws Exception
{
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
@@ -252,7 +250,6 @@ public class WebSocketCloseTest
* on test failure
*/
@Test
- @Ignore("RELEASE")
public void testFastFail() throws Exception
{
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))

Back to the top