Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 743bdc38b6473fcad2126aac3acc2036a3e71179 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
/*******************************************************************************
 * Copyright (c) 2011 Intalio, Inc.
 * ======================================================================
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Apache License v2.0 which accompanies this distribution.
 *
 *   The Eclipse Public License is available at
 *   http://www.eclipse.org/legal/epl-v10.html
 *
 *   The Apache License v2.0 is available at
 *   http://www.opensource.org/licenses/apache2.0.php
 *
 * You may elect to redistribute this code under either of these licenses.
 *******************************************************************************/
package org.eclipse.jetty.websocket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.util.log.Logger;


/* ------------------------------------------------------------ */
/**
 * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
 * that can speak the websocket protocol.</p>
 * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
 * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
 * send data to the server.</p>
 * <p>Example usage is as follows:</p>
 * <pre>
 *   WebSocketClientFactory factory = new WebSocketClientFactory();
 *   factory.start();
 *
 *   WebSocketClient client = factory.newWebSocketClient();
 *   // Configure the client
 *
 *   WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
 *   {
 *     public void onOpen(Connection connection)
 *     {
 *       // open notification
 *     }
 *
 *     public void onClose(int closeCode, String message)
 *     {
 *       // close notification
 *     }
 *
 *     public void onMessage(String data)
 *     {
 *       // handle incoming message
 *     }
 *   }).get(5, TimeUnit.SECONDS);
 *
 *   connection.sendMessage("Hello World");
 * </pre>
 */
public class WebSocketClient
{
    private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());

    private final WebSocketClientFactory _factory;
    private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
    private final List<String> _extensions=new CopyOnWriteArrayList<String>();
    private String _origin;
    private String _protocol;
    private int _maxIdleTime=-1;
    private int _maxTextMessageSize=16*1024;
    private int _maxBinaryMessageSize=-1;
    private MaskGen _maskGen;
    private SocketAddress _bindAddress;

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
     * <p>This can be wasteful of resources if many clients are created.</p>
     *
     * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
     * @throws Exception if the private WebSocketClientFactory fails to start
     */
    @Deprecated
    public WebSocketClient() throws Exception
    {
        _factory=new WebSocketClientFactory();
        _factory.start();
        _maskGen=_factory.getMaskGen();
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
     *
     * @param factory the shared {@link WebSocketClientFactory}
     */
    public WebSocketClient(WebSocketClientFactory factory)
    {
        _factory=factory;
        _maskGen=_factory.getMaskGen();
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The WebSocketClientFactory this client was created with.
     */
    public WebSocketClientFactory getFactory()
    {
        return _factory;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the address to bind the socket channel to
     * @see #setBindAddress(SocketAddress)
     */
    public SocketAddress getBindAddress()
    {
        return _bindAddress;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param bindAddress the address to bind the socket channel to
     * @see #getBindAddress()
     */
    public void setBindAddress(SocketAddress bindAddress)
    {
        this._bindAddress = bindAddress;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The maxIdleTime in ms for connections opened by this client,
     * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
     * @see #setMaxIdleTime(int)
     */
    public int getMaxIdleTime()
    {
        return _maxIdleTime;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param maxIdleTime The max idle time in ms for connections opened by this client
     * @see #getMaxIdleTime()
     */
    public void setMaxIdleTime(int maxIdleTime)
    {
        _maxIdleTime=maxIdleTime;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The subprotocol string for connections opened by this client.
     * @see #setProtocol(String)
     */
    public String getProtocol()
    {
        return _protocol;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param protocol The subprotocol string for connections opened by this client.
     * @see #getProtocol()
     */
    public void setProtocol(String protocol)
    {
        _protocol = protocol;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The origin URI of the client
     * @see #setOrigin(String)
     */
    public String getOrigin()
    {
        return _origin;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param origin The origin URI of the client (eg "http://example.com")
     * @see #getOrigin()
     */
    public void setOrigin(String origin)
    {
        _origin = origin;
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
     * that upgrades to the websocket protocol.</p>
     * @return The read-write cookie map
     */
    public Map<String,String> getCookies()
    {
        return _cookies;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The list of websocket protocol extensions
     */
    public List<String> getExtensions()
    {
        return _extensions;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the mask generator to use, or null if not mask generator should be used
     * @see #setMaskGen(MaskGen)
     */
    public MaskGen getMaskGen()
    {
        return _maskGen;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param maskGen the mask generator to use, or null if not mask generator should be used
     * @see #getMaskGen()
     */
    public void setMaskGen(MaskGen maskGen)
    {
        _maskGen = maskGen;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The initial maximum text message size (in characters) for a connection
     */
    public int getMaxTextMessageSize()
    {
        return _maxTextMessageSize;
    }

    /* ------------------------------------------------------------ */
    /**
     * Set the initial maximum text message size for a connection. This can be changed by
     * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
     * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
     */
    public void setMaxTextMessageSize(int maxTextMessageSize)
    {
        _maxTextMessageSize = maxTextMessageSize;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return The initial maximum binary message size (in bytes)  for a connection
     */
    public int getMaxBinaryMessageSize()
    {
        return _maxBinaryMessageSize;
    }

    /* ------------------------------------------------------------ */
    /**
     * Set the initial maximum binary message size for a connection. This can be changed by
     * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
     * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
     */
    public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
    {
        _maxBinaryMessageSize = maxBinaryMessageSize;
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
     *
     * @param uri The URI to connect to.
     * @param websocket The {@link WebSocket} instance to handle incoming events.
     * @param maxConnectTime The interval to wait for a successful connection
     * @param units the units of the maxConnectTime
     * @return A {@link WebSocket.Connection}
     * @throws IOException if the connection fails
     * @throws InterruptedException if the thread is interrupted
     * @throws TimeoutException if the timeout elapses before the connection is completed
     * @see #open(URI, WebSocket)
     */
    public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
    {
        try
        {
            return open(uri,websocket).get(maxConnectTime,units);
        }
        catch (ExecutionException e)
        {
            Throwable cause = e.getCause();
            if (cause instanceof IOException)
                throw (IOException)cause;
            if (cause instanceof Error)
                throw (Error)cause;
            if (cause instanceof RuntimeException)
                throw (RuntimeException)cause;
            throw new RuntimeException(cause);
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
     * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
     *
     * @param uri The URI to connect to.
     * @param websocket The {@link WebSocket} instance to handle incoming events.
     * @return A {@link Future} to the {@link WebSocket.Connection}
     * @throws IOException if the connection fails
     * @see #open(URI, WebSocket, long, TimeUnit)
     */
    public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
    {
        if (!_factory.isStarted())
            throw new IllegalStateException("Factory !started");

        InetSocketAddress address = toSocketAddress(uri);

        SocketChannel channel = SocketChannel.open();
        if (_bindAddress != null)
            channel.socket().bind(_bindAddress);
        channel.socket().setTcpNoDelay(true);

        WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);

        channel.configureBlocking(false);
        channel.connect(address);
        _factory.getSelectorManager().register(channel, holder);

        return holder;
    }

    public static InetSocketAddress toSocketAddress(URI uri)
    {
        String scheme = uri.getScheme();
        if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
            throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
        int port = uri.getPort();
        if (port == 0)
            throw new IllegalArgumentException("Bad WebSocket port: " + port);
        if (port < 0)
            port = "ws".equals(scheme) ? 80 : 443;

        return new InetSocketAddress(uri.getHost(), port);
    }

    /* ------------------------------------------------------------ */
    /** The Future Websocket Connection.
     */
    static class WebSocketFuture implements Future<WebSocket.Connection>
    {
        final WebSocket _websocket;
        final URI _uri;
        final WebSocketClient _client;
        final CountDownLatch _done = new CountDownLatch(1);
        ByteChannel _channel;
        WebSocketConnection _connection;
        Throwable _exception;

        private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
        {
            _websocket=websocket;
            _uri=uri;
            _client=client;
            _channel=channel;
        }

        public void onConnection(WebSocketConnection connection)
        {
            try
            {
                _client.getFactory().addConnection(connection);

                connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
                connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());

                WebSocketConnection con;
                synchronized (this)
                {
                    if (_channel!=null)
                        _connection=connection;
                    con=_connection;
                }

                if (con!=null)
                {
                    if (_websocket instanceof WebSocket.OnFrame)
                        ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());

                    _websocket.onOpen(con.getConnection());
                }
            }
            finally
            {
                _done.countDown();
            }
        }

        public void handshakeFailed(Throwable ex)
        {
            try
            {
                ByteChannel channel=null;
                synchronized (this)
                {
                    if (_channel!=null)
                    {
                        channel=_channel;
                        _channel=null;
                        _exception=ex;
                    }
                }

                if (channel!=null)
                {
                    if (ex instanceof ProtocolException)
                        closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
                    else
                        closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
                }
            }
            finally
            {
                _done.countDown();
            }
        }

        public Map<String,String> getCookies()
        {
            return _client.getCookies();
        }

        public String getProtocol()
        {
            return _client.getProtocol();
        }

        public WebSocket getWebSocket()
        {
            return _websocket;
        }

        public URI getURI()
        {
            return _uri;
        }

        public int getMaxIdleTime()
        {
            return _client.getMaxIdleTime();
        }

        public String getOrigin()
        {
            return _client.getOrigin();
        }

        public MaskGen getMaskGen()
        {
            return _client.getMaskGen();
        }

        @Override
        public String toString()
        {
            return "[" + _uri + ","+_websocket+"]@"+hashCode();
        }

        public boolean cancel(boolean mayInterruptIfRunning)
        {
            try
            {
                ByteChannel channel=null;
                synchronized (this)
                {
                    if (_connection==null && _exception==null && _channel!=null)
                    {
                        channel=_channel;
                        _channel=null;
                    }
                }

                if (channel!=null)
                {
                    closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
                    return true;
                }
                return false;
            }
            finally
            {
                _done.countDown();
            }
        }

        public boolean isCancelled()
        {
            synchronized (this)
            {
                return _channel==null && _connection==null;
            }
        }

        public boolean isDone()
        {
            synchronized (this)
            {
                return _connection!=null && _exception==null;
            }
        }

        public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
        {
            try
            {
                return get(Long.MAX_VALUE,TimeUnit.SECONDS);
            }
            catch(TimeoutException e)
            {
                throw new IllegalStateException("The universe has ended",e);
            }
        }

        public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
                TimeoutException
        {
            _done.await(timeout,unit);
            ByteChannel channel=null;
            org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
            Throwable exception;
            synchronized (this)
            {
                exception=_exception;
                if (_connection==null)
                {
                    exception=_exception;
                    channel=_channel;
                    _channel=null;
                }
                else
                    connection=_connection.getConnection();
            }

            if (channel!=null)
                closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
            if (exception!=null)
                throw new ExecutionException(exception);
            if (connection!=null)
                return connection;
            throw new TimeoutException();
        }

        private void closeChannel(ByteChannel channel,int code, String message)
        {
            try
            {
                _websocket.onClose(code,message);
            }
            catch(Exception e)
            {
                __log.warn(e);
            }

            try
            {
                channel.close();
            }
            catch(IOException e)
            {
                __log.debug(e);
            }
        }
    }
}

Back to the top