Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoakim Erdfelt2015-05-11 20:28:58 +0000
committerJoakim Erdfelt2015-05-11 20:28:58 +0000
commitd0251349c52fdfc0ba8718d74c9120f849eca62d (patch)
tree990f9eb3a24dd070511630011f3d198226589955 /jetty-websocket/websocket-client/src
parent79a841da4bfc72f7b741110080052b0525e55863 (diff)
downloadorg.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.tar.gz
org.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.tar.xz
org.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.zip
467036 - WebSocketClient fails to process immediate frames from server
+ Using Connection.UpgradeFrom and Connection.UpgradeTo with client connections and endpoints too.
Diffstat (limited to 'jetty-websocket/websocket-client/src')
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java8
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java86
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java12
3 files changed, 62 insertions, 44 deletions
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
index 0c468765d2..10dfc9635b 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java
@@ -21,11 +21,15 @@ package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener;
public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener
{
+ private static final Logger LOG = Log.getLogger(ClientUpgradeResponse.class);
private ByteBuffer remainingBuffer;
public ClientUpgradeResponse()
@@ -47,6 +51,10 @@ public class ClientUpgradeResponse extends UpgradeResponse implements HttpRespon
@Override
public void setRemainingBuffer(ByteBuffer remainingBuffer)
{
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("Saving remaining header: {}",BufferUtil.toDetailString(remainingBuffer));
+ }
this.remainingBuffer = remainingBuffer;
}
}
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
index f300e7e027..8078a22f58 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
@@ -49,11 +50,13 @@ import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
/**
- * This is the initial connection handling that exists immediately after physical connection is established to destination server.
+ * This is the initial connection handling that exists immediately after physical connection is established to
+ * destination server.
* <p>
- * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
+ * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the
+ * WebSocektClientConnection handler.
*/
-public class UpgradeConnection extends AbstractConnection
+public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom
{
public class SendUpgradeRequest extends FutureCallback implements Runnable
{
@@ -71,7 +74,7 @@ public class UpgradeConnection extends AbstractConnection
String rawRequest = request.generate();
- ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8);
+ ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8);
getEndPoint().write(this,buf);
}
@@ -81,6 +84,7 @@ public class UpgradeConnection extends AbstractConnection
LOG.debug("Upgrade Request Write Success");
// Writing the request header is complete.
super.succeeded();
+ state = State.RESPONSE;
// start the interest in fill
fillInterested();
}
@@ -88,8 +92,9 @@ public class UpgradeConnection extends AbstractConnection
@Override
public void failed(Throwable cause)
{
- LOG.warn("Upgrade Request Write Failure", cause);
+ LOG.warn("Upgrade Request Write Failure",cause);
super.failed(cause);
+ state = State.FAILURE;
// Fail the connect promise when a fundamental exception during connect occurs.
connectPromise.failed(cause);
}
@@ -98,11 +103,21 @@ public class UpgradeConnection extends AbstractConnection
/** HTTP Response Code: 101 Switching Protocols */
private static final int SWITCHING_PROTOCOLS = 101;
+ private enum State
+ {
+ REQUEST,
+ RESPONSE,
+ FAILURE,
+ UPGRADE
+ }
+
private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
private final ByteBufferPool bufferPool;
private final ConnectPromise connectPromise;
private final HttpResponseHeaderParser parser;
+ private State state = State.REQUEST;
private ClientUpgradeRequest request;
+ private ClientUpgradeResponse response;
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
@@ -147,6 +162,12 @@ public class UpgradeConnection extends AbstractConnection
handshakeListener.onHandshakeResponse(response);
}
}
+
+ @Override
+ public ByteBuffer onUpgradeFrom()
+ {
+ return connectPromise.getResponse().getRemainingBuffer();
+ }
@Override
public void onFillable()
@@ -157,20 +178,25 @@ public class UpgradeConnection extends AbstractConnection
}
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
- boolean readMore = false;
try
{
- readMore = read(buffer);
+ read(buffer);
}
finally
{
bufferPool.release(buffer);
}
- if (readMore)
+ if (state == State.RESPONSE)
{
+ // Continue Reading
fillInterested();
}
+ else if (state == State.UPGRADE)
+ {
+ // Stop Reading, upgrade the connection now
+ upgradeConnection(response);
+ }
}
@Override
@@ -179,7 +205,7 @@ public class UpgradeConnection extends AbstractConnection
super.onOpen();
getExecutor().execute(new SendUpgradeRequest());
}
-
+
@Override
public void onClose()
{
@@ -189,7 +215,7 @@ public class UpgradeConnection extends AbstractConnection
}
super.onClose();
}
-
+
@Override
protected boolean onReadTimeout()
{
@@ -197,9 +223,9 @@ public class UpgradeConnection extends AbstractConnection
{
LOG.warn("Timeout on connection {}",this);
}
-
+
failUpgrade(new IOException("Timeout while performing WebSocket Upgrade"));
-
+
return super.onReadTimeout();
}
@@ -208,9 +234,8 @@ public class UpgradeConnection extends AbstractConnection
*
* @param buffer
* the buffer to fill into from the endpoint
- * @return true if there is more to read, false if reading should stop
*/
- private boolean read(ByteBuffer buffer)
+ private void read(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
try
@@ -220,13 +245,14 @@ public class UpgradeConnection extends AbstractConnection
int filled = endPoint.fill(buffer);
if (filled == 0)
{
- return true;
+ return;
}
else if (filled < 0)
{
LOG.warn("read - EOF Reached");
+ state = State.FAILURE;
failUpgrade(new EOFException("Reading WebSocket Upgrade response"));
- return false;
+ return;
}
else
{
@@ -234,34 +260,32 @@ public class UpgradeConnection extends AbstractConnection
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
- ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
- if (resp != null)
+ response = (ClientUpgradeResponse)parser.parse(buffer);
+ if (response != null)
{
// Got a response!
- validateResponse(resp);
- notifyConnect(resp);
- upgradeConnection(resp);
- if (buffer.hasRemaining())
- {
- LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
- }
- return false; // do no more reading
+ validateResponse(response);
+ notifyConnect(response);
+ state = State.UPGRADE;
+ return; // do no more reading
}
}
}
}
catch (IOException | ParseException e)
{
+ LOG.ignore(e);
+ state = State.FAILURE;
UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
connectPromise.failed(ue);
disconnect(false);
- return false;
}
catch (UpgradeException e)
{
+ LOG.ignore(e);
+ state = State.FAILURE;
connectPromise.failed(e);
disconnect(false);
- return false;
}
}
@@ -269,7 +293,7 @@ public class UpgradeConnection extends AbstractConnection
{
EndPoint endp = getEndPoint();
Executor executor = getExecutor();
-
+
EventDriver websocket = connectPromise.getDriver();
WebSocketPolicy policy = websocket.getPolicy();
@@ -301,9 +325,7 @@ public class UpgradeConnection extends AbstractConnection
connectPromise.getClient().addManaged(session);
// Now swap out the connection
- // TODO use endp.upgrade ???
- endp.setConnection(connection);
- connection.onOpen();
+ endp.upgrade(connection);
}
private void validateResponse(ClientUpgradeResponse response)
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
index 21d34a65b5..4ed122b2ff 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java
@@ -19,13 +19,10 @@
package org.eclipse.jetty.websocket.client.io;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
@@ -41,7 +38,6 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
*/
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
- private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
private final ConnectPromise connectPromise;
private final Masker masker;
private final AtomicBoolean opened = new AtomicBoolean(false);
@@ -84,14 +80,6 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(session);
connectPromise.succeeded(session);
-
- ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer();
- setInitialBuffer(extraBuf);
- if (extraBuf.hasRemaining())
- {
- LOG.debug("Parsing extra remaining buffer from UpgradeConnection");
- getParser().parse(extraBuf);
- }
}
super.onOpen();
}

Back to the top