Skip to main content
summaryrefslogblamecommitdiffstats
blob: 1e27bfae722279699131c48c792e5c8143b10e28 (plain) (tree)
1
2
3

                                                                            
                                                         














                                                                            



                                    
                                  

                                       
                      
                     
                       
                        
                                                  
 
                               



                                               
                                          




                                              
                                     
                                          
                                                

                                                      
                                              



                                                           
                                                    


                                                      
                                                                  



                                                                                                                   
                                                                                                                                    

                                                   




                                                                                                                         
                                                                                                                

                                                                                                            

                                                    

                                      

                                                                      

                                                                              


                                   
                   
     
 
                                                                      

                                                                                                       
      
                                                       


                                                        












                                                                                               


                                                                      
 

                                                                                
      
                                                       
                                                  
                                             
       
                                                                                         
     
                               


                                                
 
                                                    
                          
 

                           
 
                                                  
                           
 









                                                                      




                                                                      
      







                                                                      

                          
                                                       
      







                                                                      

                                                                                     
                                        
       



                               
 
                                                                      

                                                                                            
                                               
       



                                                        


                             
     
 
                                                                      



                                             



                                                        
                             
                                                    
                          
     
 
                                                                      


                                   



                                        
 



                                            
                       

     
                                                                      





                                                                                               



                                               
 
                                                                              
     












                                                                                 
 
                         
     
 















                                                                      
                                                                      

                                        









                                                         
                                                                                                                                          
         



















                                                                                                           


                 
                                                                                                              
         

                                                                                                 










                                                                                             
                                                                                   




                                                                     
                                               





                                                                                               
                                                                









                                                                                                     
                                                                      

                            

                                                                    
                                                                                   
     
                                          
                                                              



                                         
                                           
 
                                                                                                  
         


                                                        
 


                                                     
 

                                                                                  



                                                                                                       
                                      
                     
                                                                                







                                                                                      
                                                   

                 
                                                                 

                                                                                                      

                                                                                       


                                  
                                                                 

                                                                  

                                                                                   


                                  
         
 
                                   
         




                                                         
 

                                                                 
 
                                                    
 







                                                                                
 

                                                                             
 
                                                                                                                    
 

                                                                                                            
 









                                                                                                                    
 
                                       
 


                                                                          



                              



                                                                  
             



                                           
                                          





                                                           


                                                              
 
                                              
                 
                                                
                                                                                                  
                                

                 
                               
             



                                                       
                                                                                   


                                                        

                    
                                                                              
 
                                                              



                                                           
                                                     








                                      

                                                                               
                                                    












                                                       

                                                        
                          

         









                                    
                             
         
                               
                                                                       
                
                                                            

         

















                                                                                                                                                                                                                                                                              
 
//
//  ========================================================================
//  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.websocket;

import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.net.ssl.SSLEngine;

import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;

/* ------------------------------------------------------------ */
/**
 * <p>WebSocketClientFactory contains the common components needed by multiple {@link WebSocketClient} instances
 * (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
 * <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
 * <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
 * so it's lifecycle must be controlled externally.
 *
 * @see WebSocketClient
 */
public class WebSocketClientFactory extends AggregateLifeCycle
{
    private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
    private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
    private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
    private final SslContextFactory _sslContextFactory = new SslContextFactory();
    private final ThreadPool _threadPool;
    private final WebSocketClientSelector _selector;
    private MaskGen _maskGen;
    private WebSocketBuffers _buffers;

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates a WebSocketClientFactory with the default configuration.</p>
     */
    public WebSocketClientFactory()
    {
        this(null);
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates a WebSocketClientFactory with the given ThreadPool and the default configuration.</p>
     *
     * @param threadPool the ThreadPool instance to use
     */
    public WebSocketClientFactory(ThreadPool threadPool)
    {
        this(threadPool, new RandomMaskGen());
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates a WebSocketClientFactory with the given ThreadPool and the given MaskGen.</p>
     *
     * @param threadPool the ThreadPool instance to use
     * @param maskGen    the MaskGen instance to use
     */
    public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen)
    {
        this(threadPool, maskGen, 8192);
    }

    /* ------------------------------------------------------------ */

    /**
     * <p>Creates a WebSocketClientFactory with the specified configuration.</p>
     *
     * @param threadPool the ThreadPool instance to use
     * @param maskGen    the mask generator to use
     * @param bufferSize the read buffer size
     */
    public WebSocketClientFactory(ThreadPool threadPool, MaskGen maskGen, int bufferSize)
    {
        if (threadPool == null)
            threadPool = new QueuedThreadPool();
        _threadPool = threadPool;
        addBean(_threadPool);

        _buffers = new WebSocketBuffers(bufferSize);
        addBean(_buffers);

        _maskGen = maskGen;
        addBean(_maskGen);

        _selector = new WebSocketClientSelector();
        addBean(_selector);

        addBean(_sslContextFactory);
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the SslContextFactory used to configure SSL parameters
     */
    public SslContextFactory getSslContextFactory()
    {
        return _sslContextFactory;
    }

    /* ------------------------------------------------------------ */
    /**
     * Get the selectorManager. Used to configure the manager.
     *
     * @return The {@link SelectorManager} instance.
     */
    public SelectorManager getSelectorManager()
    {
        return _selector;
    }

    /* ------------------------------------------------------------ */
    /**
     * Get the ThreadPool.
     * Used to set/query the thread pool configuration.
     *
     * @return The {@link ThreadPool}
     */
    public ThreadPool getThreadPool()
    {
        return _threadPool;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the shared mask generator, or null if no shared mask generator is used
     * @see WebSocketClient#getMaskGen()
     */
    public MaskGen getMaskGen()
    {
        return _maskGen;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param maskGen the shared mask generator, or null if no shared mask generator is used
     * @see WebSocketClient#setMaskGen(MaskGen)
     */
    public void setMaskGen(MaskGen maskGen)
    {
        if (isRunning())
            throw new IllegalStateException(getState());
        removeBean(_maskGen);
        _maskGen = maskGen;
        addBean(maskGen);
    }

    /* ------------------------------------------------------------ */
    /**
     * @param bufferSize the read buffer size
     * @see #getBufferSize()
     */
    public void setBufferSize(int bufferSize)
    {
        if (isRunning())
            throw new IllegalStateException(getState());
        removeBean(_buffers);
        _buffers = new WebSocketBuffers(bufferSize);
        addBean(_buffers);
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the read buffer size
     */
    public int getBufferSize()
    {
        return _buffers.getBufferSize();
    }

    @Override
    protected void doStop() throws Exception
    {
        closeConnections();
        super.doStop();
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates and returns a new instance of a {@link WebSocketClient}, configured with this
     * WebSocketClientFactory instance.</p>
     *
     * @return a new {@link WebSocketClient} instance
     */
    public WebSocketClient newWebSocketClient()
    {
        return new WebSocketClient(this);
    }

    protected SSLEngine newSslEngine(SocketChannel channel) throws IOException
    {
        SSLEngine sslEngine;
        if (channel != null)
        {
            String peerHost = channel.socket().getInetAddress().getHostAddress();
            int peerPort = channel.socket().getPort();
            sslEngine = _sslContextFactory.newSslEngine(peerHost, peerPort);
        }
        else
        {
            sslEngine = _sslContextFactory.newSslEngine();
        }
        sslEngine.setUseClientMode(true);
        sslEngine.beginHandshake();

        return sslEngine;
    }

    protected boolean addConnection(WebSocketConnection connection)
    {
        return isRunning() && connections.add(connection);
    }

    protected boolean removeConnection(WebSocketConnection connection)
    {
        return connections.remove(connection);
    }

    protected void closeConnections()
    {
        for (WebSocketConnection connection : connections)
            connection.shutdown();
    }

    /* ------------------------------------------------------------ */
    /**
     * WebSocket Client Selector Manager
     */
    class WebSocketClientSelector extends SelectorManager
    {
        @Override
        public boolean dispatch(Runnable task)
        {
            return _threadPool.dispatch(task);
        }

        @Override
        protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
        {
            WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)key.attachment();
            int maxIdleTime = holder.getMaxIdleTime();
            if (maxIdleTime < 0)
                maxIdleTime = (int)getMaxIdleTime();
            SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
            AsyncEndPoint endPoint = result;

            // Detect if it is SSL, and wrap the connection if so
            if ("wss".equals(holder.getURI().getScheme()))
            {
                SSLEngine sslEngine = newSslEngine(channel);
                SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
                endPoint.setConnection(sslConnection);
                endPoint = sslConnection.getSslEndPoint();
            }

            AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
            endPoint.setConnection(connection);

            return result;
        }

        @Override
        public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
        {
            WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture)attachment;
            return new HandshakeConnection(endpoint, holder);
        }

        @Override
        protected void endPointOpened(SelectChannelEndPoint endpoint)
        {
            // TODO expose on outer class ??
        }

        @Override
        protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
        {
            LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
        }

        @Override
        protected void endPointClosed(SelectChannelEndPoint endpoint)
        {
            endpoint.getConnection().onClose();
        }

        @Override
        protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
        {
            if (!(attachment instanceof WebSocketClient.WebSocketFuture))
                super.connectionFailed(channel, ex, attachment);
            else
            {
                __log.debug(ex);
                WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;

                future.handshakeFailed(ex);
            }
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Handshake Connection.
     * Handles the connection until the handshake succeeds or fails.
     */
    class HandshakeConnection extends AbstractConnection implements AsyncConnection
    {
        private final AsyncEndPoint _endp;
        private final WebSocketClient.WebSocketFuture _future;
        private final String _key;
        private final HttpParser _parser;
        private String _accept;
        private String _error;
        private ByteArrayBuffer _handshake;

        public HandshakeConnection(AsyncEndPoint endpoint, WebSocketClient.WebSocketFuture future)
        {
            super(endpoint, System.currentTimeMillis());
            _endp = endpoint;
            _future = future;

            byte[] bytes = new byte[16];
            new Random().nextBytes(bytes);
            _key = new String(B64Code.encode(bytes));

            Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
            _parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
            {
                @Override
                public void startResponse(Buffer version, int status, Buffer reason) throws IOException
                {
                    if (status != 101)
                    {
                        _error = "Bad response status " + status + " " + reason;
                        _endp.close();
                    }
                }

                @Override
                public void parsedHeader(Buffer name, Buffer value) throws IOException
                {
                    if (__ACCEPT.equals(name))
                        _accept = value.toString();
                }

                @Override // TODO simone says shouldn't be needed
                public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
                {
                    if (_error == null)
                        _error = "Bad response: " + method + " " + url + " " + version;
                    _endp.close();
                }

                @Override // TODO simone says shouldn't be needed
                public void content(Buffer ref) throws IOException
                {
                    if (_error == null)
                        _error = "Bad response. " + ref.length() + "B of content?";
                    _endp.close();
                }
            });
        }

        private boolean handshake()
        {
            if (_handshake==null)
            {
                String path = _future.getURI().getPath();
                if (path == null || path.length() == 0)
                    path = "/";

                if (_future.getURI().getRawQuery() != null)
                    path += "?" + _future.getURI().getRawQuery();

                String origin = _future.getOrigin();

                StringBuilder request = new StringBuilder(512);
                request.append("GET ").append(path).append(" HTTP/1.1\r\n")
                .append("Host: ").append(_future.getURI().getHost()).append(":")
                .append(_future.getURI().getPort()).append("\r\n")
                .append("Upgrade: websocket\r\n")
                .append("Connection: Upgrade\r\n")
                .append("Sec-WebSocket-Key: ")
                .append(_key).append("\r\n");

                if (origin != null)
                    request.append("Origin: ").append(origin).append("\r\n");

                request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");

                if (_future.getProtocol() != null)
                    request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");

                Map<String, String> cookies = _future.getCookies();
                if (cookies != null && cookies.size() > 0)
                {
                    for (String cookie : cookies.keySet())
                        request.append("Cookie: ")
                        .append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
                        .append("=")
                        .append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
                        .append("\r\n");
                }

                request.append("\r\n");

                _handshake=new ByteArrayBuffer(request.toString(), false);
            }
            
            // TODO extensions

            try
            {
                int len = _handshake.length();
                int flushed = _endp.flush(_handshake);
                if (flushed<0)
                    throw new IOException("incomplete handshake");
            }
            catch (IOException e)
            {
                _future.handshakeFailed(e);
            }
            return _handshake.length()==0;
        }

        public Connection handle() throws IOException
        {
            while (_endp.isOpen() && !_parser.isComplete())
            {
                if (_handshake==null || _handshake.length()>0)
                    if (!handshake())
                        return this;

                if (!_parser.parseAvailable())
                {
                    if (_endp.isInputShutdown())
                        _future.handshakeFailed(new IOException("Incomplete handshake response"));
                    return this;
                }
            }
            if (_error == null)
            {
                if (_accept == null)
                {
                    _error = "No Sec-WebSocket-Accept";
                }
                else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
                {
                    _error = "Bad Sec-WebSocket-Accept";
                }
                else
                {
                    WebSocketConnection connection = newWebSocketConnection();

                    Buffer header = _parser.getHeaderBuffer();
                    if (header.hasContent())
                        connection.fillBuffersFrom(header);
                    _buffers.returnBuffer(header);

                    _future.onConnection(connection);

                    return connection;
                }
            }

            _endp.close();
            return this;
        }

        private WebSocketConnection newWebSocketConnection() throws IOException
        {
            __log.debug("newWebSocketConnection()");
            return new WebSocketClientConnection(
                    _future._client.getFactory(),
                    _future.getWebSocket(),
                    _endp,
                    _buffers,
                    System.currentTimeMillis(),
                    _future.getMaxIdleTime(),
                    _future.getProtocol(),
                    null,
                    WebSocketConnectionRFC6455.VERSION,
                    _future.getMaskGen());
        }

        public void onInputShutdown() throws IOException
        {
            _endp.close();
        }

        public boolean isIdle()
        {
            return false;
        }

        public boolean isSuspended()
        {
            return false;
        }

        public void onClose()
        {
            if (_error != null)
                _future.handshakeFailed(new ProtocolException(_error));
            else
                _future.handshakeFailed(new EOFException());
        }
    }

    private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
    {
        private final WebSocketClientFactory factory;

        public WebSocketClientConnection(WebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, MaskGen maskGen) throws IOException
        {
            super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
            this.factory = factory;
        }

        @Override
        public void onClose()
        {
            super.onClose();
            factory.removeConnection(this);
        }
    }
}

Back to the top