diff options
author | Joakim Erdfelt | 2015-05-11 20:28:58 +0000 |
---|---|---|
committer | Joakim Erdfelt | 2015-05-11 20:28:58 +0000 |
commit | d0251349c52fdfc0ba8718d74c9120f849eca62d (patch) | |
tree | 990f9eb3a24dd070511630011f3d198226589955 /jetty-websocket/websocket-client | |
parent | 79a841da4bfc72f7b741110080052b0525e55863 (diff) | |
download | org.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.tar.gz org.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.tar.xz org.eclipse.jetty.project-d0251349c52fdfc0ba8718d74c9120f849eca62d.zip |
467036 - WebSocketClient fails to process immediate frames from server
+ Using Connection.UpgradeFrom and Connection.UpgradeTo with
client connections and endpoints too.
Diffstat (limited to 'jetty-websocket/websocket-client')
3 files changed, 62 insertions, 44 deletions
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java index 0c468765d2..10dfc9635b 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java @@ -21,11 +21,15 @@ package org.eclipse.jetty.websocket.client; import java.io.IOException; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener; public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener { + private static final Logger LOG = Log.getLogger(ClientUpgradeResponse.class); private ByteBuffer remainingBuffer; public ClientUpgradeResponse() @@ -47,6 +51,10 @@ public class ClientUpgradeResponse extends UpgradeResponse implements HttpRespon @Override public void setRemainingBuffer(ByteBuffer remainingBuffer) { + if (LOG.isDebugEnabled()) + { + LOG.debug("Saving remaining header: {}",BufferUtil.toDetailString(remainingBuffer)); + } this.remainingBuffer = remainingBuffer; } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java index f300e7e027..8078a22f58 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; @@ -49,11 +50,13 @@ import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException; /** - * This is the initial connection handling that exists immediately after physical connection is established to destination server. + * This is the initial connection handling that exists immediately after physical connection is established to + * destination server. * <p> - * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler. + * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the + * WebSocektClientConnection handler. */ -public class UpgradeConnection extends AbstractConnection +public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom { public class SendUpgradeRequest extends FutureCallback implements Runnable { @@ -71,7 +74,7 @@ public class UpgradeConnection extends AbstractConnection String rawRequest = request.generate(); - ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8); + ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8); getEndPoint().write(this,buf); } @@ -81,6 +84,7 @@ public class UpgradeConnection extends AbstractConnection LOG.debug("Upgrade Request Write Success"); // Writing the request header is complete. super.succeeded(); + state = State.RESPONSE; // start the interest in fill fillInterested(); } @@ -88,8 +92,9 @@ public class UpgradeConnection extends AbstractConnection @Override public void failed(Throwable cause) { - LOG.warn("Upgrade Request Write Failure", cause); + LOG.warn("Upgrade Request Write Failure",cause); super.failed(cause); + state = State.FAILURE; // Fail the connect promise when a fundamental exception during connect occurs. connectPromise.failed(cause); } @@ -98,11 +103,21 @@ public class UpgradeConnection extends AbstractConnection /** HTTP Response Code: 101 Switching Protocols */ private static final int SWITCHING_PROTOCOLS = 101; + private enum State + { + REQUEST, + RESPONSE, + FAILURE, + UPGRADE + } + private static final Logger LOG = Log.getLogger(UpgradeConnection.class); private final ByteBufferPool bufferPool; private final ConnectPromise connectPromise; private final HttpResponseHeaderParser parser; + private State state = State.REQUEST; private ClientUpgradeRequest request; + private ClientUpgradeResponse response; public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise) { @@ -147,6 +162,12 @@ public class UpgradeConnection extends AbstractConnection handshakeListener.onHandshakeResponse(response); } } + + @Override + public ByteBuffer onUpgradeFrom() + { + return connectPromise.getResponse().getRemainingBuffer(); + } @Override public void onFillable() @@ -157,20 +178,25 @@ public class UpgradeConnection extends AbstractConnection } ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false); BufferUtil.clear(buffer); - boolean readMore = false; try { - readMore = read(buffer); + read(buffer); } finally { bufferPool.release(buffer); } - if (readMore) + if (state == State.RESPONSE) { + // Continue Reading fillInterested(); } + else if (state == State.UPGRADE) + { + // Stop Reading, upgrade the connection now + upgradeConnection(response); + } } @Override @@ -179,7 +205,7 @@ public class UpgradeConnection extends AbstractConnection super.onOpen(); getExecutor().execute(new SendUpgradeRequest()); } - + @Override public void onClose() { @@ -189,7 +215,7 @@ public class UpgradeConnection extends AbstractConnection } super.onClose(); } - + @Override protected boolean onReadTimeout() { @@ -197,9 +223,9 @@ public class UpgradeConnection extends AbstractConnection { LOG.warn("Timeout on connection {}",this); } - + failUpgrade(new IOException("Timeout while performing WebSocket Upgrade")); - + return super.onReadTimeout(); } @@ -208,9 +234,8 @@ public class UpgradeConnection extends AbstractConnection * * @param buffer * the buffer to fill into from the endpoint - * @return true if there is more to read, false if reading should stop */ - private boolean read(ByteBuffer buffer) + private void read(ByteBuffer buffer) { EndPoint endPoint = getEndPoint(); try @@ -220,13 +245,14 @@ public class UpgradeConnection extends AbstractConnection int filled = endPoint.fill(buffer); if (filled == 0) { - return true; + return; } else if (filled < 0) { LOG.warn("read - EOF Reached"); + state = State.FAILURE; failUpgrade(new EOFException("Reading WebSocket Upgrade response")); - return false; + return; } else { @@ -234,34 +260,32 @@ public class UpgradeConnection extends AbstractConnection { LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); } - ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer); - if (resp != null) + response = (ClientUpgradeResponse)parser.parse(buffer); + if (response != null) { // Got a response! - validateResponse(resp); - notifyConnect(resp); - upgradeConnection(resp); - if (buffer.hasRemaining()) - { - LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining()); - } - return false; // do no more reading + validateResponse(response); + notifyConnect(response); + state = State.UPGRADE; + return; // do no more reading } } } } catch (IOException | ParseException e) { + LOG.ignore(e); + state = State.FAILURE; UpgradeException ue = new UpgradeException(request.getRequestURI(),e); connectPromise.failed(ue); disconnect(false); - return false; } catch (UpgradeException e) { + LOG.ignore(e); + state = State.FAILURE; connectPromise.failed(e); disconnect(false); - return false; } } @@ -269,7 +293,7 @@ public class UpgradeConnection extends AbstractConnection { EndPoint endp = getEndPoint(); Executor executor = getExecutor(); - + EventDriver websocket = connectPromise.getDriver(); WebSocketPolicy policy = websocket.getPolicy(); @@ -301,9 +325,7 @@ public class UpgradeConnection extends AbstractConnection connectPromise.getClient().addManaged(session); // Now swap out the connection - // TODO use endp.upgrade ??? - endp.setConnection(connection); - connection.onOpen(); + endp.upgrade(connection); } private void validateResponse(ClientUpgradeResponse response) diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index 21d34a65b5..4ed122b2ff 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -19,13 +19,10 @@ package org.eclipse.jetty.websocket.client.io; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -41,7 +38,6 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; */ public class WebSocketClientConnection extends AbstractWebSocketConnection { - private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class); private final ConnectPromise connectPromise; private final Masker masker; private final AtomicBoolean opened = new AtomicBoolean(false); @@ -84,14 +80,6 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager(); connectionManager.addSession(session); connectPromise.succeeded(session); - - ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer(); - setInitialBuffer(extraBuf); - if (extraBuf.hasRemaining()) - { - LOG.debug("Parsing extra remaining buffer from UpgradeConnection"); - getParser().parse(extraBuf); - } } super.onOpen(); } |