Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoberto E. Escobar2014-11-20 19:25:42 +0000
committerAngel Avila2014-11-20 19:25:42 +0000
commit6dd03468f7101d06bc7a36fcfd2b0b9c710a92da (patch)
treee20582c4309a296717ccbab3b51796fa41342ffd
parentcfac7d6a0a966ecf613e9cb3f76352b57fba1c98 (diff)
downloadorg.eclipse.osee-6dd03468f7101d06bc7a36fcfd2b0b9c710a92da.tar.gz
org.eclipse.osee-6dd03468f7101d06bc7a36fcfd2b0b9c710a92da.tar.xz
org.eclipse.osee-6dd03468f7101d06bc7a36fcfd2b0b9c710a92da.zip
feature: Add fixed rate scheduling to ExecutorAdmin
-rw-r--r--plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java24
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java7
-rw-r--r--plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java72
3 files changed, 103 insertions, 0 deletions
diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java
index eb1d1ccaae..47a0d6ed4f 100644
--- a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java
+++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java
@@ -11,7 +11,10 @@
package org.eclipse.osee.executor.admin.internal;
import java.util.HashMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.osee.logger.Log;
import org.junit.After;
import org.junit.Assert;
@@ -74,4 +77,25 @@ public class ExecutorAdminTest {
ExecutorService third = admin.getExecutor("hello");
Assert.assertFalse(third.equals(second));
}
+
+ @Test
+ public void testScheduleExecutor() throws InterruptedException {
+ final AtomicInteger executed = new AtomicInteger();
+ admin.scheduleAtFixedRate("schedule.test", new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ executed.incrementAndGet();
+ return null;
+ }
+ }, -1, 250, TimeUnit.MILLISECONDS);
+ synchronized (this) {
+ this.wait(1000L);
+ }
+ admin.shutdown("schedule.test");
+ int numOfExecutions = executed.get();
+ int limit = 3;
+ Assert.assertTrue("Number of executions was [" + numOfExecutions + "] expected > " + limit,
+ numOfExecutions > limit);
+ }
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
index 2b651ef7ac..e20538f1df 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java
@@ -12,6 +12,7 @@ package org.eclipse.osee.executor.admin;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* @author Roberto E. Escobar
@@ -22,6 +23,8 @@ public interface ExecutorAdmin {
void createCachedPoolExecutor(String id);
+ void createScheduledPoolExecutor(String id, int poolSize);
+
<T> Future<T> schedule(Callable<T> callable, ExecutionCallback<T> callback);
<T> Future<T> schedule(String id, Callable<T> callable, ExecutionCallback<T> callback);
@@ -30,6 +33,10 @@ public interface ExecutorAdmin {
<T> Future<T> schedule(String id, Callable<T> callable);
+ <T> Future<T> scheduleAtFixedRate(String id, Callable<T> callable, long executionRate, TimeUnit timeUnit);
+
+ <T> Future<T> scheduleAtFixedRate(String id, Callable<T> callable, long startAfter, long executionRate, TimeUnit timeUnit);
+
void shutdown(String id);
}
diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
index 8107f9d6bc..e12fff2ccc 100644
--- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
+++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java
@@ -17,6 +17,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.eclipse.osee.executor.admin.ExecutionCallback;
@@ -27,6 +29,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -184,4 +187,73 @@ public class ExecutorAdminImpl implements ExecutorAdmin {
cache.remove(id);
}
}
+
+ public ListeningScheduledExecutorService getScheduledExecutor(String id) {
+ ListeningScheduledExecutorService service = null;
+ synchronized (cache) {
+ ListeningExecutorService executor = cache.getById(id);
+ if (executor instanceof ListeningScheduledExecutorService) {
+ service = (ListeningScheduledExecutorService) executor;
+ } else {
+ service = createScheduledExecutor(id, -1);
+ }
+ }
+ if (service == null) {
+ throw new OseeStateException("Error creating executor [%s].", id);
+ }
+ if (service.isShutdown() || service.isTerminated()) {
+ throw new OseeStateException("Error executor [%s] was previously shutdown.", id);
+ }
+ return service;
+ }
+
+ private ListeningScheduledExecutorService createScheduledExecutor(String id, int poolSize) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()//
+ .setNameFormat(id + "- [%s]")//
+ .setPriority(Thread.NORM_PRIORITY)//
+ .build();
+ ScheduledExecutorService executor = null;
+ if (poolSize > 0) {
+ executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
+ } else {
+ executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+ ListeningScheduledExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
+ cache.put(id, listeningExecutor);
+ return listeningExecutor;
+ }
+
+ @Override
+ public void createScheduledPoolExecutor(String id, int poolSize) {
+ createScheduledExecutor(id, poolSize);
+ }
+
+ @Override
+ public <T> Future<T> scheduleAtFixedRate(String id, Callable<T> callable, long executionRate, TimeUnit timeUnit) {
+ return scheduleAtFixedRate(id, callable, -1, executionRate, timeUnit);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> Future<T> scheduleAtFixedRate(String id, final Callable<T> callable, long startAfter, long executionRate, TimeUnit timeUnit) {
+ ListeningScheduledExecutorService executor = getScheduledExecutor(id);
+ Runnable runnable = asRunnable(callable);
+ ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(runnable, startAfter, executionRate, timeUnit);
+ return (Future<T>) scheduledFuture;
+ }
+
+ private Runnable asRunnable(final Callable<?> callable) {
+ return new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ callable.call();
+ } catch (Throwable th) {
+ logger.error(th, "Error executing scheduled task [%s]", callable);
+ }
+ }
+ };
+ }
+
}

Back to the top