Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjmisinco2013-07-11 14:30:46 -0400
committerjmisinco2013-07-16 14:59:10 -0400
commit46f8200295486608bd448a0f0aa49e53f01470be (patch)
tree2dc34e944aa82d284dcbf49a9f4eb96660546284
parentdb255184fb04c64d7fa2910e514810f61d2a1b36 (diff)
downloadorg.eclipse.osee-46f8200295486608bd448a0f0aa49e53f01470be.tar.gz
org.eclipse.osee-46f8200295486608bd448a0f0aa49e53f01470be.tar.xz
org.eclipse.osee-46f8200295486608bd448a0f0aa49e53f01470be.zip
refactor[ats_LRLBR]: Change ExecutorAdmin to support different executors
Two types of executors are now supported, fixed thread pool executors which will be used for database bound operations, and cached pool executors while will be used for non DB bound operations. Guava's ListeningExecutorService is used to support the callback functionality on futures. Change-Id: I87bd0c79c6be5cfed07c33be3fcbb9878fd20346
-rw-r--r--plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java12
-rw-r--r--plugins/org.eclipse.osee.executor.admin/META-INF/MANIFEST.MF4
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java12
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java7
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/WorkUtility.java5
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java95
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorCache.java102
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java120
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceLifecycleListener.java28
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorThreadFactory.java36
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java375
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ThreadCleaner.java58
-rw-r--r--plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QueryEngineImplTest.java4
-rw-r--r--plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QuerySqlHandlerFactoryImplTest.java4
-rw-r--r--plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/OrcsDataStoreImpl.java9
-rw-r--r--plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/QueryModuleFactory.java20
-rw-r--r--plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumer.java2
-rw-r--r--plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumerImpl.java5
18 files changed, 133 insertions, 765 deletions
diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java
index 994a6ec44e..a384cd39d4 100644
--- a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java
+++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutionCallbackTest.java
@@ -99,10 +99,14 @@ public class ExecutionCallbackTest {
verify(callback).onFailure(expectedException);
}
- @Test
+ @Test(timeout = 5000)
public void testCallbackOnCancel() throws Exception {
TestCancellableCallable callable = new TestCancellableCallable("results");
Future<String> future = admin.schedule(callable, callback);
+
+ while (!callable.hasBeenCalled()) {
+ Thread.sleep(20);
+ }
future.cancel(true);
verify(callback, times(0)).onSuccess(Matchers.anyString());
@@ -119,13 +123,19 @@ public class ExecutionCallbackTest {
private class TestCancellableCallable extends CancellableCallable<String> {
private final String results;
+ private volatile boolean beenCalled = false;
public TestCancellableCallable(String results) {
this.results = results;
}
+ public boolean hasBeenCalled() {
+ return beenCalled;
+ }
+
@Override
public String call() throws Exception {
+ beenCalled = true;
while (!isCancelled()) {
checkForCancelled();
}
diff --git a/plugins/org.eclipse.osee.executor.admin/META-INF/MANIFEST.MF b/plugins/org.eclipse.osee.executor.admin/META-INF/MANIFEST.MF
index 68802fb051..c1abc7c8a0 100644
--- a/plugins/org.eclipse.osee.executor.admin/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.osee.executor.admin/META-INF/MANIFEST.MF
@@ -6,7 +6,9 @@ Bundle-Version: 0.11.1.qualifier
Bundle-Vendor: Eclipse Open System Engineering Environment
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Export-Package: org.eclipse.osee.executor.admin
-Import-Package: org.eclipse.osee.event,
+Import-Package: com.google.common.collect,
+ com.google.common.util.concurrent,
+ org.eclipse.osee.event,
org.eclipse.osee.logger
Service-Component: OSGI-INF/*.xml
Bundle-ActivationPolicy: lazy
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java
index 74ea3e02f8..3f5e0e99d7 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/CancellableCallable.java
@@ -18,25 +18,31 @@ import java.util.concurrent.CancellationException;
*/
public abstract class CancellableCallable<T> implements Callable<T>, HasCancellation {
- private volatile boolean cancelled;
+ private volatile boolean cancelled = false;
protected CancellableCallable() {
- cancelled = false;
+ // do nothing
}
@Override
public boolean isCancelled() {
+ cancelled = cancelled || Thread.currentThread().isInterrupted();
return cancelled;
}
@Override
public void setCancel(boolean isCancelled) {
- cancelled = isCancelled;
+ if (isCancelled) {
+ cancelled = isCancelled;
+ Thread.currentThread().interrupt();
+ }
}
@Override
public void checkForCancelled() throws CancellationException {
if (isCancelled()) {
+ // clear interrupted flag before throwing exception
+ Thread.interrupted();
throw new CancellationException();
}
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
index ef4b9a1289..bbbadddf80 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
@@ -24,6 +24,10 @@ public interface ExecutorAdmin {
//
// <T> Callable<T> addCallback(Callable<T> callable, ExecutionCallback<T> callback);
+ void createFixedPoolExecutor(String id, int poolSize) throws Exception;
+
+ void createCachedPoolExecutor(String id) throws Exception;
+
<T> Future<T> schedule(Callable<T> callable, ExecutionCallback<T> callback) throws Exception;
<T> Future<T> schedule(String id, Callable<T> callable, ExecutionCallback<T> callback) throws Exception;
@@ -32,5 +36,6 @@ public interface ExecutorAdmin {
<T> Future<T> schedule(String id, Callable<T> callable) throws Exception;
- int cancelTasks(String id) throws Exception;
+ void shutdown(String id) throws Exception;
+
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/WorkUtility.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/WorkUtility.java
index 63723a3648..e6ec8886dd 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/WorkUtility.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/WorkUtility.java
@@ -23,6 +23,8 @@ import org.eclipse.osee.executor.admin.internal.ExecutorAdminImpl;
*/
public final class WorkUtility {
+ private static final int NUM_PARTITIONS = Math.min(4, Runtime.getRuntime().availableProcessors());
+
private WorkUtility() {
// Utility Class
}
@@ -37,8 +39,7 @@ public final class WorkUtility {
List<Callable<Collection<OUTPUT>>> callables = new LinkedList<Callable<Collection<OUTPUT>>>();
if (!work.isEmpty()) {
- int numProcessors = Runtime.getRuntime().availableProcessors();
- int partitionSize = work.size() / numProcessors;
+ int partitionSize = Math.max(1, work.size() / NUM_PARTITIONS);
List<INPUT> subList = new LinkedList<INPUT>();
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
index f811000eb0..f845fffc1d 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
@@ -12,10 +12,10 @@ package org.eclipse.osee.executor.admin.internal;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.osee.event.EventService;
@@ -23,6 +23,11 @@ import org.eclipse.osee.executor.admin.ExecutionCallback;
import org.eclipse.osee.executor.admin.ExecutorAdmin;
import org.eclipse.osee.executor.admin.ExecutorConstants;
import org.eclipse.osee.logger.Log;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* @author Roberto E. Escobar
@@ -30,12 +35,10 @@ import org.eclipse.osee.logger.Log;
public class ExecutorAdminImpl implements ExecutorAdmin {
public static final String DEFAULT_EXECUTOR = "default.executor";
- private static final int THREAD_CHECK_TIME = 5000; // every 5 seconds
private ExecutorCache cache;
private Log logger;
private EventService eventService;
- private Timer timer;
public void setLogger(Log logger) {
this.logger = logger;
@@ -56,34 +59,27 @@ public class ExecutorAdminImpl implements ExecutorAdmin {
public void start(Map<String, ?> props) {
cache = new ExecutorCache();
- timer = new Timer();
- TimerTask task = new ThreadCleaner(getLogger(), cache);
- timer.scheduleAtFixedRate(task, 0, THREAD_CHECK_TIME);
-
getEventService().postEvent(ExecutorConstants.EXECUTOR_ADMIN_REGISTRATION_EVENT, props);
}
public void stop(Map<String, ?> props) {
- timer.cancel();
- timer = null;
- for (Entry<String, ExecutorService> entry : cache.getExecutors().entrySet()) {
+ for (Entry<String, ListeningExecutorService> entry : cache.getExecutors().entrySet()) {
shutdown(entry.getKey(), entry.getValue());
}
cache = null;
getEventService().postEvent(ExecutorConstants.EXECUTOR_ADMIN_DEREGISTRATION_EVENT, props);
}
- public ExecutorService getDefaultExecutor() throws Exception {
+ public ListeningExecutorService getDefaultExecutor() throws Exception {
return getExecutor(DEFAULT_EXECUTOR);
}
- public ExecutorService getExecutor(String id) throws Exception {
- ExecutorService service = null;
+ public ListeningExecutorService getExecutor(String id) throws Exception {
+ ListeningExecutorService service = null;
synchronized (cache) {
service = cache.getById(id);
if (service == null) {
- service = createExecutor(id);
- cache.put(id, service);
+ service = createExecutor(id, -1);
}
}
if (service == null) {
@@ -95,10 +91,6 @@ public class ExecutorAdminImpl implements ExecutorAdmin {
return service;
}
- public <T> Callable<T> addCallback(Callable<T> callable, ExecutionCallback<T> callback) {
- return new CallableWithCallbackImpl<T>(callable, callback);
- }
-
@Override
public <T> Future<T> schedule(Callable<T> callable) throws Exception {
return schedule(callable, null);
@@ -116,20 +108,46 @@ public class ExecutorAdminImpl implements ExecutorAdmin {
@Override
public <T> Future<T> schedule(String id, Callable<T> callable, ExecutionCallback<T> callback) throws Exception {
- Callable<T> toExecute = callable;
+ ListenableFuture<T> listenableFuture = getExecutor(id).submit(callable);
if (callback != null) {
- toExecute = addCallback(callable, callback);
+ FutureCallback<T> futureCallback = asFutureCallback(callback);
+ Futures.addCallback(listenableFuture, futureCallback);
}
- return getExecutor(id).submit(toExecute);
+ return listenableFuture;
+ }
+
+ private <T> FutureCallback<T> asFutureCallback(final ExecutionCallback<T> callback) {
+ return new FutureCallback<T>() {
+
+ @Override
+ public void onFailure(Throwable arg0) {
+ if (arg0 instanceof CancellationException) {
+ callback.onCancelled();
+ } else {
+ callback.onFailure(arg0);
+ }
+ }
+
+ @Override
+ public void onSuccess(T arg0) {
+ callback.onSuccess(arg0);
+ }
+ };
}
- private ExecutorService createExecutor(String id) throws Exception {
+ private ListeningExecutorService createExecutor(String id, int poolSize) throws Exception {
ExecutorThreadFactory threadFactory = new ExecutorThreadFactory(id, Thread.NORM_PRIORITY);
- cache.put(id, threadFactory);
- // TODO: Better way to control pool size per executor service
- int corePoolSize = Math.min(4, Runtime.getRuntime().availableProcessors());
- return new ExecutorServiceImpl(getLogger(), id, corePoolSize, threadFactory, cache);
+ ExecutorService executor = null;
+ if (poolSize > 0) {
+ executor = Executors.newFixedThreadPool(poolSize, threadFactory);
+ } else {
+ executor = Executors.newCachedThreadPool(threadFactory);
+ }
+
+ ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
+ cache.put(id, listeningExecutor);
+ return listeningExecutor;
}
private void shutdown(String id, ExecutorService executor) {
@@ -155,12 +173,21 @@ public class ExecutorAdminImpl implements ExecutorAdmin {
}
@Override
- public int cancelTasks(String id) throws Exception {
- int itemsCancelled = 0;
- ExecutorWorkCache workCache = cache.getWorkerCache(id);
- if (workCache != null) {
- itemsCancelled = workCache.cancelAll();
+ public void createFixedPoolExecutor(String id, int poolSize) throws Exception {
+ createExecutor(id, poolSize);
+ }
+
+ @Override
+ public void createCachedPoolExecutor(String id) throws Exception {
+ createExecutor(id, -1);
+ }
+
+ @Override
+ public void shutdown(String id) throws Exception {
+ ListeningExecutorService service = cache.getById(id);
+ if (service != null) {
+ shutdown(id, service);
+ cache.remove(id);
}
- return itemsCancelled;
}
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorCache.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorCache.java
index a71fe3b350..dadba8e0b9 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorCache.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorCache.java
@@ -11,111 +11,41 @@
package org.eclipse.osee.executor.admin.internal;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import com.google.common.util.concurrent.ListeningExecutorService;
/**
* @author Roberto E. Escobar
*/
-public class ExecutorCache implements ExecutorServiceLifecycleListener {
+public class ExecutorCache {
- private final Map<String, ExecutorService> executors = new ConcurrentHashMap<String, ExecutorService>();
- private final Map<String, ExecutorThreadFactory> factories = new ConcurrentHashMap<String, ExecutorThreadFactory>();
- private final Map<String, ExecutorWorkCache> workers = new ConcurrentHashMap<String, ExecutorWorkCache>();
+ private final ConcurrentHashMap<String, ListeningExecutorService> executors =
+ new ConcurrentHashMap<String, ListeningExecutorService>();
- public void put(String id, ExecutorService service) throws IllegalStateException {
- if (executors.containsKey(id)) {
+ public void put(String id, ListeningExecutorService service) throws IllegalStateException {
+ if (executors.putIfAbsent(id, service) != null) {
throw new IllegalStateException(String.format("Error non-unique executor detected [%s]", id));
}
- executors.put(id, service);
}
- public void put(String id, ExecutorThreadFactory factory) throws IllegalStateException {
- if (factories.containsKey(id)) {
- throw new IllegalStateException(String.format("Error non-unique thread factory detected [%s]", id));
- }
- factories.put(id, factory);
- }
-
- public void put(String id, ExecutorWorkCache workerCache) throws IllegalStateException {
- if (workers.containsKey(id)) {
- throw new IllegalStateException(String.format("Error non-unique executor worker cache detected [%s]", id));
- }
- workers.put(id, workerCache);
- }
-
- public void remove(String id) {
- executors.remove(id);
- factories.remove(id);
- workers.remove(id);
- }
-
- public ExecutorService getById(String id) throws IllegalArgumentException {
+ public ListeningExecutorService getById(String id) throws IllegalArgumentException {
if (id == null || id.length() <= 0) {
throw new IllegalArgumentException("Error - executorId cannot be null");
}
- return executors.get(id);
- }
-
- public Map<String, ExecutorThreadFactory> getThreadFactories() {
- return factories;
- }
-
- public Map<String, ExecutorService> getExecutors() {
- return executors;
- }
-
- public Map<String, ExecutorWorkCache> getWorkers() {
- return workers;
- }
-
- @Override
- public void onTerminate(String id) {
- remove(id);
- }
-
- public ExecutorWorkCache getWorkerCache(String id) throws IllegalArgumentException {
- if (id == null || id.length() <= 0) {
- throw new IllegalArgumentException("Error - executorId cannot be null");
+ ListeningExecutorService executor = executors.get(id);
+ if (executor != null && (executor.isShutdown() || executor.isTerminated())) {
+ executors.remove(id);
+ executor = null;
}
- return workers.get(id);
+ return executor;
}
- @Override
- public void onScheduled(String id, UUID workId, Future<?> future) {
- ExecutorWorkCache worker = getWorkerCache(id);
- if (worker != null) {
- try {
- worker.scheduled(workId, future);
- } catch (Exception ex) {
- //
- }
- }
+ public Map<String, ListeningExecutorService> getExecutors() {
+ return executors;
}
- @Override
- public void onBeforeExecute(String id, UUID workId, Future<?> future) {
- ExecutorWorkCache worker = getWorkerCache(id);
- if (worker != null) {
- try {
- worker.executing(workId, future);
- } catch (Exception ex) {
- //
- }
- }
+ public void remove(String id) {
+ executors.remove(id);
}
- @Override
- public void onAfterExecute(String id, UUID workId, Future<?> future) {
- ExecutorWorkCache worker = getWorkerCache(id);
- if (worker != null) {
- try {
- worker.completed(workId, future);
- } catch (Exception ex) {
- //
- }
- }
- }
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java
deleted file mode 100644
index d5b90e8459..0000000000
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceImpl.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2004, 2007 Boeing.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Boeing - initial API and implementation
- *******************************************************************************/
-package org.eclipse.osee.executor.admin.internal;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.eclipse.osee.executor.admin.ExecutionCallback;
-import org.eclipse.osee.executor.admin.HasExecutionCallback;
-import org.eclipse.osee.logger.Log;
-
-/**
- * @author Roberto E. Escobar
- */
-public class ExecutorServiceImpl extends ThreadPoolExecutor {
-
- private final String id;
- private final Log logger;
- private final ExecutorServiceLifecycleListener listener;
-
- public ExecutorServiceImpl(Log logger, String id, int corePoolSize, ThreadFactory threadFactory, ExecutorServiceLifecycleListener listener) {
- super(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
- this.logger = logger;
- this.id = id;
- this.listener = listener;
- }
-
- public String getId() {
- return id;
- }
-
- private Log getLogger() {
- return logger;
- }
-
- @Override
- protected void terminated() {
- super.terminated();
- listener.onTerminate(getId());
- }
-
- @SuppressWarnings("unchecked")
- private <V> ExecutionCallback<V> getCallBack(Object object) {
- ExecutionCallback<V> callback = null;
- if (object instanceof HasExecutionCallback) {
- HasExecutionCallback<V> item = (HasExecutionCallback<V>) object;
- callback = item.getExecutionCallback();
- }
- return callback;
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- UUID uuid = createID();
- ExecutionCallback<T> callback = getCallBack(runnable);
- FutureTaskWithCallback<T> future = new FutureTaskWithCallback<T>(uuid, getLogger(), runnable, value, callback);
- onScheduled(future);
- return future;
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- UUID uuid = createID();
- ExecutionCallback<T> callback = getCallBack(callable);
- FutureTaskWithCallback<T> future = new FutureTaskWithCallback<T>(uuid, getLogger(), callable, callback);
- onScheduled(future);
- return future;
- }
-
- private UUID createID() {
- return UUID.randomUUID();
- }
-
- /**
- * Tasks waiting
- */
- @Override
- public long getTaskCount() {
- return super.getTaskCount();
- }
-
- /**
- * Number of Tasks Completed
- */
- @Override
- public long getCompletedTaskCount() {
- return super.getCompletedTaskCount();
- }
-
- private void onScheduled(FutureTaskWithCallback<?> future) {
- listener.onScheduled(getId(), future.getUUID(), future);
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- FutureTaskWithCallback<?> future = (FutureTaskWithCallback<?>) r;
- listener.onBeforeExecute(getId(), future.getUUID(), future);
- super.beforeExecute(t, r);
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- FutureTaskWithCallback<?> future = (FutureTaskWithCallback<?>) r;
- listener.onAfterExecute(getId(), future.getUUID(), future);
- super.afterExecute(r, t);
- }
-
-}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceLifecycleListener.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceLifecycleListener.java
deleted file mode 100644
index 817d4b5175..0000000000
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorServiceLifecycleListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2004, 2007 Boeing.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Boeing - initial API and implementation
- *******************************************************************************/
-package org.eclipse.osee.executor.admin.internal;
-
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-/**
- * @author Roberto E. Escobar
- */
-public interface ExecutorServiceLifecycleListener {
-
- void onTerminate(String id);
-
- void onScheduled(String id, UUID workId, Future<?> future);
-
- void onBeforeExecute(String id, UUID workId, Future<?> future);
-
- void onAfterExecute(String id, UUID workId, Future<?> future);
-}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorThreadFactory.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorThreadFactory.java
index fde35081fd..647e36a153 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorThreadFactory.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorThreadFactory.java
@@ -10,27 +10,20 @@
*******************************************************************************/
package org.eclipse.osee.executor.admin.internal;
-import java.lang.Thread.State;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Roberto E. Escobar
*/
public class ExecutorThreadFactory implements ThreadFactory {
- private final List<WeakReference<Thread>> threads;
private final String threadName;
private final int priority;
+ private final AtomicInteger threadCount = new AtomicInteger(0);
public ExecutorThreadFactory(String threadName, int priority) {
this.threadName = threadName;
- this.threads = new CopyOnWriteArrayList<WeakReference<Thread>>();
this.priority = priority;
}
@@ -42,7 +35,7 @@ public class ExecutorThreadFactory implements ThreadFactory {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
- String name = String.format("%s: %s", threadName, threads.size());
+ String name = String.format("%s: %s", threadName, threadCount.getAndAdd(1));
thread.setName(name);
int priorityToSet = priority;
@@ -53,30 +46,7 @@ public class ExecutorThreadFactory implements ThreadFactory {
}
thread.setPriority(priority);
- threads.add(new WeakReference<Thread>(thread));
return thread;
}
- public List<Thread> getThreads() {
- List<Thread> toReturn = new ArrayList<Thread>();
- for (WeakReference<Thread> weak : threads) {
- Thread thread = weak.get();
- if (thread != null) {
- toReturn.add(thread);
- }
- }
- return toReturn;
- }
-
- public synchronized void cleanUp() {
- Set<WeakReference<Thread>> toRemove = new HashSet<WeakReference<Thread>>();
- for (WeakReference<Thread> reference : threads) {
- Thread thread = reference.get();
- if (thread == null || State.TERMINATED == thread.getState()) {
- toRemove.add(reference);
- }
- }
- threads.removeAll(toRemove);
- }
-
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java
deleted file mode 100644
index 194e8328a6..0000000000
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/FutureTaskWithCallback.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2004, 2007 Boeing.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Boeing - initial API and implementation
- *******************************************************************************/
-package org.eclipse.osee.executor.admin.internal;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-import org.eclipse.osee.executor.admin.ExecutionCallback;
-import org.eclipse.osee.executor.admin.HasCancellation;
-import org.eclipse.osee.executor.admin.HasExecutionCallback;
-import org.eclipse.osee.logger.Log;
-
-/**
- * @author Roberto E. Escobar
- */
-public class FutureTaskWithCallback<T> implements RunnableFuture<T>, HasExecutionCallback<T> {
-
- private final UUID uuid;
- private final Log logger;
- private final ExecutionCallback<T> callback;
- private final Sync sync;
- private final Runnable runnable;
-
- public FutureTaskWithCallback(UUID uuid, Log logger, Callable<T> callable, ExecutionCallback<T> callback) {
- this.uuid = uuid;
- if (callable == null) {
- throw new NullPointerException();
- }
- sync = new Sync(callable);
-
- this.logger = logger;
- this.callback = callback;
- this.runnable = null;
- }
-
- public FutureTaskWithCallback(UUID uuid, Log logger, Runnable runnable, T result, ExecutionCallback<T> callback) {
- this.uuid = uuid;
- this.runnable = runnable;
- sync = new Sync(Executors.callable(runnable, result));
- this.logger = logger;
- this.callback = callback;
- }
-
- public UUID getUUID() {
- return uuid;
- }
-
- @Override
- public ExecutionCallback<T> getExecutionCallback() {
- return callback;
- }
-
- @Override
- public boolean isCancelled() {
- return sync.innerIsCancelled();
- }
-
- @Override
- public boolean isDone() {
- return sync.innerIsDone();
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return sync.innerCancel(mayInterruptIfRunning);
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return sync.innerGet();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return sync.innerGet(unit.toNanos(timeout));
- }
-
- protected void done() {
- // Do nothing
- }
-
- protected void set(T v) {
- sync.innerSet(v);
- }
-
- protected void setException(Throwable t) {
- sync.innerSetException(t);
- }
-
- @Override
- public void run() {
- sync.innerRun();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- FutureTaskWithCallback<?> other = (FutureTaskWithCallback<?>) obj;
- if (uuid == null) {
- if (other.uuid != null) {
- return false;
- }
- } else if (!uuid.equals(other.uuid)) {
- return false;
- }
- return true;
- }
-
- protected boolean runAndReset() {
- return sync.innerRunAndReset();
- }
-
- protected void notifyOnSuccess(T result) {
- ExecutionCallback<T> callback = getExecutionCallback();
- if (callback != null) {
- try {
- callback.onSuccess(result);
- } catch (Throwable th) {
- handleNotificationError(th, "success");
- }
- }
- }
-
- protected void notifyOnCancelled() {
- ExecutionCallback<T> callback = getExecutionCallback();
- if (callback != null) {
- try {
- callback.onCancelled();
- } catch (Throwable th) {
- handleNotificationError(th, "cancelled");
- }
- }
- }
-
- protected void notifyOnFailure(Throwable throwable) {
- logger.error(throwable, "Error during execution");
-
- ExecutionCallback<T> callback = getExecutionCallback();
- if (callback != null) {
- try {
- callback.onFailure(throwable);
- } catch (Throwable th) {
- handleNotificationError(th, "failure");
- }
- }
- }
-
- protected void handleNotificationError(Throwable th, String value) {
- logger.error(th, "Error during on %s notification [%s]", value, callback.toString());
- }
-
- private final class Sync extends AbstractQueuedSynchronizer {
-
- private static final long serialVersionUID = 592282723700675891L;
-
- /** State value representing that task is running */
- private static final int RUNNING = 1;
- /** State value representing that task ran */
- private static final int RAN = 2;
- /** State value representing that task was cancelled */
- private static final int CANCELLED = 4;
-
- /** The underlying callable */
- private final Callable<T> callable;
- /** The result to return from get() */
- private T result;
- /** The exception to throw from get() */
- private Throwable exception;
-
- /**
- * The thread running task. When nulled after set/cancel, this indicates that the results are accessible. Must be
- * volatile, to ensure visibility upon completion.
- */
- private volatile Thread runner;
-
- Sync(Callable<T> callable) {
- this.callable = callable;
- }
-
- private boolean ranOrCancelled(int state) {
- return (state & (RAN | CANCELLED)) != 0;
- }
-
- @Override
- protected int tryAcquireShared(int ignore) {
- return innerIsDone() ? 1 : -1;
- }
-
- @Override
- protected boolean tryReleaseShared(int ignore) {
- runner = null;
- return true;
- }
-
- boolean innerIsCancelled() {
- return getState() == CANCELLED;
- }
-
- boolean innerIsDone() {
- return ranOrCancelled(getState()) && runner == null;
- }
-
- T innerGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);
- if (getState() == CANCELLED) {
- throw new CancellationException();
- }
- if (exception != null) {
- throw new ExecutionException(exception);
- }
-
- return result;
- }
-
- T innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
- if (!tryAcquireSharedNanos(0, nanosTimeout)) {
- throw new TimeoutException();
- }
- if (getState() == CANCELLED) {
- throw new CancellationException();
- }
- if (exception != null) {
- throw new ExecutionException(exception);
- }
- return result;
- }
-
- void innerSet(T v) {
- for (;;) {
- int s = getState();
- if (s == RAN) {
- return;
- }
- if (s == CANCELLED) {
- // aggressively release to set runner to null,
- // in case we are racing with a cancel request
- // that will try to interrupt runner
- notifyOnCancelled();
- releaseShared(0);
- return;
- }
- if (compareAndSetState(s, RAN)) {
- result = v;
-
- notifyOnSuccess(result);
- releaseShared(0);
- done();
- return;
- }
- }
- }
-
- void innerSetException(Throwable throwable) {
- for (;;) {
- int s = getState();
- if (s == RAN) {
- return;
- }
- if (s == CANCELLED) {
- // aggressively release to set runner to null,
- // in case we are racing with a cancel request
- // that will try to interrupt runner
- notifyOnCancelled();
- releaseShared(0);
- return;
- }
- if (compareAndSetState(s, RAN)) {
- exception = throwable;
- result = null;
-
- notifyOnFailure(throwable);
- releaseShared(0);
- done();
- return;
- }
- }
- }
-
- boolean innerCancel(boolean mayInterruptIfRunning) {
- for (;;) {
- int s = getState();
- if (ranOrCancelled(s)) {
- return false;
- }
- if (compareAndSetState(s, CANCELLED)) {
- break;
- }
- }
-
- if (callable instanceof HasCancellation) {
- HasCancellation cancellable = (HasCancellation) callable;
- cancellable.setCancel(true);
- }
- if (runnable instanceof HasCancellation) {
- HasCancellation cancellable = (HasCancellation) runnable;
- cancellable.setCancel(true);
- }
-
- if (mayInterruptIfRunning) {
- Thread r = runner;
- if (r != null) {
- r.interrupt();
- }
- }
- notifyOnCancelled();
- releaseShared(0);
- done();
- return true;
- }
-
- void innerRun() {
- if (!compareAndSetState(0, RUNNING)) {
- return;
- }
- try {
- runner = Thread.currentThread();
- if (getState() == RUNNING) {
- innerSet(callable.call());
- } else {
- releaseShared(0); // cancel
- }
- } catch (Throwable ex) {
- innerSetException(ex);
- }
- }
-
- boolean innerRunAndReset() {
- if (!compareAndSetState(0, RUNNING)) {
- return false;
- }
- try {
- runner = Thread.currentThread();
- if (getState() == RUNNING) {
- callable.call(); // don't set result
- }
- runner = null;
- return compareAndSetState(RUNNING, 0);
- } catch (Throwable ex) {
- innerSetException(ex);
- return false;
- }
- }
- }
-
-}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ThreadCleaner.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ThreadCleaner.java
deleted file mode 100644
index 0655b979f4..0000000000
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ThreadCleaner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2004, 2007 Boeing.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Boeing - initial API and implementation
- *******************************************************************************/
-package org.eclipse.osee.executor.admin.internal;
-
-import java.util.Map.Entry;
-import java.util.TimerTask;
-import org.eclipse.osee.logger.Log;
-
-/**
- * @author Roberto E. Escobar
- */
-public class ThreadCleaner extends TimerTask {
-
- private final ExecutorCache cache;
- private final Log logger;
-
- public ThreadCleaner(Log logger, ExecutorCache cache) {
- super();
- this.logger = logger;
- this.cache = cache;
- }
-
- @Override
- public void run() {
- cleanUpThreadFactory();
- cleanUpExecutionCache();
- }
-
- private void cleanUpThreadFactory() {
- for (Entry<String, ExecutorThreadFactory> entry : cache.getThreadFactories().entrySet()) {
- try {
- ExecutorThreadFactory factory = entry.getValue();
- factory.cleanUp();
- } catch (Throwable ex) {
- logger.error(ex, "Error removing dead threads for [%s]", entry.getKey());
- }
- }
- }
-
- private void cleanUpExecutionCache() {
- for (Entry<String, ExecutorWorkCache> entry : cache.getWorkers().entrySet()) {
- try {
- ExecutorWorkCache workers = entry.getValue();
- workers.cleanUp();
- } catch (Throwable ex) {
- logger.error(ex, "Error removing dead workers for [%s]", entry.getKey());
- }
- }
- }
-} \ No newline at end of file
diff --git a/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QueryEngineImplTest.java b/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QueryEngineImplTest.java
index 49d856b579..e68f3615ef 100644
--- a/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QueryEngineImplTest.java
+++ b/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QueryEngineImplTest.java
@@ -110,12 +110,12 @@ public class QueryEngineImplTest {
sessionId = GUID.create();
- QueryModuleFactory queryModule = new QueryModuleFactory(logger);
+ QueryModuleFactory queryModule = new QueryModuleFactory(logger, executorAdmin);
TaggingEngine taggingEngine = queryModule.createTaggingEngine();
DataPostProcessorFactory<CriteriaAttributeKeywords> postProcessorFactory =
- queryModule.createAttributeKeywordPostProcessor(executorAdmin, taggingEngine);
+ queryModule.createAttributeKeywordPostProcessor(taggingEngine);
SqlHandlerFactory handlerFactory =
queryModule.createHandlerFactory(identityService, postProcessorFactory, taggingEngine.getTagProcessor());
queryEngine = queryModule.createQueryEngine(dbService, handlerFactory, sqlProvider, branchCache);
diff --git a/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QuerySqlHandlerFactoryImplTest.java b/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QuerySqlHandlerFactoryImplTest.java
index 80a6265251..aad186fd5d 100644
--- a/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QuerySqlHandlerFactoryImplTest.java
+++ b/plugins/org.eclipse.osee.orcs.db.test/src/org/eclipse/osee/orcs/db/internal/search/QuerySqlHandlerFactoryImplTest.java
@@ -77,9 +77,9 @@ public class QuerySqlHandlerFactoryImplTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
- QueryModuleFactory queryModule = new QueryModuleFactory(logger);
+ QueryModuleFactory queryModule = new QueryModuleFactory(logger, executorAdmin);
- postProcessorFactory = queryModule.createAttributeKeywordPostProcessor(executorAdmin, taggingEngine);
+ postProcessorFactory = queryModule.createAttributeKeywordPostProcessor(taggingEngine);
factory =
queryModule.createHandlerFactory(identityService, postProcessorFactory, taggingEngine.getTagProcessor());
}
diff --git a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/OrcsDataStoreImpl.java b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/OrcsDataStoreImpl.java
index 4d8d395673..e89f3598ff 100644
--- a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/OrcsDataStoreImpl.java
+++ b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/OrcsDataStoreImpl.java
@@ -117,7 +117,7 @@ public class OrcsDataStoreImpl implements OrcsDataStore, TempCachingService {
this.eventService = eventService;
}
- public void start(BundleContext context) {
+ public void start(BundleContext context) throws Exception {
TempCachingServiceFactory modelingService =
new TempCachingServiceFactory(logger, dbService, executorAdmin, modelFactory, eventService);
@@ -139,12 +139,11 @@ public class OrcsDataStoreImpl implements OrcsDataStore, TempCachingService {
dataStoreAdmin = new DataStoreAdminImpl(logger, dbService, identityService, branchStore, preferences);
- queryModule = new QueryModuleFactory(logger);
- queryModule.start(executorAdmin, dbService, identityService, sqlProvider, resourceManager,
- cacheService.getBranchCache());
+ queryModule = new QueryModuleFactory(logger, executorAdmin);
+ queryModule.start(dbService, identityService, sqlProvider, resourceManager, cacheService.getBranchCache());
}
- public void stop() {
+ public void stop() throws Exception {
queryModule.stop();
queryModule = null;
diff --git a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/QueryModuleFactory.java b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/QueryModuleFactory.java
index c7f648eea4..0420cf2f35 100644
--- a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/QueryModuleFactory.java
+++ b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/QueryModuleFactory.java
@@ -45,6 +45,7 @@ import org.eclipse.osee.orcs.db.internal.search.handlers.RelatedToSqlHandler;
import org.eclipse.osee.orcs.db.internal.search.handlers.RelationTypeExistsSqlHandler;
import org.eclipse.osee.orcs.db.internal.search.indexer.IndexerCallableFactory;
import org.eclipse.osee.orcs.db.internal.search.indexer.IndexerCallableFactoryImpl;
+import org.eclipse.osee.orcs.db.internal.search.indexer.IndexerConstants;
import org.eclipse.osee.orcs.db.internal.search.indexer.IndexingTaskConsumer;
import org.eclipse.osee.orcs.db.internal.search.indexer.IndexingTaskConsumerImpl;
import org.eclipse.osee.orcs.db.internal.search.indexer.QueryEngineIndexerImpl;
@@ -70,28 +71,33 @@ import org.eclipse.osee.orcs.db.internal.sql.SqlHandlerFactoryImpl;
public class QueryModuleFactory {
private final Log logger;
+ private final ExecutorAdmin executorAdmin;
private QueryEngine queryEngine;
private QueryEngineIndexer queryIndexer;
- public QueryModuleFactory(Log logger) {
+ public QueryModuleFactory(Log logger, ExecutorAdmin executorAdmin) {
super();
this.logger = logger;
+ this.executorAdmin = executorAdmin;
}
- public void start(ExecutorAdmin executorAdmin, IOseeDatabaseService dbService, IdentityService idService, SqlProvider sqlProvider, IResourceManager resourceManager, BranchCache branchCache) {
+ public void start(IOseeDatabaseService dbService, IdentityService idService, SqlProvider sqlProvider, IResourceManager resourceManager, BranchCache branchCache) throws Exception {
TaggingEngine taggingEngine = createTaggingEngine();
DataPostProcessorFactory<CriteriaAttributeKeywords> postProcessor =
- createAttributeKeywordPostProcessor(executorAdmin, taggingEngine);
+ createAttributeKeywordPostProcessor(taggingEngine);
SqlHandlerFactory handlerFactory =
createHandlerFactory(idService, postProcessor, taggingEngine.getTagProcessor());
+ executorAdmin.createFixedPoolExecutor(IndexerConstants.INDEXING_CONSUMER_EXECUTOR_ID, 4);
+
queryEngine = createQueryEngine(dbService, handlerFactory, sqlProvider, branchCache);
- queryIndexer = createQueryEngineIndexer(dbService, idService, executorAdmin, taggingEngine, resourceManager);
+ queryIndexer = createQueryEngineIndexer(dbService, idService, taggingEngine, resourceManager);
}
- public void stop() {
+ public void stop() throws Exception {
queryIndexer = null;
queryEngine = null;
+ executorAdmin.shutdown(IndexerConstants.INDEXING_CONSUMER_EXECUTOR_ID);
}
public QueryEngine getQueryEngine() {
@@ -117,7 +123,7 @@ public class QueryModuleFactory {
return new QueryEngineImpl(logger, dbService, sqlProvider, branchCache, handlerFactory);
}
- protected QueryEngineIndexer createQueryEngineIndexer(IOseeDatabaseService dbService, IdentityService identityService, ExecutorAdmin executorAdmin, TaggingEngine taggingEngine, IResourceManager resourceManager) {
+ protected QueryEngineIndexer createQueryEngineIndexer(IOseeDatabaseService dbService, IdentityService identityService, TaggingEngine taggingEngine, IResourceManager resourceManager) {
QueueToAttributeLoader attributeLoader =
new QueueToAttributeLoaderImpl(logger, dbService, identityService, resourceManager);
IndexerCallableFactory callableFactory =
@@ -150,7 +156,7 @@ public class QueryModuleFactory {
return new SqlHandlerFactoryImpl(logger, identityService, tagProcessor, handleMap, factoryMap);
}
- protected DataPostProcessorFactory<CriteriaAttributeKeywords> createAttributeKeywordPostProcessor(ExecutorAdmin executorAdmin, TaggingEngine taggingEngine) {
+ protected DataPostProcessorFactory<CriteriaAttributeKeywords> createAttributeKeywordPostProcessor(TaggingEngine taggingEngine) {
return new DataPostProcessorFactoryImpl(logger, taggingEngine, executorAdmin);
}
}
diff --git a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumer.java b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumer.java
index 6ebe3c9ad2..0c64ef1677 100644
--- a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumer.java
+++ b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumer.java
@@ -20,8 +20,6 @@ import org.eclipse.osee.orcs.search.IndexerCollector;
*/
public interface IndexingTaskConsumer {
- int cancelIndexer() throws Exception;
-
int cancelTaskId(Collection<Integer> taskIds);
int getWorkersInQueue();
diff --git a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumerImpl.java b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumerImpl.java
index 56e9eb5918..85e752b7fb 100644
--- a/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumerImpl.java
+++ b/plugins/org.eclipse.osee.orcs.db/src/org/eclipse/osee/orcs/db/internal/search/indexer/IndexingTaskConsumerImpl.java
@@ -36,11 +36,6 @@ public class IndexingTaskConsumerImpl implements IndexingTaskConsumer {
}
@Override
- public int cancelIndexer() throws Exception {
- return executorAdmin.cancelTasks(IndexerConstants.INDEXING_CONSUMER_EXECUTOR_ID);
- }
-
- @Override
public int cancelTaskId(Collection<Integer> taskIds) {
int toReturn = 0;
for (int item : taskIds) {

Back to the top