Skip to main content
summaryrefslogtreecommitdiffstats
blob: a37c9dfa798d03ee23ebe46fc4f18316aeb17272 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/*******************************************************************************
 * Copyright (c) 2000, 2003 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials 
 * are made available under the terms of the Common Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/cpl-v10.html
 * 
 * Contributors:
 *     IBM Corporation - initial API and implementation
 *******************************************************************************/
package org.eclipse.team.internal.core.subscribers;

import java.util.*;

import org.eclipse.core.resources.*;
import org.eclipse.core.runtime.*;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.team.core.*;
import org.eclipse.team.core.subscribers.Subscriber;
import org.eclipse.team.core.synchronize.SyncInfo;
import org.eclipse.team.core.synchronize.SyncInfoSet;
import org.eclipse.team.internal.core.*;
import org.eclipse.team.internal.core.Policy;

/**
 * This handler collects changes and removals to resources and calculates their
 * synchronization state in a background job. The result is fed into the SyncSetInput.
 * 
 * Exceptions that occur when the job is processing the events are collected and
 * returned as part of the Job's status.
 */
public class SubscriberEventHandler extends BackgroundEventHandler {
	// The set that receives notification when the resource synchronization state
	// has been calculated by the job.
	private SyncSetInputFromSubscriber syncSetInput;

	// Changes accumulated by the event handler
	private List resultCache = new ArrayList();
	
	private boolean started = false;

	private IProgressMonitor progressGroup;

	private int ticks;
	
	/**
	 * Internal resource synchronization event. In addition to the resource, type
	 * and depth handed to <code>Event</code>, it can carry the <code>SyncInfo</code>
	 * that was calculated for the resource.
	 */
	class SubscriberEvent extends Event{
		static final int REMOVAL = 1;
		static final int CHANGE = 2;
		static final int INITIALIZE = 3;
		// The calculated sync info; assigned by the four argument constructor
		SyncInfo result;

		/**
		 * Create an event without a result.
		 */
		SubscriberEvent(IResource resource, int type, int depth) {
			super(resource, type, depth);
		}
		/**
		 * Create an event that carries the given result.
		 */
		public SubscriberEvent(IResource resource, int type, int depth, SyncInfo result) {
			this(resource, type, depth);
			this.result = result;
		}
		/**
		 * @return the sync info attached to this event (may be <code>null</code>)
		 */
		public SyncInfo getResult() {
			return result;
		}
		/**
		 * @return a readable name for the type of this event, or "INVALID" for
		 * a type that is not one of the constants of this class
		 */
		protected String getTypeString() {
			final int type = getType();
			if (type == REMOVAL) {
				return "REMOVAL"; //$NON-NLS-1$
			}
			if (type == CHANGE) {
				return "CHANGE"; //$NON-NLS-1$
			}
			if (type == INITIALIZE) {
				return "INITIALIZE"; //$NON-NLS-1$
			}
			return "INVALID"; //$NON-NLS-1$
		}
	}
	
	/**
	 * This is a special event used to reset and connect sync sets.
	 * The preemptive flag is used to indicate that the runnable should take
	 * the highest priority and thus be placed on the front of the queue
	 * and be processed as soon as possible, preempting any event that is currently
	 * being processed. The current event will continue processing once the
	 * high priority event has been processed.
	 */
	public class RunnableEvent extends Event {
		static final int RUNNABLE = 1000;
		// The work to perform when the event is processed
		private IWorkspaceRunnable runnable;
		// Whether the event should be handled ahead of the event being processed
		private boolean preemtive;

		/**
		 * Create an event for the given runnable. The event is associated with
		 * the workspace root at depth zero.
		 */
		public RunnableEvent(IWorkspaceRunnable runnable, boolean preemtive) {
			super(ResourcesPlugin.getWorkspace().getRoot(), RUNNABLE, IResource.DEPTH_ZERO);
			this.preemtive = preemtive;
			this.runnable = runnable;
		}
		/**
		 * @return whether this event was created as a preemptive event
		 */
		public boolean isPreemtive() {
			return preemtive;
		}
		/**
		 * Run the wrapped runnable with the given monitor.
		 * @throws CoreException if the runnable fails
		 */
		public void run(IProgressMonitor monitor) throws CoreException {
			runnable.run(monitor);
		}
	}
	
	/**
	 * Create a handler for the given subscriber. The name of the subscriber is bound
	 * into the two messages passed to the superclass constructor, and a
	 * <code>SyncSetInputFromSubscriber</code> is created for the subscriber and this handler.
	 * Events are not queued until <code>start()</code> has been called.
	 * @param subscriber the subscriber whose synchronization state is fed into the sync set input
	 */
	public SubscriberEventHandler(Subscriber subscriber) {
		super(Policy.bind("SubscriberEventHandler.jobName", subscriber.getName()), //$NON-NLS-1$
				Policy.bind("SubscriberEventHandler.errors", subscriber.getName())); //$NON-NLS-1$
		syncSetInput = new SyncSetInputFromSubscriber(subscriber, this);
	}
	
	/**
	 * Start the event handler by queuing an INITIALIZE event for each root of the
	 * subscriber, which primes the sync set input with the out-of-sync resources
	 * of the subscriber.
	 */
	public synchronized void start() {
		// queueEvent only accepts events once this flag is set. Since this method is
		// synchronized, the initialization events queued below are guaranteed to be first.
		started = true;
		IResource[] subscriberRoots = syncSetInput.getSubscriber().roots();
		reset(subscriberRoots, SubscriberEvent.INITIALIZE);
	}

	/**
	 * Queue the given event, unless the handler has not been started yet in which
	 * case the event is dropped.
	 * @param event the event to queue
	 * @param front passed through unchanged to the superclass implementation
	 */
	protected synchronized void queueEvent(Event event, boolean front) {
		if (!started) {
			// Events are only posted once the handler has been started
			return;
		}
		super.queueEvent(event, front);
	}
	/**
	 * Schedule the job that processes the queued events.
	 * <p>
	 * If a progress group was supplied through <code>setProgressGroupHint</code>, the job
	 * is made a non-system job and attached to that group; otherwise it is made a system job.
	 * The job is only configured while it is in the <code>Job.NONE</code> state.
	 */
	public void schedule() {
		Job job = getEventHandlerJob();
		// The Job API requires the system flag and the progress group to be set before
		// the job is scheduled, so leave a waiting, sleeping or running job untouched.
		if (job.getState() == Job.NONE) {
			if (progressGroup != null) {
				job.setSystem(false);
				job.setProgressGroup(progressGroup, ticks);
			} else {
				job.setSystem(true);
			}
		}
		// Schedule the very instance that was inspected and configured above
		job.schedule();
	}
	
	/**
	 * Initialize all resources for the subscriber associated with the set. This will basically
	 * recalculate all synchronization information for the subscriber.
	 * <p>
	 * This method is synchronized with the queueEvent method to ensure that the events
	 * queued by this method (the reset of the sync set input followed by a CHANGE event
	 * for each root) are back-to-back.
	 * @param roots the roots to recalculate, or <code>null</code> to use the roots
	 * of the subscriber
	 */
	public synchronized void reset(IResource[] roots) {
		final IResource[] rootsToPrime =
			(roots != null) ? roots : syncSetInput.getSubscriber().roots();
		// First, reset the sync set input to clear the sync set
		IWorkspaceRunnable clearSyncSet = new IWorkspaceRunnable() {
			public void run(IProgressMonitor monitor) throws CoreException {
				syncSetInput.reset(monitor);
			}
		};
		run(clearSyncSet, false /* keep ordering the same */);
		// Then, prime the set from the subscriber
		reset(rootsToPrime, SubscriberEvent.CHANGE);
	}
	
	/**
	 * Called by a client to indicate that a resource has changed and its synchronization state
	 * should be recalculated. A CHANGE event is queued at the back of the queue.
	 * @param resource the changed resource
	 * @param depth the depth of the change calculation
	 */
	public void change(IResource resource, int depth) {
		SubscriberEvent changeEvent = new SubscriberEvent(resource, SubscriberEvent.CHANGE, depth);
		queueEvent(changeEvent, false);
	}
	
	/**
	 * Called by a client to indicate that a resource has been removed. A REMOVAL event
	 * of infinite depth is queued at the back of the queue so that the removal will
	 * propagate to the set.
	 * @param resource the resource that was removed
	 */
	public void remove(IResource resource) {
		SubscriberEvent removalEvent =
			new SubscriberEvent(resource, SubscriberEvent.REMOVAL, IResource.DEPTH_INFINITE);
		queueEvent(removalEvent, false);
	}
	
	/**
	 * Calculate the synchronization information for the given resource at the given depth.
	 * The members of the resource are visited before the resource itself. For each visited
	 * resource a CHANGE event (sync info available) or a REMOVAL event (the subscriber
	 * returned no sync info) is added to the result cache. Errors are reported through
	 * <code>handleException</code> and do not stop the traversal.
	 * @param resource the resource to calculate
	 * @param depth the depth of the calculation
	 * @param monitor a progress monitor
	 */
	private void collect(IResource resource, int depth, IProgressMonitor monitor) {

		// handle any preemptive events before continuing
		handlePreemptiveEvents(monitor);

		boolean visitMembers = resource.getType() != IResource.FILE && depth != IResource.DEPTH_ZERO;
		if (visitMembers) {
			// An infinite traversal stays infinite; for any other depth only the
			// direct members themselves are calculated
			int memberDepth = IResource.DEPTH_ZERO;
			if (depth == IResource.DEPTH_INFINITE) {
				memberDepth = IResource.DEPTH_INFINITE;
			}
			try {
				IResource[] children = syncSetInput.getSubscriber().members(resource);
				for (int index = 0; index < children.length; index++) {
					collect(children[index], memberDepth, monitor);
				}
			} catch (TeamException e) {
				handleException(e, resource, ITeamStatus.SYNC_INFO_SET_ERROR, Policy.bind("SubscriberEventHandler.8", resource.getFullPath().toString(), e.getMessage())); //$NON-NLS-1$
			}
		}

		monitor.subTask(Policy.bind("SubscriberEventHandler.2", resource.getFullPath().toString())); //$NON-NLS-1$
		try {
			SyncInfo syncInfo = syncSetInput.getSubscriber().getSyncInfo(resource);
			SubscriberEvent outcome;
			if (syncInfo != null) {
				outcome = new SubscriberEvent(resource, SubscriberEvent.CHANGE, IResource.DEPTH_ZERO, syncInfo);
			} else {
				// resource is no longer under the subscriber control
				outcome = new SubscriberEvent(resource, SubscriberEvent.REMOVAL, IResource.DEPTH_ZERO);
			}
			resultCache.add(outcome);
			handlePendingDispatch(monitor);
		} catch (TeamException e) {
			handleException(e, resource, ITeamStatus.RESOURCE_SYNC_INFO_ERROR, Policy.bind("SubscriberEventHandler.9", resource.getFullPath().toString(), e.getMessage())); //$NON-NLS-1$
		}
		monitor.worked(1);
	}
	
	/*
	 * Dispatch the accumulated results if the handler reports that it is ready
	 * for a dispatch; otherwise do nothing.
	 */
	private void handlePendingDispatch(IProgressMonitor monitor) {
		if (!isReadyForDispatch(false /*don't wait if queue is empty*/)) {
			return;
		}
		dispatchEvents(Policy.subMonitorFor(monitor, 5));
		eventsDispatched();
	}

	/*
	 * Handle the exception by passing it to the single argument handleException
	 * (so that it is returned as a status from the job) and also by dispatching
	 * an error status for the resource to the sync set input so any down stream
	 * views can react accordingly.
	 */
	private void handleException(CoreException e, IResource resource, int code, String message) {
		handleException(e);
		TeamStatus errorStatus = new TeamStatus(IStatus.ERROR, TeamPlugin.ID, code, message, e, resource);
		syncSetInput.handleError(errorStatus);
	}

	/**
	 * Calculate the synchronization information for the given resource by delegating to the
	 * <code>collectOutOfSync</code> method of the subscriber. The sync info and removals
	 * reported by the subscriber are added to the result cache, errors are logged and
	 * forwarded to the sync set input, and preemptive events and pending dispatches are
	 * handled whenever the subscriber reports progress.
	 * @param resource the resource to check
	 * @param depth the depth
	 * @param monitor a progress monitor; it is marked done when this method returns
	 */
	private void collectAll(IResource resource, int depth, IProgressMonitor monitor) {

		monitor.beginTask(null, IProgressMonitor.UNKNOWN);
		try {

			// Create a monitor that will handle preemptions and dispatch if required
			IProgressMonitor dispatchingMonitor = new SubProgressMonitor(monitor, IProgressMonitor.UNKNOWN) {
				// Set while a dispatch is in progress so that progress reported on this
				// monitor during the dispatch does not start a nested dispatch
				boolean dispatching = false;
				public void worked(int work) {
					dispatch();
					super.worked(work);
				}
				public void subTask(String name) {
					dispatch();
					super.subTask(name);
				}
				private void dispatch() {
					if (!dispatching) {
						try {
							dispatching = true;
							handlePreemptiveEvents(this);
							handlePendingDispatch(this);
						} finally {
							dispatching = false;
						}
					}
				}
			};

			// Create a sync set that queues up resources and errors for dispatch
			SyncInfoSet queueingSet = new SyncInfoSet() {
				public void add(SyncInfo info) {
					super.add(info);
					SubscriberEvent change =
						new SubscriberEvent(info.getLocal(), SubscriberEvent.CHANGE, IResource.DEPTH_ZERO, info);
					resultCache.add(change);
				}
				public void remove(IResource resource) {
					super.remove(resource);
					SubscriberEvent removal =
						new SubscriberEvent(resource, SubscriberEvent.REMOVAL, IResource.DEPTH_ZERO);
					resultCache.add(removal);
				}
				public void addError(ITeamStatus status) {
					super.addError(status);
					TeamPlugin.getPlugin().getLog().log(status);
					syncSetInput.handleError(status);
				}
			};

			IResource[] target = new IResource[] { resource };
			syncSetInput.getSubscriber().collectOutOfSync(target, depth, queueingSet, dispatchingMonitor);

		} finally {
			monitor.done();
		}
	}

	/**
	 * Feed the given events to the set. CHANGE events hand their result to the sync set
	 * input and REMOVAL events remove their resource (at the depth of the event) from the
	 * sync set. Events of any other type are skipped.
	 * @param events the events to dispatch
	 * @param monitor a progress monitor
	 */
	private void dispatchEvents(SubscriberEvent[] events, IProgressMonitor monitor) {
		try {
			// this will batch the following set changes until endInput is called.
			syncSetInput.getSyncSet().beginInput();
			for (int index = 0; index < events.length; index++) {
				SubscriberEvent current = events[index];
				int type = current.getType();
				if (type == SubscriberEvent.CHANGE) {
					syncSetInput.collect(current.getResult(), monitor);
				} else if (type == SubscriberEvent.REMOVAL) {
					syncSetInput.getSyncSet().remove(current.getResource(), current.getDepth());
				}
			}
		} finally {
			syncSetInput.getSyncSet().endInput(monitor);
		}
	}
	
	/**
	 * Queue an event of the given type, at infinite depth, for each of the given roots.
	 * @param roots the resources to queue events for
	 * @param type can be SubscriberEvent.CHANGE to recalculate all states or
	 *   SubscriberEvent.INITIALIZE to perform the optimized recalculation if supported
	 *   by the subscriber.
	 */
	private void reset(IResource[] roots, int type) {
		for (int index = 0; index < roots.length; index++) {
			SubscriberEvent rootEvent = new SubscriberEvent(roots[index], type, IResource.DEPTH_INFINITE);
			queueEvent(rootEvent, false);
		}
	}

	/**
	 * Process a single event according to its type: runnable events are executed, removals
	 * are added to the result cache as is, changes are calculated resource by resource and
	 * initializations are calculated through <code>collectAll</code>. Events of any other
	 * type are ignored. A runtime exception raised while processing is reported through
	 * <code>handleException</code> so that processing of later events can continue.
	 * @param event the event to process
	 * @param monitor a progress monitor
	 */
	protected void processEvent(Event event, IProgressMonitor monitor) {
		try {
			// Cancellation is dangerous because this will leave the sync info in a bad state.
			// Purposely not checking -
			final int type = event.getType();
			if (type == RunnableEvent.RUNNABLE) {
				executeRunnable(event, monitor);
			} else if (type == SubscriberEvent.REMOVAL) {
				resultCache.add(event);
			} else if (type == SubscriberEvent.CHANGE) {
				collect(event.getResource(), event.getDepth(), monitor);
			} else if (type == SubscriberEvent.INITIALIZE) {
				// NOTE(review): the Job API documents setSystem as something to call before
				// the job is scheduled, yet this is invoked while events are being
				// processed - confirm that the call has the intended effect here.
				getEventHandlerJob().setSystem(false);
				monitor.subTask(Policy.bind("SubscriberEventHandler.2", event.getResource().getFullPath().toString())); //$NON-NLS-1$
				collectAll(event.getResource(), event.getDepth(), Policy.subMonitorFor(monitor, 64));
			}
		} catch (RuntimeException e) {
			// handle the exception and keep processing
			handleException(
				new TeamException(Policy.bind("SubscriberEventHandler.10"), e), //$NON-NLS-1$
				event.getResource(),
				ITeamStatus.SYNC_INFO_SET_ERROR,
				Policy.bind("SubscriberEventHandler.11", event.getResource().getFullPath().toString(), e.getMessage())); //$NON-NLS-1$
		}
	}
		
	/*
	 * Execute the given event, which must be a RunnableEvent. The results accumulated
	 * so far are dispatched before the runnable is run. A CoreException thrown by the
	 * runnable is reported through handleException.
	 */
	private void executeRunnable(Event event, IProgressMonitor monitor) {
		// Dispatch any queued results to clear pending output events
		dispatchEvents(Policy.subMonitorFor(monitor, 1));
		eventsDispatched();
		try {
			RunnableEvent runnableEvent = (RunnableEvent) event;
			runnableEvent.run(Policy.subMonitorFor(monitor, 1));
		} catch (CoreException e) {
			handleException(e, event.getResource(), ITeamStatus.SYNC_INFO_SET_ERROR, e.getMessage());
		}
	}

	/*
	 * Dispatch the events accumulated in the result cache to the sync set and then
	 * empty the cache. Nothing happens when the cache is empty.
	 * @see BackgroundEventHandler#dispatchEvents(IProgressMonitor)
	 */
	protected void dispatchEvents(IProgressMonitor monitor) {
		if (resultCache.isEmpty()) {
			return;
		}
		SubscriberEvent[] pending = new SubscriberEvent[resultCache.size()];
		pending = (SubscriberEvent[]) resultCache.toArray(pending);
		dispatchEvents(pending, monitor);
		resultCache.clear();
	}

	/**
	 * Queue up the given runnable in an event to be processed by this job.
	 * @param runnable the runnable to be run by the handler
	 * @param frontOnQueue whether the event is marked as preemptive and placed on the
	 * front of the queue instead of the back
	 */
	public void run(IWorkspaceRunnable runnable, boolean frontOnQueue) {
		RunnableEvent runnableEvent = new RunnableEvent(runnable, frontOnQueue);
		queueEvent(runnableEvent, frontOnQueue);
	}

	/**
	 * Return the sync set input that was created by this event handler.
	 * @return the sync set input created in the constructor of this handler
	 */
	public SyncSetInputFromSubscriber getSyncSetInput() {
		return this.syncSetInput;
	}
	
	/**
	 * Remember the progress group and number of ticks that <code>schedule()</code>
	 * hands to the <code>setProgressGroup</code> method of the job.
	 * @param progressGroup the progress group, or <code>null</code> for none
	 * @param ticks the number of ticks of the group to use for the job
	 */
	public void setProgressGroupHint(IProgressMonitor progressGroup, int ticks) {
		this.ticks = ticks;
		this.progressGroup = progressGroup;
	}
	
	/**
	 * Return whether <code>start()</code> has been called on this handler.
	 * @return the value of the started flag
	 */
	protected boolean isStarted() {
		return this.started;
	}
	
	/*
	 * Look at the next queued event (as returned by peek()) and, if it is a
	 * preemptive RunnableEvent, take it with nextElement() and execute it right away.
	 */
	private void handlePreemptiveEvents(IProgressMonitor monitor) {
		Event upcoming = peek();
		if (!(upcoming instanceof RunnableEvent)) {
			return;
		}
		if (!((RunnableEvent) upcoming).isPreemtive()) {
			return;
		}
		executeRunnable(nextElement(), monitor);
	}
}

Back to the top