Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2013-02-21 22:30:56 +0000
committerJoakim Erdfelt2013-02-21 22:32:37 +0000
commit61470cde2eec0db2562ac0495f144a65158288a9 (patch)
tree491d0114872fc1e55c80d2c5f7ffab91a059d1a3 /jetty-websocket
parent8418acdae45d08ab346d9af3dc7f2908381fa4dd (diff)
downloadorg.eclipse.jetty.project-61470cde2eec0db2562ac0495f144a65158288a9.tar.gz
org.eclipse.jetty.project-61470cde2eec0db2562ac0495f144a65158288a9.tar.xz
org.eclipse.jetty.project-61470cde2eec0db2562ac0495f144a65158288a9.zip
401427 - WebSocket messages sent from onConnect fail to be read by jetty websocket-client
+ Adding carryover of bytes remaining from UpgradeConnection to AbstractWebSocketConnection.parser
Diffstat (limited to 'jetty-websocket')
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java13
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java6
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java4
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java16
-rw-r--r--jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties10
-rw-r--r--jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java45
-rw-r--r--jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java12
7 files changed, 86 insertions, 20 deletions
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
index e5de092b2a..7c9e971cb9 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
@@ -19,19 +19,32 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
public class ClientUpgradeResponse extends UpgradeResponse
{
+ private ByteBuffer remainingBuffer;
+
public ClientUpgradeResponse()
{
super();
}
+ public ByteBuffer getRemainingBuffer()
+ {
+ return remainingBuffer;
+ }
+
@Override
public void sendForbidden(String message) throws IOException
{
throw new UnsupportedOperationException("Not supported on client implementation");
}
+
+ public void setRemainingBuffer(ByteBuffer remainingBuffer)
+ {
+ this.remainingBuffer = remainingBuffer;
+ }
}
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java
index 87da0304d4..8b67dfdef4 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8LineParser;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
@@ -79,6 +80,11 @@ public class HttpResponseHeaderParser
{
if (parseHeader(line))
{
+ // Finished parsing entire header
+ ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+ BufferUtil.put(buf,copy);
+ BufferUtil.flipToFlush(copy,0);
+ this.response.setRemainingBuffer(copy);
return this.response;
}
}
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
index e28af0f279..b37a02a4a8 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
@@ -180,6 +180,10 @@ public class UpgradeConnection extends AbstractConnection
validateResponse(resp);
notifyConnect(resp);
upgradeConnection(resp);
+ if (buffer.hasRemaining())
+ {
+ LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
+ }
return false; // do no more reading
}
}
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
index edbca56173..d1d4a6ef65 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
@@ -19,7 +19,9 @@
package org.eclipse.jetty.websocket.client.io;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
@@ -41,13 +43,12 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
private final ConnectPromise connectPromise;
private final Masker masker;
- private boolean connected;
+ private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor,connectPromise.getClient().getScheduler(),connectPromise.getClient().getPolicy(),connectPromise.getClient().getBufferPool());
this.connectPromise = connectPromise;
- this.connected = false;
this.masker = connectPromise.getMasker();
assert (this.masker != null);
}
@@ -75,13 +76,20 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
@Override
public void onOpen()
{
- if (!connected)
+ boolean beenOpened = opened.getAndSet(true);
+ if (!beenOpened)
{
WebSocketSession session = getSession();
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(session);
connectPromise.succeeded(session);
- connected = true;
+
+ ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer();
+ if (extraBuf.hasRemaining())
+ {
+ LOG.debug("Parsing extra remaining buffer from UpgradeConnection");
+ getParser().parse(extraBuf);
+ }
}
super.onOpen();
}
diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
index 1aed71b558..f5e58f4560 100644
--- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
+++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties
@@ -6,15 +6,5 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
# Hide the stacktraces during testing
org.eclipse.jetty.websocket.client.internal.io.UpgradeConnection.STACKS=false
-# See the read/write traffic
-# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.io.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection$DataFrameBytes.LEVEL=WARN
-# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO
-# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
-# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG
diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
index 81fca18443..3c5b6b2dd0 100644
--- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
+++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@@ -121,6 +122,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
+ public static class Stats {
+ private AtomicLong countFillInterestedEvents = new AtomicLong(0);
+ private AtomicLong countOnFillableEvents = new AtomicLong(0);
+ private AtomicLong countFillableErrors = new AtomicLong(0);
+
+ public long getFillableErrorCount() {
+ return countFillableErrors.get();
+ }
+
+ public long getFillInterestedCount() {
+ return countFillInterestedEvents.get();
+ }
+
+ public long getOnFillableCount() {
+ return countOnFillableEvents.get();
+ }
+ }
+
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
/**
@@ -141,11 +160,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private boolean flushing;
private boolean isFilling;
private IOState ioState;
+ private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
- endp.setIdleTimeout(policy.getIdleTimeout());
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);
@@ -246,6 +265,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
+ @Override
+ public void fillInterested()
+ {
+ stats.countFillInterestedEvents.incrementAndGet();
+ super.fillInterested();
+ }
+
public void flush()
{
ByteBuffer buffer = null;
@@ -349,6 +375,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return session;
}
+ public Stats getStats()
+ {
+ return stats;
+ }
+
@Override
public boolean isOpen()
{
@@ -372,6 +403,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onFillable()
{
LOG.debug("{} onFillable()",policy.getBehavior());
+ stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
@@ -396,6 +428,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
@Override
+ protected void onFillInterestedFailed(Throwable cause)
+ {
+ LOG.ignore(cause);
+ stats.countFillInterestedEvents.incrementAndGet();
+ super.onFillInterestedFailed(cause);
+ }
+
+ @Override
public void onOpen()
{
super.onOpen();
@@ -409,7 +449,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.warn("Read Timeout");
- if ((ioState.getState() == ConnectionState.CLOSING) || (ioState.getState() == ConnectionState.CLOSED))
+ IOState state = getIOState();
+ if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
{
// close already initiated, extra timeouts not relevant
// allow udnerlying connection and endpoint to disconnect on its own
diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java
index f28ae54739..afe2e78a63 100644
--- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java
+++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java
@@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.server;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@@ -31,14 +32,17 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
public class WebSocketServerConnection extends AbstractWebSocketConnection
{
private final WebSocketServerFactory factory;
- private boolean connected;
+ private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
WebSocketServerFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);
+ if (policy.getIdleTimeout() > 0)
+ {
+ endp.setIdleTimeout(policy.getIdleTimeout());
+ }
this.factory = factory;
- this.connected = false;
}
@Override
@@ -63,10 +67,10 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
@Override
public void onOpen()
{
- if (!connected)
+ boolean beenOpened = opened.getAndSet(true);
+ if (!beenOpened)
{
factory.sessionOpened(getSession());
- connected = true;
}
super.onOpen();
}

Back to the top