Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 4643775e6a9e1a61fca2b98886a7c64d81610c8d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util.thread.strategy;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool;

/**
 * <p>A strategy where the thread that calls produce will always run the resulting task
 * itself.  The strategy may dispatch another thread to continue production.
 * </p>
 * <p>The strategy is also known by the nickname 'eat what you kill', which comes from
 * the hunting ethic that says a person should not kill anything he or she does not
 * plan on eating. In this case, the phrase is used to mean that a thread should
 * not produce a task that it does not intend to run. Making producers run the
 * task that they have just produced avoids execution delays and avoids parallel slow
 * down by running the task in the same core, with good chances of having a hot CPU
 * cache. It also avoids the creation of a queue of produced tasks that the system
 * does not yet have capacity to consume, which can save memory and exert back
 * pressure on producers.
 * </p>
 */
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
    private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);

    /** Guards all of the mutable state flags below. */
    private final Locker _locker = new Locker();
    /** Task handed to the executor by {@link #dispatch()}; it simply calls {@link #execute()}. */
    private final Runnable _runExecute = new RunExecute();
    private final Producer _producer;
    private final Executor _executor;
    /** True while no thread has taken responsibility for producing. */
    private boolean _idle=true;
    /** Set by execute()/dispatch() while not idle, to request another produce() before going idle. */
    private boolean _execute;
    /** True while some thread owns the right to call produce(). */
    private boolean _producing;
    /** True once this strategy has been handed to the executor and until {@link #run()} starts. */
    private boolean _pending;
    /** The executor viewed as a ThreadPool, or null if it is not one. */
    private final ThreadPool _threadpool;
    /** Strategy used by {@link #run()} while the thread pool is low on threads; may be null. */
    private final ExecutionStrategy _lowresources;

    /**
     * Creates the strategy with a default low resource strategy: a
     * {@link ProduceExecuteConsume} if the executor is a {@link ThreadPool},
     * otherwise none.
     *
     * @param producer the producer of tasks to run
     * @param executor the executor used to dispatch threads that continue production
     */
    public ExecuteProduceConsume(Producer producer, Executor executor)
    {
        this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
    }
    
    /**
     * Creates the strategy with an explicit low resource strategy.
     *
     * @param producer the producer of tasks to run
     * @param executor the executor used to dispatch threads that continue production
     * @param lowResourceStrategy the strategy to delegate to while the thread pool is
     * low on threads; may be null, and is ignored if the executor is not a {@link ThreadPool}
     */
    public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
    {
        this._producer = producer;
        this._executor = executor;
        _threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
        _lowresources = _threadpool==null?null:lowResourceStrategy;
    }

    /**
     * Starts production in the calling thread if the strategy is idle; otherwise
     * records that the current producer must call produce again before going idle.
     *
     * @throws IllegalStateException if the strategy is idle yet marked as producing
     */
    @Override
    public void execute()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} execute",this);
        
        boolean produce=false;
        try (Lock locked = _locker.lock())
        {
            // If we are idle and a thread is not producing
            if (_idle)
            {
                if (_producing)
                    throw new IllegalStateException();

                // Then this thread will do the producing
                produce=_producing=true;
                // and we are no longer idle
                _idle=false;
            }
            else
            {
                // Otherwise, lets tell the producing thread
                // that it should call produce again before going idle
                _execute=true;
            }
        }

        // Produce outside of the lock, so that the lock is never held while tasks run.
        if (produce)
            produceAndRun();
    }

    /**
     * Like {@link #execute()}, but production is started on a thread obtained from
     * the executor rather than on the calling thread.
     */
    @Override
    public void dispatch()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} spawning",this);
        boolean dispatch=false;
        try (Lock locked = _locker.lock())
        {
            if (_idle)
                dispatch=true;
            else
                _execute=true;
        }
        // The idle state is re-checked by execute() when the dispatched task runs.
        if (dispatch)
            _executor.execute(_runExecute);
    }

    /**
     * Entry point for the thread dispatched by {@link #produceAndRun()}. Clears the
     * pending flag and, if the strategy is active and no thread is currently
     * producing, takes over production. While the thread pool reports that it is
     * low on threads, production is delegated to the low resource strategy (if any).
     */
    @Override
    public void run()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} run",this);
        boolean produce=false;
        try (Lock locked = _locker.lock())
        {
            _pending=false;
            if (!_idle && !_producing)
            {
                produce=_producing=true;
            }
        }

        if (produce)
        {
            // If we are low on resources, then switch to the low resource strategy which
            // does not suffer as badly from thread starvation.
            // The null check matters: the three-arg constructor accepts a null strategy,
            // and without the guard this loop would spin on a caught NullPointerException
            // for as long as the pool stays low on threads.
            while (_lowresources!=null && _threadpool!=null && _threadpool.isLowOnThreads())
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("EWYK low resources {}",this);
                try
                {
                    _lowresources.execute();
                }
                catch(Throwable e)
                {
                    // just warn if lowresources execute fails and keep producing
                    LOG.warn(e);
                }
            }
            
            // no longer low resources so produceAndRun normally
            produceAndRun();
        }
    }

    /**
     * The produce/consume loop. Must only be called by the thread that has set
     * {@code _producing} to true. Each iteration produces one task, dispatches
     * another thread to continue production if none is pending, runs the task in
     * this thread, and then resumes producing unless another thread has taken over
     * or the strategy has become idle.
     */
    private void produceAndRun()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} produce enter",this);

        while (true)
        {
            // If we got here, then we are the thread that is producing.
            if (LOG.isDebugEnabled())
                LOG.debug("{} producing",this);

            Runnable task = _producer.produce();

            if (LOG.isDebugEnabled())
                LOG.debug("{} produced {}",this,task);

            boolean dispatch=false;
            try (Lock locked = _locker.lock())
            {
                // Finished producing
                _producing=false;

                // Did we produce a task?
                if (task == null)
                {
                    // There is no task.
                    if (_execute)
                    {
                        // execute() or dispatch() was called while we were producing,
                        // so consume the request and produce again.
                        _idle=false;
                        _producing=true;
                        _execute=false;
                        continue;
                    }

                    // ... and no additional calls to execute, so we are idle
                    _idle=true;
                    break;
                }

                // We have a task, which we will run ourselves,
                // so if we don't have another thread pending
                if (!_pending)
                {
                    // dispatch one
                    dispatch=_pending=true;
                }

                // A task was produced, so any outstanding execute request is satisfied.
                _execute=false;
            }

            // If we became pending
            if (dispatch)
            {
                // Spawn a new thread to continue production by running the produce loop.
                if (LOG.isDebugEnabled())
                    LOG.debug("{} dispatch",this);
                try
                {
                    _executor.execute(this);
                }
                catch(RejectedExecutionException e)
                {
                    // If we cannot execute, then discard/reject the task and keep producing.
                    // Note that _pending is not cleared here (only run() clears it), so
                    // later iterations run their tasks in this thread without
                    // attempting another dispatch.
                    LOG.debug(e);
                    LOG.warn("RejectedExecution {}",task);
                    try
                    {
                        if (task instanceof Rejectable)
                            ((Rejectable)task).reject();
                    }
                    catch (Exception x)
                    {
                        e.addSuppressed(x);
                        LOG.warn(e);
                    }
                    finally
                    {
                        // The task has been discarded and must not be run below.
                        task=null;
                    }
                }
            }

            // Run the task.
            if (LOG.isDebugEnabled())
                LOG.debug("{} run {}",this,task);
            if (task != null)
                task.run();
            if (LOG.isDebugEnabled())
                LOG.debug("{} ran {}",this,task);

            // Once we have run the task, we can try producing again.
            try (Lock locked = _locker.lock())
            {
                // Is another thread already producing or we are now idle?
                if (_producing || _idle)
                    break;
                _producing=true;
            }
        }

        if (LOG.isDebugEnabled())
            LOG.debug("{} produce exit",this);
    }

    /**
     * @return whether the strategy is currently idle, read under the lock
     */
    public Boolean isIdle()
    {
        try (Lock locked = _locker.lock())
        {
            return _idle;
        }
    }

    /**
     * @return a snapshot of the state flags (taken under the lock) followed by the producer
     */
    @Override
    public String toString()
    {
        StringBuilder builder = new StringBuilder();
        builder.append("EPR ");
        try (Lock locked = _locker.lock())
        {
            builder.append(_idle?"Idle/":"");
            builder.append(_producing?"Prod/":"");
            builder.append(_pending?"Pend/":"");
            builder.append(_execute?"Exec/":"");
        }
        // The producer is appended outside of the lock.
        builder.append(_producer);
        return builder.toString();
    }

    /** Adapter that lets {@link #dispatch()} submit a call to {@link #execute()} to the executor. */
    private class RunExecute implements Runnable
    {
        @Override
        public void run()
        {
            execute();
        }
    }
}

Back to the top