Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: a50131f1ed2b46220d4993dc8905b5d5c9b52d2a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
//
//  ========================================================================
//  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.client;

import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;

/**
 * An {@link HttpDestination} that sends all of its exchanges over one shared
 * connection, which is opened on demand the first time {@link #send()} is called.
 * <p>
 * The connection lifecycle is tracked by an atomic {@code ConnectState}
 * (DISCONNECTED -&gt; CONNECTING -&gt; CONNECTED). This class is also the
 * {@link Promise} passed to {@code newConnection(...)}, so the outcome of the
 * connection attempt is delivered to {@link #succeeded(Connection)} or
 * {@link #failed(Throwable)}.
 *
 * @param <C> the concrete connection type handled by the subclass
 */
public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
{
    private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
    // Written in succeeded() before the state is CAS'ed to CONNECTED; send() reads
    // it only after observing CONNECTED through the atomic reference.
    private C connection;

    /**
     * Creates a destination for the given origin.
     *
     * @param client the client this destination belongs to
     * @param origin the origin this destination connects to
     */
    protected MultiplexHttpDestination(HttpClient client, Origin origin)
    {
        super(client, origin);
    }

    /**
     * Dispatches queued exchanges according to the current connection state:
     * starts a connection attempt when disconnected, does nothing while a
     * connection attempt is pending, and drains the exchange queue when connected.
     */
    @Override
    public void send()
    {
        while (true)
        {
            ConnectState current = connect.get();
            switch (current)
            {
                case DISCONNECTED:
                {
                    // Lost the race with another thread: re-read the state and retry.
                    if (!connect.compareAndSet(current, ConnectState.CONNECTING))
                        break;
                    newConnection(this);
                    return;
                }
                case CONNECTING:
                {
                    // Waiting to connect, just return; succeeded() drains the
                    // queue once the connection is established.
                    return;
                }
                case CONNECTED:
                {
                    // Keep looping for as long as an exchange was dequeued.
                    if (process(connection))
                        break;
                    return;
                }
                default:
                {
                    abort(new IllegalStateException("Invalid connection state " + current));
                    return;
                }
            }
        }
    }

    /**
     * Invoked when the connection attempt completes successfully.
     * <p>
     * Publishes the connection and then drains <em>all</em> exchanges queued
     * while the connection was being established. If the state is not
     * CONNECTING, the new connection is closed and the failure path is taken.
     *
     * @param result the newly established connection
     */
    @Override
    @SuppressWarnings("unchecked")
    public void succeeded(Connection result)
    {
        // The field must be assigned before the CAS so that threads observing
        // CONNECTED also observe the connection.
        C connection = this.connection = (C)result;
        if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
        {
            // send() returns immediately while CONNECTING, so several exchanges
            // may have been queued in the meantime. Processing just one of them
            // would leave the others stuck until an unrelated send() happens,
            // therefore run the full dispatch loop here.
            send();
        }
        else
        {
            connection.close();
            failed(new IllegalStateException("Invalid connection state " + connect.get()));
        }
    }

    /**
     * Invoked when the connection attempt fails: resets the state to
     * DISCONNECTED, so that a later {@link #send()} starts a new attempt,
     * and passes the failure to {@code abort(Throwable)}.
     *
     * @param x the failure cause
     */
    @Override
    public void failed(Throwable x)
    {
        connect.set(ConnectState.DISCONNECTED);
        abort(x);
    }

    /**
     * Polls one exchange from the queue and either sends it over the given
     * connection or aborts it if its request was already aborted.
     *
     * @param connection the connection to send the exchange on
     * @return {@code true} if an exchange was dequeued (sent or aborted),
     *         {@code false} if the queue was empty
     */
    protected boolean process(final C connection)
    {
        final HttpExchange exchange = getHttpExchanges().poll();
        if (LOG.isDebugEnabled())
            LOG.debug("Processing {} on {}", exchange, connection);
        if (exchange == null)
            return false;

        final Request request = exchange.getRequest();
        Throwable cause = request.getAbortCause();
        if (cause != null)
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Aborted before processing {}: {}", exchange, cause);
            // It may happen that the request is aborted before the exchange
            // is created. Aborting the exchange a second time will result in
            // a no-operation, so we just abort here to cover that edge case.
            exchange.abort(cause);
        }
        else
        {
            send(connection, exchange);
        }
        return true;
    }

    /**
     * Closes this destination and, if one was ever established, its connection.
     */
    @Override
    public void close()
    {
        super.close();
        C connection = this.connection;
        if (connection != null)
            connection.close();
    }

    /**
     * Handles the closing of a connection: the state goes back to DISCONNECTED
     * regardless of its previous value and, if the client is configured to
     * remove idle destinations, this destination is removed from the client.
     *
     * @param connection the connection being closed
     */
    @Override
    public void close(Connection connection)
    {
        super.close(connection);
        // Unconditional transition; same outcome as retrying a CAS from
        // whatever the current state is until it succeeds.
        connect.set(ConnectState.DISCONNECTED);
        if (getHttpClient().isRemoveIdleDestinations())
            getHttpClient().removeDestination(this);
    }

    /**
     * Sends the given exchange over the given connection.
     *
     * @param connection the connection to use
     * @param exchange the exchange to send
     */
    protected abstract void send(C connection, HttpExchange exchange);

    /** Lifecycle of the single connection owned by this destination. */
    private enum ConnectState
    {
        DISCONNECTED, CONNECTING, CONNECTED
    }
}

Back to the top