Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 62b2e70653b6d20ce2a0c8e87d4bfe6370536bcf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*******************************************************************************
 * Copyright (c) 2009, 2010 Oracle. All rights reserved.
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0, which accompanies this distribution
 * and is available at http://www.eclipse.org/legal/epl-v10.html.
 * 
 * Contributors:
 *     Oracle - initial API and implementation
 ******************************************************************************/
package org.eclipse.jpt.common.utility.internal;

import java.util.concurrent.ThreadFactory;

import org.eclipse.jpt.common.utility.Command;

/**
 * A command executor that hands each command off to a separate thread for
 * execution, so that calls to {@link CommandExecutor#execute(Command)} return
 * immediately instead of waiting for the command to run.
 * <p>
 * <strong>NB:</strong> Client-supplied commands are expected to deal with
 * their own exceptions (e.g. log the exception and return gracefully) so
 * that the command execution thread can carry on with later commands.
 */
public class AsynchronousCommandExecutor
	implements StatefulCommandExecutor
{
	/**
	 * The queue of pending commands, shared with the command
	 * execution/consumer thread. Enqueueing a command causes it to be
	 * executed by that thread: right away if the thread is idle, or as soon
	 * as the currently executing command (and any commands queued ahead of
	 * it) have finished.
	 */
	final SynchronizedQueue<Command> commands = new SynchronizedQueue<Command>();

	/**
	 * The coordinator that takes care of most of the thread-related
	 * behavior on this executor's behalf.
	 */
	private final ConsumerThreadCoordinator consumerThreadCoordinator;


	// ********** construction **********

	/**
	 * Construct an asynchronous command executor that uses simple JDK
	 * thread(s), with JDK-generated names, as its command execution
	 * thread(s).
	 */
	public AsynchronousCommandExecutor() {
		this(SimpleThreadFactory.instance());
	}

	/**
	 * Construct an asynchronous command executor whose command execution
	 * thread(s) are built by the specified thread factory and are left with
	 * their JDK-generated names.
	 */
	public AsynchronousCommandExecutor(ThreadFactory threadFactory) {
		this(threadFactory, null);
	}

	/**
	 * Construct an asynchronous command executor that uses simple JDK
	 * thread(s), assigned the specified name, as its command execution
	 * thread(s).
	 */
	public AsynchronousCommandExecutor(String threadName) {
		this(SimpleThreadFactory.instance(), threadName);
	}

	/**
	 * Construct an asynchronous command executor whose command execution
	 * thread(s) are built by the specified thread factory and assigned the
	 * specified name. A <code>null</code> name is what the other
	 * constructors pass when the JDK-generated name is to be kept.
	 */
	public AsynchronousCommandExecutor(ThreadFactory threadFactory, String threadName) {
		// the consumer is an inner class instance, tied to this executor's command queue
		ConsumerThreadCoordinator.Consumer consumer = new Consumer();
		this.consumerThreadCoordinator = new ConsumerThreadCoordinator(consumer, threadFactory, threadName);
	}


	// ********** StatefulCommandExecutor implementation **********

	/**
	 * Build and start the command execution/consumer thread; the work is
	 * delegated to the consumer thread coordinator.
	 * <p>
	 * Note: The command queue is <em>not</em> cleared here; so any command
	 * already sitting in the queue when we get here will be executed
	 * promptly (albeit, asynchronously).
	 * The command queue will be non-empty if:<ul>
	 * <li>{@link #execute(Command)} was called after the command executor was
	 *     constructed but before {@link #start()} was called; or
	 * <li>{@link #execute(Command)} was called after {@link #stop()} was called
	 *     but before {@link #start()} was called (to restart the command executor); or
	 * <li>{@link #stop()} was called while there were still outstanding
	 *     commands remaining in the command queue
	 * </ul>
	 */
	public void start() {
		this.consumerThreadCoordinator.start();
	}

	/**
	 * Add the specified command to the command queue and return; the
	 * command execution thread will pick it up from there.
	 */
	public void execute(Command command) {
		this.commands.enqueue(command);
	}

	/**
	 * Stop the command execution thread; the work is delegated to the
	 * consumer thread coordinator. The command execution thread is
	 * interrupted so that it stops executing at the end of the current
	 * command, and the current thread is suspended until the command
	 * execution thread has finished executing. If any uncaught exceptions
	 * were thrown while the execution thread was executing, they are
	 * wrapped in a composite exception and the composite exception is thrown.
	 */
	public void stop() {
		this.consumerThreadCoordinator.stop();
	}


	// ********** consumer **********

	/**
	 * The {@link ConsumerThreadCoordinator.Consumer} that executes the
	 * commands enqueued by the enclosing asynchronous command executor.
	 * It waits until the shared command queue is non-empty before executing
	 * a command from the queue. Once a command has been executed, the thread
	 * will quiesce until another command is placed in the command queue.
	 * If a new command is enqueued while another command is executing
	 * (either recursively by the command itself or by another thread),
	 * the new command will be executed immediately after the currently
	 * executing command has finished.
	 * Stop the thread by calling {@link Thread#interrupt()}.
	 */
	class Consumer
		implements ConsumerThreadCoordinator.Consumer
	{
		/**
		 * Block until the command queue holds at least one command.
		 */
		public void waitForProducer() throws InterruptedException {
			AsynchronousCommandExecutor.this.commands.waitUntilNotEmpty();
		}

		/**
		 * Remove the command at the head of the queue and execute it.
		 */
		public void execute() {
			Command next = AsynchronousCommandExecutor.this.commands.dequeue();
			next.execute();
		}

	}

}

Back to the top