From 6dd03468f7101d06bc7a36fcfd2b0b9c710a92da Mon Sep 17 00:00:00 2001 From: Roberto E. Escobar Date: Thu, 20 Nov 2014 12:25:42 -0700 Subject: feature: Add fixed rate scheduling to ExecutorAdmin Change-Id: I35ebf0acf63c189b23c5a7054825a145815d98b5 --- .../executor/admin/internal/ExecutorAdminTest.java | 24 ++++++++ .../eclipse/osee/executor/admin/ExecutorAdmin.java | 7 +++ .../executor/admin/internal/ExecutorAdminImpl.java | 72 ++++++++++++++++++++++ 3 files changed, 103 insertions(+) (limited to 'plugins') diff --git a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java index eb1d1ccaae5..47a0d6ed4f5 100644 --- a/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java +++ b/plugins/org.eclipse.osee.executor.admin.test/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminTest.java @@ -11,7 +11,10 @@ package org.eclipse.osee.executor.admin.internal; import java.util.HashMap; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.osee.logger.Log; import org.junit.After; import org.junit.Assert; @@ -74,4 +77,25 @@ public class ExecutorAdminTest { ExecutorService third = admin.getExecutor("hello"); Assert.assertFalse(third.equals(second)); } + + @Test + public void testScheduleExecutor() throws InterruptedException { + final AtomicInteger executed = new AtomicInteger(); + admin.scheduleAtFixedRate("schedule.test", new Callable() { + + @Override + public Void call() throws Exception { + executed.incrementAndGet(); + return null; + } + }, -1, 250, TimeUnit.MILLISECONDS); + synchronized (this) { + this.wait(1000L); + } + admin.shutdown("schedule.test"); + int numOfExecutions = executed.get(); + int limit = 3; + Assert.assertTrue("Number of executions was [" + numOfExecutions + "] expected > " + limit, + numOfExecutions > limit); + } } diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java index 2b651ef7ac4..e20538f1dfb 100644 --- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/ExecutorAdmin.java @@ -12,6 +12,7 @@ package org.eclipse.osee.executor.admin; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * @author Roberto E. Escobar @@ -22,6 +23,8 @@ public interface ExecutorAdmin { void createCachedPoolExecutor(String id); + void createScheduledPoolExecutor(String id, int poolSize); + Future schedule(Callable callable, ExecutionCallback callback); Future schedule(String id, Callable callable, ExecutionCallback callback); @@ -30,6 +33,10 @@ public interface ExecutorAdmin { Future schedule(String id, Callable callable); + Future scheduleAtFixedRate(String id, Callable callable, long executionRate, TimeUnit timeUnit); + + Future scheduleAtFixedRate(String id, Callable callable, long startAfter, long executionRate, TimeUnit timeUnit); + void shutdown(String id); } diff --git a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java index 8107f9d6bc6..e12fff2cccd 100644 --- a/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java +++ b/plugins/org.eclipse.osee.executor.admin/src/org/eclipse/osee/executor/admin/internal/ExecutorAdminImpl.java @@ -17,6 +17,8 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.eclipse.osee.executor.admin.ExecutionCallback; @@ -27,6 +29,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -184,4 +187,73 @@ public class ExecutorAdminImpl implements ExecutorAdmin { cache.remove(id); } } + + public ListeningScheduledExecutorService getScheduledExecutor(String id) { + ListeningScheduledExecutorService service = null; + synchronized (cache) { + ListeningExecutorService executor = cache.getById(id); + if (executor instanceof ListeningScheduledExecutorService) { + service = (ListeningScheduledExecutorService) executor; + } else { + service = createScheduledExecutor(id, -1); + } + } + if (service == null) { + throw new OseeStateException("Error creating executor [%s].", id); + } + if (service.isShutdown() || service.isTerminated()) { + throw new OseeStateException("Error executor [%s] was previously shutdown.", id); + } + return service; + } + + private ListeningScheduledExecutorService createScheduledExecutor(String id, int poolSize) { + ThreadFactory threadFactory = new ThreadFactoryBuilder()// + .setNameFormat(id + "- [%s]")// + .setPriority(Thread.NORM_PRIORITY)// + .build(); + ScheduledExecutorService executor = null; + if (poolSize > 0) { + executor = Executors.newScheduledThreadPool(poolSize, threadFactory); + } else { + executor = Executors.newSingleThreadScheduledExecutor(threadFactory); + } + ListeningScheduledExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); + cache.put(id, listeningExecutor); + return listeningExecutor; + } + + @Override + public void createScheduledPoolExecutor(String id, int poolSize) { + createScheduledExecutor(id, poolSize); + } + + @Override + public Future scheduleAtFixedRate(String id, Callable callable, long executionRate, TimeUnit timeUnit) { + return scheduleAtFixedRate(id, callable, -1, executionRate, timeUnit); + } + + @Override + @SuppressWarnings("unchecked") + public Future scheduleAtFixedRate(String id, final Callable callable, long startAfter, long executionRate, TimeUnit timeUnit) { + ListeningScheduledExecutorService executor = getScheduledExecutor(id); + Runnable runnable = asRunnable(callable); + ScheduledFuture scheduledFuture = executor.scheduleAtFixedRate(runnable, startAfter, executionRate, timeUnit); + return (Future) scheduledFuture; + } + + private Runnable asRunnable(final Callable callable) { + return new Runnable() { + + @Override + public void run() { + try { + callable.call(); + } catch (Throwable th) { + logger.error(th, "Error executing scheduled task [%s]", callable); + } + } + }; + } + } -- cgit v1.2.3