diff options
Diffstat (limited to 'jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServerConnection.java')
-rw-r--r-- | jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServerConnection.java | 614 |
1 files changed, 614 insertions, 0 deletions
diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServerConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServerConnection.java new file mode 100644 index 0000000000..e54e66423d --- /dev/null +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/BlockheadServerConnection.java @@ -0,0 +1,614 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common.test; + +import static org.hamcrest.Matchers.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; +import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames; +import org.eclipse.jetty.websocket.api.extensions.Frame.Type; +import org.eclipse.jetty.websocket.common.AcceptHash; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.Generator; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.Parser; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.extensions.ExtensionStack; +import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory; +import org.eclipse.jetty.websocket.common.frames.CloseFrame; +import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope; +import org.junit.Assert; + +public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames, Runnable, IBlockheadServerConnection +{ + private static final Logger LOG = Log.getLogger(BlockheadServerConnection.class); + + private final int BUFFER_SIZE = 8192; + private final Socket socket; + private final ByteBufferPool bufferPool; + private final WebSocketPolicy policy; + private final IncomingFramesCapture incomingFrames; + private final Parser parser; + private final Generator generator; + private final AtomicInteger parseCount; + private final WebSocketExtensionFactory extensionRegistry; + private final AtomicBoolean echoing = new AtomicBoolean(false); + private Thread echoThread; + + /** Set to true to disable timeouts (for debugging reasons) */ + private boolean debug = false; + private OutputStream out; + private InputStream in; + + private Map<String, String> extraResponseHeaders = new HashMap<>(); + private OutgoingFrames outgoing = this; + + public BlockheadServerConnection(Socket socket) + { + this.socket = socket; + this.incomingFrames = new IncomingFramesCapture(); + this.policy = WebSocketPolicy.newServerPolicy(); + this.policy.setMaxBinaryMessageSize(100000); + this.policy.setMaxTextMessageSize(100000); + // This is a blockhead server connection, no point tracking leaks on this object. + this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE); + this.parser = new Parser(policy,bufferPool); + this.parseCount = new AtomicInteger(0); + this.generator = new Generator(policy,bufferPool,false); + this.extensionRegistry = new WebSocketExtensionFactory(new SimpleContainerScope(policy,bufferPool)); + } + + /** + * Add an extra header for the upgrade response (from the server). No extra work is done to ensure the key and value are sane for http. + * @param rawkey the raw key + * @param rawvalue the raw value + */ + public void addResponseHeader(String rawkey, String rawvalue) + { + extraResponseHeaders.put(rawkey,rawvalue); + } + + /* (non-Javadoc) + * @see org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection#close() + */ + @Override + public void close() throws IOException + { + write(new CloseFrame()); + flush(); + } + + /* (non-Javadoc) + * @see org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection#close(int) + */ + @Override + public void close(int statusCode) throws IOException + { + CloseInfo close = new CloseInfo(statusCode); + write(close.asFrame()); + flush(); + } + + public void disconnect() + { + LOG.debug("disconnect"); + IO.close(in); + IO.close(out); + if (socket != null) + { + try + { + socket.close(); + } + catch (IOException ignore) + { + /* ignore */ + } + } + } + + public void echoMessage(int expectedFrames, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException + { + LOG.debug("Echo Frames [expecting {}]",expectedFrames); + IncomingFramesCapture cap = readFrames(expectedFrames,timeoutDuration,timeoutUnit); + // now echo them back. + for (Frame frame : cap.getFrames()) + { + write(WebSocketFrame.copy(frame).setMasked(false)); + } + } + + public void flush() throws IOException + { + getOutputStream().flush(); + } + + public ByteBufferPool getBufferPool() + { + return bufferPool; + } + + public IncomingFramesCapture getIncomingFrames() + { + return incomingFrames; + } + + public InputStream getInputStream() throws IOException + { + if (in == null) + { + in = socket.getInputStream(); + } + return in; + } + + private OutputStream getOutputStream() throws IOException + { + if (out == null) + { + out = socket.getOutputStream(); + } + return out; + } + + public Parser getParser() + { + return parser; + } + + public WebSocketPolicy getPolicy() + { + return policy; + } + + @Override + public void incomingError(Throwable e) + { + incomingFrames.incomingError(e); + } + + @Override + public void incomingFrame(Frame frame) + { + LOG.debug("incoming({})",frame); + int count = parseCount.incrementAndGet(); + if ((count % 10) == 0) + { + LOG.info("Server parsed {} frames",count); + } + incomingFrames.incomingFrame(WebSocketFrame.copy(frame)); + + if (frame.getOpCode() == OpCode.CLOSE) + { + CloseInfo close = new CloseInfo(frame); + LOG.debug("Close frame: {}",close); + } + + Type type = frame.getType(); + if (echoing.get() && (type.isData() || type.isContinuation())) + { + try + { + write(WebSocketFrame.copy(frame).setMasked(false)); + } + catch (IOException e) + { + LOG.warn(e); + } + } + } + + @Override + public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) + { + ByteBuffer headerBuf = generator.generateHeaderBytes(frame); + if (LOG.isDebugEnabled()) + { + LOG.debug("writing out: {}",BufferUtil.toDetailString(headerBuf)); + } + + try + { + BufferUtil.writeTo(headerBuf,out); + if (frame.hasPayload()) + BufferUtil.writeTo(frame.getPayload(),out); + out.flush(); + if (callback != null) + { + callback.writeSuccess(); + } + + if (frame.getOpCode() == OpCode.CLOSE) + { + disconnect(); + } + } + catch (Throwable t) + { + if (callback != null) + { + callback.writeFailed(t); + } + } + } + + public List<ExtensionConfig> parseExtensions(List<String> requestLines) + { + List<ExtensionConfig> extensionConfigs = new ArrayList<>(); + + List<String> hits = regexFind(requestLines, "^Sec-WebSocket-Extensions: (.*)$"); + + for (String econf : hits) + { + // found extensions + ExtensionConfig config = ExtensionConfig.parse(econf); + extensionConfigs.add(config); + } + + return extensionConfigs; + } + + public String parseWebSocketKey(List<String> requestLines) + { + List<String> hits = regexFind(requestLines,"^Sec-WebSocket-Key: (.*)$"); + if (hits.size() <= 0) + { + return null; + } + + Assert.assertThat("Number of Sec-WebSocket-Key headers", hits.size(), is(1)); + + String key = hits.get(0); + return key; + } + + public int read(ByteBuffer buf) throws IOException + { + int len = 0; + while ((in.available() > 0) && (buf.remaining() > 0)) + { + buf.put((byte)in.read()); + len++; + } + return len; + } + + public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException + { + LOG.debug("Read: waiting for {} frame(s) from client",expectedCount); + int startCount = incomingFrames.size(); + + ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false); + BufferUtil.clearToFill(buf); + try + { + long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit); + long now = System.currentTimeMillis(); + long expireOn = now + msDur; + LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur); + + int len = 0; + while (incomingFrames.size() < (startCount + expectedCount)) + { + BufferUtil.clearToFill(buf); + len = read(buf); + if (len > 0) + { + LOG.debug("Read {} bytes",len); + BufferUtil.flipToFlush(buf,0); + parser.parse(buf); + } + try + { + TimeUnit.MILLISECONDS.sleep(20); + } + catch (InterruptedException gnore) + { + /* ignore */ + } + if (!debug && (System.currentTimeMillis() > expireOn)) + { + incomingFrames.dump(); + throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount, + incomingFrames.size())); + } + } + } + finally + { + bufferPool.release(buf); + } + + return incomingFrames; + } + + public String readRequest() throws IOException + { + LOG.debug("Reading client request"); + StringBuilder request = new StringBuilder(); + BufferedReader in = new BufferedReader(new InputStreamReader(getInputStream())); + for (String line = in.readLine(); line != null; line = in.readLine()) + { + if (line.length() == 0) + { + break; + } + request.append(line).append("\r\n"); + LOG.debug("read line: {}",line); + } + + LOG.debug("Client Request:{}{}","\n",request); + return request.toString(); + } + + public List<String> readRequestLines() throws IOException + { + LOG.debug("Reading client request header"); + List<String> lines = new ArrayList<>(); + + BufferedReader in = new BufferedReader(new InputStreamReader(getInputStream())); + for (String line = in.readLine(); line != null; line = in.readLine()) + { + if (line.length() == 0) + { + break; + } + lines.add(line); + } + + return lines; + } + + public List<String> regexFind(List<String> lines, String pattern) + { + List<String> hits = new ArrayList<>(); + + Pattern patKey = Pattern.compile(pattern,Pattern.CASE_INSENSITIVE); + + Matcher mat; + for (String line : lines) + { + mat = patKey.matcher(line); + if (mat.matches()) + { + if (mat.groupCount() >= 1) + { + hits.add(mat.group(1)); + } + else + { + hits.add(mat.group(0)); + } + } + } + + return hits; + } + + public void respond(String rawstr) throws IOException + { + LOG.debug("respond(){}{}","\n",rawstr); + getOutputStream().write(rawstr.getBytes()); + flush(); + } + + @Override + public void run() + { + LOG.debug("Entering echo thread"); + + ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false); + BufferUtil.clearToFill(buf); + long readBytes = 0; + try + { + while (echoing.get()) + { + BufferUtil.clearToFill(buf); + long len = read(buf); + if (len > 0) + { + readBytes += len; + LOG.debug("Read {} bytes",len); + BufferUtil.flipToFlush(buf,0); + parser.parse(buf); + } + + try + { + TimeUnit.MILLISECONDS.sleep(20); + } + catch (InterruptedException gnore) + { + /* ignore */ + } + } + } + catch (IOException e) + { + LOG.debug("Exception during echo loop",e); + } + finally + { + LOG.debug("Read {} bytes",readBytes); + bufferPool.release(buf); + } + } + + public void setSoTimeout(int ms) throws SocketException + { + socket.setSoTimeout(ms); + } + + public void startEcho() + { + if (echoThread != null) + { + throw new IllegalStateException("Echo thread already declared!"); + } + echoThread = new Thread(this,"BlockheadServer/Echo"); + echoing.set(true); + echoThread.start(); + } + + public void stopEcho() + { + echoing.set(false); + } + + public List<String> upgrade() throws IOException + { + List<String> requestLines = readRequestLines(); + List<ExtensionConfig> extensionConfigs = parseExtensions(requestLines); + String key = parseWebSocketKey(requestLines); + + LOG.debug("Client Request Extensions: {}",extensionConfigs); + LOG.debug("Client Request Key: {}",key); + + Assert.assertThat("Request: Sec-WebSocket-Key",key,notNullValue()); + + // collect extensions configured in response header + ExtensionStack extensionStack = new ExtensionStack(extensionRegistry); + extensionStack.negotiate(extensionConfigs); + + // Start with default routing + extensionStack.setNextIncoming(this); + extensionStack.setNextOutgoing(this); + + // Configure Parser / Generator + extensionStack.configure(parser); + extensionStack.configure(generator); + + // Start Stack + try + { + extensionStack.start(); + } + catch (Exception e) + { + throw new IOException("Unable to start Extension Stack"); + } + + // Configure Parser + parser.setIncomingFramesHandler(extensionStack); + + // Setup Response + StringBuilder resp = new StringBuilder(); + resp.append("HTTP/1.1 101 Upgrade\r\n"); + resp.append("Connection: upgrade\r\n"); + resp.append("Sec-WebSocket-Accept: "); + resp.append(AcceptHash.hashKey(key)).append("\r\n"); + if (extensionStack.hasNegotiatedExtensions()) + { + // Respond to used extensions + resp.append("Sec-WebSocket-Extensions: "); + boolean delim = false; + for (ExtensionConfig ext : extensionStack.getNegotiatedExtensions()) + { + if (delim) + { + resp.append(", "); + } + resp.append(ext.getParameterizedName()); + delim = true; + } + resp.append("\r\n"); + } + if (extraResponseHeaders.size() > 0) + { + for (Map.Entry<String, String> xheader : extraResponseHeaders.entrySet()) + { + resp.append(xheader.getKey()); + resp.append(": "); + resp.append(xheader.getValue()); + resp.append("\r\n"); + } + } + resp.append("\r\n"); + + // Write Response + LOG.debug("Response: {}",resp.toString()); + write(resp.toString().getBytes()); + return requestLines; + } + + private void write(byte[] bytes) throws IOException + { + getOutputStream().write(bytes); + } + + public void write(byte[] buf, int offset, int length) throws IOException + { + getOutputStream().write(buf,offset,length); + } + + /* (non-Javadoc) + * @see org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection#write(org.eclipse.jetty.websocket.api.extensions.Frame) + */ + @Override + public void write(Frame frame) throws IOException + { + LOG.debug("write(Frame->{}) to {}",frame,outgoing); + outgoing.outgoingFrame(frame,null,BatchMode.OFF); + } + + public void write(int b) throws IOException + { + getOutputStream().write(b); + } + + public void write(ByteBuffer buf) throws IOException + { + byte arr[] = BufferUtil.toArray(buf); + if ((arr != null) && (arr.length > 0)) + { + getOutputStream().write(arr); + } + } +}
\ No newline at end of file |