Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: de340381030433a11e582c43689cbe0abfd4d59e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
/*******************************************************************************
 * Copyright (c) 2010, 2017 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IBM Corporation - initial API and implementation
 *******************************************************************************/
package org.eclipse.equinox.coordinator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;

import org.eclipse.osgi.util.NLS;
import org.osgi.framework.Bundle;
import org.osgi.service.coordinator.Coordination;
import org.osgi.service.coordinator.CoordinationException;
import org.osgi.service.coordinator.CoordinationPermission;
import org.osgi.service.coordinator.Participant;
import org.osgi.service.log.LogService;

/**
 * Holds the state and behavior of a single coordination created by a
 * {@link CoordinatorImpl}.
 * <p>
 * Clients are handed {@link CoordinationReferent} objects rather than this
 * object. Termination (through {@link #end()} or {@link #fail(Throwable)}) is
 * performed while holding this object's monitor, and threads blocked in
 * {@link #join(long)} are woken with {@code notifyAll()} once participants
 * have been notified.
 */
public class CoordinationImpl {
	// Holds a strong reference to the CoordinationWeakReference object associated
	// with this CoordinationImpl. Serves no other purpose. Needs no guarding.
	CoordinationWeakReference reference;

	// Written while holding this object's monitor; volatile so that the
	// unsynchronized accessors (isTerminated, isEnding, getFailure) see
	// current values.
	private volatile Throwable failure;
	private volatile boolean terminated;
	private volatile boolean ending = false;
	
	// The fields below are read and written while holding this object's monitor.
	private Date deadline;
	private CoordinationImpl enclosingCoordination;
	private Thread thread;
	private long totalTimeout;
	private TimerTask timerTask;

	private final CoordinatorImpl coordinator;
	private final long id;
	private final String name;
	private final List<Participant> participants;
	// Store a referent to be used by clients other than the initiator. It must
	// not be a reference to the referent returned to the initiator.
	private final CoordinationReferent referent;
	private final Map<Class<?>, Object> variables;

	/**
	 * Creates a new coordination.
	 *
	 * @param id the identifier of this coordination
	 * @param name the name of this coordination; must be non-empty and consist
	 *        of period-separated tokens made of letters, digits, '_' and '-'
	 * @param timeout the initial timeout in milliseconds; must not be negative
	 * @param coordinator the coordinator that owns this coordination
	 * @throws IllegalArgumentException if the name or the timeout is invalid
	 */
	public CoordinationImpl(long id, String name, long timeout, CoordinatorImpl coordinator) {
		validateName(name);
		validateTimeout(timeout);
		this.id = id;
		this.name = name;
		totalTimeout = timeout;
		this.coordinator = coordinator;
		participants = Collections.synchronizedList(new ArrayList<Participant>());
		variables = new HashMap<Class<?>, Object>();
		// Not an escaping 'this' reference. It will not escape the thread calling the constructor.
		referent = new CoordinationReferent(this);
	}

	/**
	 * Registers a participant with this coordination, blocking while the
	 * participant is registered with another coordination on a different thread.
	 *
	 * @param participant the participant to register; must not be null
	 * @throws NullPointerException if the participant is null
	 * @throws CoordinationException if this coordination has terminated
	 *         (FAILED or ALREADY_ENDED), if blocking would deadlock because the
	 *         other coordination uses the current thread (DEADLOCK_DETECTED), or
	 *         if the current thread is interrupted while waiting
	 *         (LOCK_INTERRUPTED)
	 */
	public void addParticipant(Participant participant) throws CoordinationException {
		// This method requires the PARTICIPATE permission.
		coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
		if (participant == null)
			throw new NullPointerException(NLS.bind(Messages.NullParameter, "participant")); //$NON-NLS-1$
		/* The caller has permission. Check to see if the participant is already
		 * participating in another coordination. Do this in a loop in case the
		 * participant must wait for the other coordination to finish. The loop
		 * will exit under the following circumstances.
		 *
		 * (1) This coordination is terminated.
		 * (2) The participant is already participating in another coordination
		 * using the same thread as this one.
		 * (3) This thread is interrupted.
		 * (4) The participant is not participating in another coordination.
		 */
		while (true) {
			CoordinationImpl coordination;
			synchronized (this) {
				// Check to see if this coordination has already terminated. If so,
				// throw the appropriate exception.
				checkTerminated();
				coordination = coordinator.addParticipant(participant, this);
				if (coordination == null) {
					// The same participant is not currently participating in
					// any coordination. Add it to this coordination and break
					// out of the loop.
					participants.add(participant);
					break;
				} else if (coordination == this) {
					// The same participant is being added twice to this
					// coordination. Nothing to do.
					break;
				} else {
					// This means the participant is already participating in another
					// coordination. Check to see if it's on the same thread.
					Thread t = coordination.getThread();
					// If thread is null, the coordination is not associated with
					// any thread, and there's nothing to compare. If the coordination
					// is using this thread, then we can't block due to risk of deadlock.
					if (t == Thread.currentThread()) {
						throw new CoordinationException(NLS.bind(Messages.Deadlock, new Object[]{participant, getName(), getId()}), referent, CoordinationException.DEADLOCK_DETECTED);
					}
				}
			}
			// The participant is already participating in another coordination
			// that's not using this thread. Block until that coordination has
			// finished. A decision was made here to use a timeout and incur the
			// expense of waking up and rejoining in order to make a reasonably
			// timely exit if this coordination terminates.
			try {
				coordination.join(1000);
			} catch (InterruptedException e) {
				String message = NLS.bind(Messages.LockInterrupted, new Object[]{participant, name, id, coordination.getName(), coordination.getId()});
				coordinator.getLogService().log(LogService.LOG_DEBUG, message, e);
				// This thread was interrupted while waiting for the coordination
				// to terminate.
				throw new CoordinationException(message, referent, CoordinationException.LOCK_INTERRUPTED, e);
			}
		}
	}

	/**
	 * Ends this coordination and notifies its participants, in reverse order of
	 * registration, through {@link Participant#ended(Coordination)}.
	 * <p>
	 * If this coordination is associated with a thread, any coordinations above
	 * it on the coordinator's stack are ended first and this coordination is
	 * popped from the stack.
	 *
	 * @throws CoordinationException if called from a thread other than the one
	 *         associated with this coordination (WRONG_THREAD), if this
	 *         coordination has already terminated (FAILED or ALREADY_ENDED), or
	 *         if a participant threw an exception while being notified
	 *         (PARTIALLY_ENDED, carrying the first such exception as the cause)
	 */
	public void end() throws CoordinationException {
		coordinator.checkPermission(CoordinationPermission.INITIATE, name);
		// Terminating the coordination must be atomic.
		synchronized (this) {
			/*
			 * Set the ending flag to avoid spurious failures for orphans.
			 * It appears the VM can aggressively put objects on the queue if the last call is done in a finally:
			 * Coordination c = coordinator.begin("name", 0);
			 * try {
			 *   ...
			 * } finally {
			 *   c.end()
			 * }
			 * In some cases it appears that while in the finally call to c.end()
			 * c can be put on the queue for GC.
			 * This makes it eligible for orphan processing which will cause 
			 * issues below when calling methods that invoke
			 * CoordinationWeakReference.processOrphanedCoordinations().
			 * We set an ending flag so that we can detect this.
			 */
			ending = true;
			// If this coordination is associated with a thread, an additional
			// check is required.
			if (thread != null) {
				// Coordinations may only be ended by the same thread that
				// pushed them onto the stack, if any.
				if (thread != Thread.currentThread()) {
					throw new CoordinationException(NLS.bind(Messages.EndingThreadNotSame, new Object[]{name, id, thread, Thread.currentThread()}), referent, CoordinationException.WRONG_THREAD);
				}
				// Unwind the stack in case there are other coordinations higher
				// up than this one. See bug 421487 for why peek() may be null.
				for (Coordination peeked = coordinator.peek(); !(peeked == null || referent.equals(peeked)); peeked = coordinator.peek()) {
					try {
						peeked.end();
					}
					catch (CoordinationException e) {
						peeked = coordinator.peek();
						if (peeked != null)
							peeked.fail(e);
					}
				}
				// A coordination is removed from the thread local stack only when being ended.
				// This must occur even if the coordination is already terminated due to a
				// failure.
				coordinator.pop();
			}
			terminate();
		}
		// Notify participants this coordination has ended. Track whether or
		// not a partial ending has occurred.
		Exception exception = null;
		Participant exceptionParticipant = null;
		// No additional synchronization is needed here because the participant
		// list will not be modified post termination.
		List<Participant> participantsToNotify = new ArrayList<Participant>(this.participants);
		Collections.reverse(participantsToNotify);
		for (Participant participant : participantsToNotify) {
			try {
				participant.ended(referent);
			} catch (Exception e) {
				coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantEndedError, new Object[]{participant, name, id}), e);
				// Only the first exception will be propagated.
				if (exception == null) {
					exception = e;
					exceptionParticipant = participant;
				}
			}
		}
		synchronized (this) {
			// Notify everything joined to this coordination that it has finished.
			notifyAll();
		}
		// If a partial ending has occurred, throw the required exception.
		if (exception != null) {
			throw new CoordinationException(NLS.bind(Messages.CoordinationPartiallyEnded, new Object[]{name, id, exceptionParticipant}), referent, CoordinationException.PARTIALLY_ENDED, exception);
		}
	}

	/**
	 * Extends the timeout of this coordination.
	 * <p>
	 * When the coordinator reports a non-zero maximum timeout, the extension is
	 * clamped so that the accumulated timeout never exceeds that maximum.
	 *
	 * @param timeInMillis the requested extension in milliseconds; zero queries
	 *        the current deadline without extending it
	 * @return 0 if this coordination has no timeout or the maximum timeout has
	 *         already been reached; otherwise the (possibly new) deadline in
	 *         milliseconds since the epoch
	 * @throws IllegalArgumentException if {@code timeInMillis} is negative
	 * @throws CoordinationException if this coordination has terminated
	 *         (FAILED or ALREADY_ENDED), or if the current thread is interrupted
	 *         while waiting for a concurrently firing timeout (UNKNOWN)
	 */
	public long extendTimeout(long timeInMillis) throws CoordinationException {
		coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
		validateTimeout(timeInMillis);
		// We don't want this coordination to terminate before the new timer is
		// in place.
		synchronized (this) {
			// Check to see if this coordination has already terminated. If so,
			// throw the appropriate exception.
			checkTerminated();
			// If there was no previous timeout set, return 0 indicating that no
			// extension has taken place.
			if (timerTask == null)
				return 0;
			// Passing zero returns the existing deadline (negative values were
			// already rejected above). The deadline will not be null if
			// timerTask is not null.
			if (timeInMillis == 0)
				return deadline.getTime();
			long maxTimeout = coordinator.getMaxTimeout();
			// If there is no maximum timeout, there's no need to track the total timeout.
			if (maxTimeout != 0) {
				// If the max timeout has already been reached, return 0 indicating that no
				// extension has taken place. The total is not expected to exceed the
				// maximum; '>=' merely guards against a negative allowance below.
				if (totalTimeout >= maxTimeout)
					return 0;
				// Comparing against the remaining allowance (rather than summing
				// first) cannot overflow for very large requests.
				long remaining = maxTimeout - totalTimeout;
				// If the extension would exceed the maximum timeout, add only as
				// much time as is still allowed.
				if (timeInMillis > remaining)
					timeInMillis = remaining;
				totalTimeout += timeInMillis;
			}
			// Cancel the current timeout.
			boolean cancelled = timerTask.cancel();
			if (!cancelled) {
				// This means the previous task has run and is waiting to get a lock on
				// this coordination. We can't throw an exception yet because we can't
				// know which one to use (ALREADY_ENDED or FAILED). Once the lock is
				// released, the running task may fail this coordination due to a timeout,
				// or something else might be waiting to fail this coordination for other
				// reasons or to end it. We simply don't know who will win the race.
				try {
					// Wait until this coordination terminates.
					join(0);
					// Now determine how it terminated and throw the appropriate exception.
					checkTerminated();
				}
				catch (InterruptedException e) {
					throw new CoordinationException(NLS.bind(Messages.InterruptedTimeoutExtension, new Object[]{totalTimeout, getName(), getId(), timeInMillis}), referent, CoordinationException.UNKNOWN, e);
				}
			}
			// Create the new timeout.
			timerTask = new CoordinationTimerTask(this);
			// Extend the current deadline.
			deadline = new Date(deadline.getTime() + timeInMillis);
			// Schedule the new timeout.
			coordinator.schedule(timerTask, deadline);
			// Return the new deadline.
			return deadline.getTime();
		}
	}

	/**
	 * Fails this coordination and notifies its participants, in reverse order
	 * of registration, through {@link Participant#failed(Coordination)}.
	 * Exceptions thrown by participants are logged and otherwise ignored.
	 *
	 * @param reason the cause of the failure; must not be null
	 * @return {@code true} if this call terminated the coordination;
	 *         {@code false} if it had already terminated
	 * @throws NullPointerException if the reason is null
	 */
	public boolean fail(Throwable reason) {
		coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
		// The reason must not be null.
		if (reason == null)
			throw new NullPointerException(NLS.bind(Messages.MissingFailureCause, getName(), getId()));
		// Terminating the coordination must be atomic.
		synchronized (this) {
			// If this coordination is terminated, return false. Do not throw a
			// CoordinationException as in other methods.
			if (terminated)
				return false;
			// This coordination has not already terminated, so terminate now.
			terminate();
			// Store the reason for the failure.
			failure = reason;
		}
		// Notify participants this coordination has failed.
		// No additional synchronization is needed here because the participant
		// list will not be modified post termination.
		List<Participant> participantsToNotify = new ArrayList<Participant>(this.participants);
		Collections.reverse(participantsToNotify);
		for (Participant participant : participantsToNotify) {
			try {
				participant.failed(referent);
			} catch (Exception e) {
				coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantFailedError, new Object[]{participant, name, id}), e);
			}
		}
		synchronized (this) {
			// Notify everything joined to this coordination that it has finished.
			notifyAll();
		}
		// Return true to indicate this call resulted in the coordination's failure.
		return true;
	}

	/**
	 * Returns the bundle reported by the owning coordinator. Requires the
	 * ADMIN permission.
	 *
	 * @return the coordinator's bundle
	 */
	public Bundle getBundle() {
		coordinator.checkPermission(CoordinationPermission.ADMIN, name);
		return coordinator.getBundle();
	}

	/**
	 * Returns the referent of the enclosing coordination, if one was set.
	 * Requires the ADMIN permission.
	 *
	 * @return the enclosing coordination's referent, or {@code null} if there
	 *         is no enclosing coordination
	 */
	public synchronized Coordination getEnclosingCoordination() {
		coordinator.checkPermission(CoordinationPermission.ADMIN, name);
		if (enclosingCoordination == null)
			return null;
		return enclosingCoordination.getReferent();
	}

	/**
	 * Returns the reason passed to {@link #fail(Throwable)}. Requires the
	 * INITIATE permission.
	 *
	 * @return the failure cause, or {@code null} if none has been recorded
	 */
	public Throwable getFailure() {
		coordinator.checkPermission(CoordinationPermission.INITIATE, name);
		return failure;
	}

	/**
	 * @return the identifier given to this coordination at construction
	 */
	public long getId() {
		return id;
	}

	/**
	 * @return the name given to this coordination at construction
	 */
	public String getName() {
		return name;
	}

	/**
	 * Returns a snapshot of the registered participants in registration order.
	 *
	 * @return a new, mutable list; changes to it do not affect this coordination
	 */
	public List<Participant> getParticipants() {
		// This method requires the INITIATE permission.
		coordinator.checkPermission(CoordinationPermission.INITIATE, name);
		// Return a mutable snapshot.
		synchronized (participants) {
			return new ArrayList<Participant>(participants);
		}
	}

	/**
	 * Returns the thread associated with this coordination. Requires the ADMIN
	 * permission.
	 *
	 * @return the associated thread, or {@code null} if none has been set
	 */
	public synchronized Thread getThread() {
		coordinator.checkPermission(CoordinationPermission.ADMIN, name);
		return thread;
	}

	/**
	 * Returns the live variable map of this coordination. Requires the
	 * PARTICIPATE permission.
	 *
	 * @return the backing {@link HashMap} itself, not a copy; it is not
	 *         synchronized by this class
	 */
	public Map<Class<?>, Object> getVariables() {
		coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
		return variables;
	}

	/**
	 * @return {@code true} once this coordination has been ended or failed
	 */
	public boolean isTerminated() {
		return terminated;
	}

	/**
	 * @return {@code true} once {@link #end()} has been entered past its
	 *         permission check
	 */
	public boolean isEnding() {
		return ending;
	}

	/**
	 * Blocks until this coordination has terminated and its participants have
	 * been notified, or until the given time has elapsed.
	 *
	 * @param timeInMillis the maximum time to wait in milliseconds; zero waits
	 *        indefinitely
	 * @throws IllegalArgumentException if {@code timeInMillis} is negative
	 * @throws InterruptedException if the current thread is interrupted while
	 *         waiting
	 */
	public void join(final long timeInMillis) throws InterruptedException {
		coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
		validateTimeout(timeInMillis);
		// How much system time has elapsed across all waits.
		long elapsed = 0;
		// The system time at the start of the wait.
		long start = System.currentTimeMillis();
		// Wait until this coordination has terminated. Guard against spurious
		// wakeups using the termination status.
		synchronized (this) {
			while (!terminated) {
				// Wait for the desired amount of time minus any time that has already elapsed.
				wait(timeInMillis - elapsed);
				// Only track elapsed time if a definite interval was specified.
				if (timeInMillis != 0) {
					// Update the elapsed time.
					elapsed = System.currentTimeMillis() - start;
					// If the allotted wait time has fully expired, we're done.
					if (elapsed >= timeInMillis) // Don't allow a wait of zero here!
						break;
				}
			}
		}
	}

	/**
	 * Pushes this coordination onto the coordinator's stack. Requires the
	 * INITIATE permission.
	 *
	 * @return the referent of this coordination
	 * @throws CoordinationException if this coordination has terminated
	 *         (FAILED or ALREADY_ENDED)
	 */
	public Coordination push() throws CoordinationException {
		coordinator.checkPermission(CoordinationPermission.INITIATE, name);
		synchronized (this) {
			checkTerminated();
			coordinator.push(this);
		}
		return referent;
	}

	// Return the log tracker of the owning coordinator.
	LogTracker getLogService() {
		return coordinator.getLogService();
	}

	// Return the referent to be used by clients other than the initiator.
	CoordinationReferent getReferent() {
		return referent;
	}

	// Install the timer task, compute the deadline from the current time plus
	// the total timeout, and schedule the task with the coordinator.
	synchronized void setTimerTask(TimerTask timerTask) {
		this.timerTask = timerTask;
		deadline = new Date(System.currentTimeMillis() + totalTimeout);
		coordinator.schedule(timerTask, deadline);
	}

	// Record the thread this coordination is associated with and the
	// coordination enclosing it. Either argument may be null.
	synchronized void setThreadAndEnclosingCoordination(Thread t, CoordinationImpl c) {
		thread = t;
		enclosingCoordination = c;
	}

	/**
	 * Throws the exception matching the way this coordination terminated, or
	 * returns normally if it has not terminated.
	 *
	 * @throws CoordinationException FAILED if a failure cause was recorded,
	 *         otherwise ALREADY_ENDED
	 */
	private void checkTerminated() throws CoordinationException {
		// If this coordination is not terminated, simply return.
		if (!terminated)
			return;
		// The coordination has terminated. Figure out which type of exception
		// must be thrown.
		if (failure != null) {
			// The fail() method was called indicating the coordination failed.
			throw new CoordinationException(NLS.bind(Messages.CoordinationFailed, name, id), referent, CoordinationException.FAILED, failure);
		}
		// The coordination did not fail, so it either partially ended or
		// ended successfully.
		throw new CoordinationException(NLS.bind(Messages.CoordinationEnded, name, id), referent, CoordinationException.ALREADY_ENDED);
	}

	/**
	 * Marks this coordination as terminated, cancels its timer task and informs
	 * the coordinator. Callers hold this object's monitor.
	 *
	 * @throws CoordinationException if this coordination had already terminated
	 */
	private void terminate() throws CoordinationException {
		checkTerminated();
		terminated = true;
		// Cancel the timeout. Purge the task if it was, in fact, canceled.
		if (timerTask != null && timerTask.cancel()) {
			coordinator.purge();
		}
		coordinator.terminate(this, participants);
	}

	/**
	 * Validates a coordination name: one or more non-empty tokens of letters,
	 * digits, '_' or '-', separated by single periods.
	 *
	 * @throws IllegalArgumentException if the name is null, empty, contains an
	 *         illegal character, or has a leading, trailing or doubled period
	 */
	private static void validateName(String name) {
		boolean valid = true;
		if (name == null || name.length() == 0)
			valid = false;
		else {
			// True whenever the next character must start a token, i.e. at the
			// very beginning of the name and directly after a period.
			boolean tokenRequired = true;
			for (char c : name.toCharArray()) {
				if (Character.isLetterOrDigit(c) || c == '_' || c == '-') {
					tokenRequired = false;
				} else if (c == '.' && !tokenRequired) {
					tokenRequired = true;
				} else {
					valid = false;
					break;
				}
			}
			// A name ending in a period has an empty final token.
			if (tokenRequired)
				valid = false;
		}
		if (!valid)
			throw new IllegalArgumentException(NLS.bind(Messages.InvalidCoordinationName, name));
	}

	/**
	 * @throws IllegalArgumentException if the timeout is negative
	 */
	private static void validateTimeout(long timeout) {
		if (timeout < 0)
			throw new IllegalArgumentException(NLS.bind(Messages.InvalidTimeInterval, timeout));
	}
}

Back to the top