Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 24f2aed4986a45d513bd1390b3e2f267fdb51835 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*******************************************************************************
 * Copyright (c) 2011, 2014 Wind River Systems, Inc. and others. All rights reserved.
 * This program and the accompanying materials are made available under the terms
 * of the Eclipse Public License v1.0 which accompanies this distribution, and is
 * available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * Wind River Systems - initial API and implementation
 *******************************************************************************/
package org.eclipse.tcf.te.runtime.concurrent.executors;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.tcf.te.runtime.concurrent.factories.SingleThreadThreadFactory;
import org.eclipse.tcf.te.runtime.concurrent.interfaces.INestableExecutor;
import org.eclipse.tcf.te.runtime.concurrent.interfaces.ISingleThreadedExecutor;

/**
 * A single threaded executor service implementation.
 * <p>
 * The executor service delegate created by this class is a
 * {@link SingleThreadedExecutor}: a {@link ThreadPoolExecutor} with a core and
 * maximum pool size of one, backed by a {@link LinkedBlockingQueue} and using a
 * {@link SingleThreadThreadFactory} to create its thread.
 */
public class SingleThreadedExecutorService extends AbstractDelegatingExecutorService implements ISingleThreadedExecutor, INestableExecutor {

	/**
	 * A single threaded executor implementation.
	 * <p>
	 * Supports nested execution of queued work items from within the executor
	 * thread via {@link #readAndExecute()}, up to {@link #getMaxDepth()} levels.
	 */
	protected class SingleThreadedExecutor extends ThreadPoolExecutor implements INestableExecutor {
		// The current nesting depth, i.e. the number of readAndExecute()
		// invocations currently running a work item
		private final AtomicInteger currentNestingDepth = new AtomicInteger(0);

		/**
		 * Constructor.
		 * <p>
		 * Creates the executor with a new, empty {@link LinkedBlockingQueue} as
		 * work queue.
		 *
		 * @param threadFactory
		 *            The thread factory instance. Must not be <code>null</code>.
		 *
		 * @throws NullPointerException
		 *             if threadFactory is <code>null</code>.
		 */
		public SingleThreadedExecutor(ThreadFactory threadFactory) {
			this(threadFactory, new LinkedBlockingQueue<Runnable>());
		}

		/**
		 * Constructor.
		 * <p>
		 * Private constructor to catch the work queue instance passed into the
		 * {@link ThreadPoolExecutor} constructor. The pool is configured with
		 * exactly one core thread and a maximum of one thread.
		 *
		 * @param threadFactory
		 *            The thread factory instance. Must not be <code>null</code>.
		 * @param workQueue
		 *            The work queue instance. Must not be <code>null</code>.
		 */
		private SingleThreadedExecutor(ThreadFactory threadFactory, BlockingQueue<Runnable> workQueue) {
			super(1, 1, 0L, TimeUnit.NANOSECONDS, workQueue, threadFactory);
		}

		/**
		 * Returns the maximum nesting depth supported by this executor.
		 *
		 * @return Always <code>1</code>.
		 *
		 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.INestableExecutor#getMaxDepth()
		 */
		@Override
		public int getMaxDepth() {
			return 1;
		}

		/**
		 * Removes the next work item from the work queue, if any, and runs it
		 * synchronously in the calling thread.
		 * <p>
		 * The method never blocks waiting for work: if the queue is empty,
		 * nothing is executed.
		 *
		 * @return <code>True</code> if the work queue is not empty after the
		 *         call, <code>false</code> otherwise.
		 *
		 * @throws IllegalStateException
		 *             if called from a thread other than the executor thread,
		 *             or if work is pending and the maximum nesting depth is
		 *             already reached.
		 *
		 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.INestableExecutor#readAndExecute()
		 */
		@Override
		public boolean readAndExecute() {
			// Method is callable from the executor thread only
			if (!isExecutorThread()) {
				throw new IllegalStateException("Must be called from within the executor thread!"); //$NON-NLS-1$
			}

			BlockingQueue<Runnable> queue = getQueue();

			// If the work queue is empty, there is nothing to do
			if (!queue.isEmpty()) {
				// Work queue not empty, check if we reached the maximum nesting
				// depth. The check happens before the work item is removed, so
				// the item stays queued if the depth is exceeded.
				if (currentNestingDepth.get() >= getMaxDepth()) {
					throw new IllegalStateException("Maximum nesting depth exceeded!"); //$NON-NLS-1$
				}

				// Get the next work item to do. poll() returns null instead of
				// blocking if the queue turned empty in the meantime, and it
				// does not consume the interrupt status of the thread.
				Runnable runnable = queue.poll();

				if (runnable != null) {
					// Increase the nesting depth
					currentNestingDepth.incrementAndGet();
					try {
						// Execute the runnable
						runnable.run();
					} finally {
						// Decrease nesting depth, even if the runnable failed
						currentNestingDepth.decrementAndGet();
					}
				}
			}

			return !queue.isEmpty();
		}

		/**
		 * Logs the throwable that terminated the execution of the given
		 * runnable, if any.
		 *
		 * @param r
		 *            The runnable that has completed.
		 * @param t
		 *            The throwable that caused termination, or <code>null</code>
		 *            if execution completed normally.
		 *
		 * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
		 */
		@Override
		protected void afterExecute(Runnable r, Throwable t) {
			super.afterExecute(r, t);
			if (t != null) {
				logException(t);
			}
		}
	}

	// Internal reference to the one shot thread factory instance. Assigned in
	// createExecutorServiceDelegate().
	// NOTE(review): deliberately declared without an initializer; if the super
	// class constructor invokes createExecutorServiceDelegate(), a field
	// initializer would overwrite the assigned instance - confirm before changing.
	private SingleThreadThreadFactory threadFactory;

	/**
	 * Constructor.
	 */
	public SingleThreadedExecutorService() {
	}

	/**
	 * Creates the executor service delegate: a {@link SingleThreadedExecutor}
	 * using a newly created {@link SingleThreadThreadFactory}. The thread
	 * factory is remembered to identify the executor thread later on.
	 *
	 * @return The executor service delegate. Never <code>null</code>.
	 *
	 * @see org.eclipse.tcf.te.runtime.concurrent.executors.AbstractDelegatingExecutorService#createExecutorServiceDelegate()
	 */
	@Override
	protected ExecutorService createExecutorServiceDelegate() {
		threadFactory = new SingleThreadThreadFactory(getThreadPoolNamePrefix());
		return new SingleThreadedExecutor(threadFactory);
	}

	/**
	 * Checks if the calling thread is the executor thread.
	 *
	 * @return <code>True</code> if the current thread is the executor thread,
	 *         <code>false</code> otherwise.
	 *
	 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.ISingleThreadedExecutor#isExecutorThread()
	 */
	@Override
	public final boolean isExecutorThread() {
		return isExecutorThread(Thread.currentThread());
	}

	/**
	 * Checks if the given thread equals the thread reported by the thread
	 * factory of this executor service.
	 *
	 * @param thread
	 *            The thread to check, or <code>null</code>.
	 *
	 * @return <code>True</code> if the given thread equals the thread factory's
	 *         thread, <code>false</code> if not, if the given thread is
	 *         <code>null</code>, or if no thread factory has been created yet.
	 *
	 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.ISingleThreadedExecutor#isExecutorThread(java.lang.Thread)
	 */
	@Override
	public final boolean isExecutorThread(Thread thread) {
		if (thread != null && threadFactory != null) {
			return thread.equals(threadFactory.getThread());
		}
		return false;
	}

	/**
	 * Returns the maximum nesting depth of the executor service delegate.
	 *
	 * @return The maximum nesting depth as reported by the delegate.
	 *
	 * @throws UnsupportedOperationException
	 *             if the executor service delegate does not implement
	 *             {@link INestableExecutor}.
	 *
	 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.INestableExecutor#getMaxDepth()
	 */
	@Override
	public int getMaxDepth() {
		// Fetch the delegate once, so the type check and the cast are
		// guaranteed to operate on the same instance
		ExecutorService delegate = getExecutorServiceDelegate();
		if (!(delegate instanceof INestableExecutor)) {
			throw new UnsupportedOperationException("Executor service delegate must implement INestableExecutor"); //$NON-NLS-1$
		}
		return ((INestableExecutor) delegate).getMaxDepth();
	}

	/**
	 * Forwards the call to the executor service delegate.
	 *
	 * @return The value returned by the delegate's <code>readAndExecute()</code>.
	 *
	 * @throws UnsupportedOperationException
	 *             if the executor service delegate does not implement
	 *             {@link INestableExecutor}.
	 *
	 * @see org.eclipse.tcf.te.runtime.concurrent.interfaces.INestableExecutor#readAndExecute()
	 */
	@Override
	public boolean readAndExecute() {
		// Fetch the delegate once, so the type check and the cast are
		// guaranteed to operate on the same instance
		ExecutorService delegate = getExecutorServiceDelegate();
		if (!(delegate instanceof INestableExecutor)) {
			throw new UnsupportedOperationException("Executor service delegate must implement INestableExecutor"); //$NON-NLS-1$
		}
		return ((INestableExecutor) delegate).readAndExecute();
	}
}

Back to the top