diff options
author | slewis | 2009-09-17 05:30:58 +0000 |
---|---|---|
committer | slewis | 2009-09-17 05:30:58 +0000 |
commit | f5783d4832177d0819897e923ca3324d025b19f8 (patch) | |
tree | 11f99bce34d307a46aaed767b3e2ce69d23e5f24 /server-side | |
parent | 109445faad31e9951b0a3dfaa74309fa5d9880f9 (diff) | |
download | org.eclipse.ecf-f5783d4832177d0819897e923ca3324d025b19f8.tar.gz org.eclipse.ecf-f5783d4832177d0819897e923ca3324d025b19f8.tar.xz org.eclipse.ecf-f5783d4832177d0819897e923ca3324d025b19f8.zip |
Updates to distributed eventadmin
Diffstat (limited to 'server-side')
3 files changed, 50 insertions, 121 deletions
diff --git a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/META-INF/MANIFEST.MF b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/META-INF/MANIFEST.MF index eb7c2d1de..c4141caa4 100644 --- a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/META-INF/MANIFEST.MF +++ b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/META-INF/MANIFEST.MF @@ -9,7 +9,7 @@ Bundle-RequiredExecutionEnvironment: CDC-1.1/Foundation-1.1, Bundle-Localization: bundle Import-Package: org.eclipse.ecf.core.identity;version="3.0.0", org.eclipse.ecf.core.sharedobject, - org.eclipse.equinox.concurrent.future;version="1.0.0", + org.eclipse.osgi.framework.eventmgr;version="1.2.0", org.eclipse.osgi.util;version="1.1.0", org.osgi.framework;version="1.3.0", org.osgi.service.event;version="1.2.0", diff --git a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/internal/remoteservice/eventadmin/EventHandlerTracker.java b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/internal/remoteservice/eventadmin/EventHandlerTracker.java index 8b99deb7a..5b3b09146 100644 --- a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/internal/remoteservice/eventadmin/EventHandlerTracker.java +++ b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/internal/remoteservice/eventadmin/EventHandlerTracker.java @@ -11,6 +11,7 @@ package org.eclipse.ecf.internal.remoteservice.eventadmin; +import java.security.Permission; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -18,13 +19,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.eclipse.osgi.framework.eventmgr.EventDispatcher; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.service.event.Event; import org.osgi.service.event.EventHandler; import org.osgi.service.log.LogService; import org.osgi.util.tracker.ServiceTracker; -public class EventHandlerTracker extends ServiceTracker { +public class EventHandlerTracker extends ServiceTracker implements EventDispatcher{ private final LogService log; // * List<EventHandlerWrapper> of all handlers with topic of "*" private final List globalWildcard; @@ -206,4 +209,8 @@ public class EventHandlerTracker extends ServiceTracker { return handlers; } + public void dispatchEvent(Object eventListener, Object listenerObject, int eventAction, Object eventObject) { + ((EventHandlerWrapper) eventListener).handleEvent((Event) eventObject, (Permission) listenerObject); + } + } diff --git a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/remoteservice/eventadmin/DistributedEventAdmin.java b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/remoteservice/eventadmin/DistributedEventAdmin.java index 0c27a3ae1..4e5d4e774 100644 --- a/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/remoteservice/eventadmin/DistributedEventAdmin.java +++ b/server-side/bundles/org.eclipse.ecf.remoteservice.eventadmin/src/org/eclipse/ecf/remoteservice/eventadmin/DistributedEventAdmin.java @@ -12,103 +12,52 @@ package org.eclipse.ecf.remoteservice.eventadmin; import java.io.IOException; import java.security.Permission; import java.util.Iterator; -import java.util.Properties; import java.util.Set; import org.eclipse.core.runtime.Assert; -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.ISafeRunnable; -import org.eclipse.core.runtime.SafeRunner; -import org.eclipse.ecf.core.identity.ID; -import org.eclipse.ecf.core.identity.IDFactory; import org.eclipse.ecf.core.sharedobject.BaseSharedObject; -import org.eclipse.ecf.core.sharedobject.ISharedObjectContainer; -import org.eclipse.ecf.core.sharedobject.SharedObjectAddException; import org.eclipse.ecf.core.sharedobject.SharedObjectMsg; import org.eclipse.ecf.internal.remoteservice.eventadmin.DistributedEventAdminMessage; import org.eclipse.ecf.internal.remoteservice.eventadmin.EventHandlerTracker; import org.eclipse.ecf.internal.remoteservice.eventadmin.EventHandlerWrapper; import org.eclipse.ecf.internal.remoteservice.eventadmin.LogTracker; -import org.eclipse.equinox.concurrent.future.IExecutor; -import org.eclipse.equinox.concurrent.future.IProgressRunnable; -import org.eclipse.equinox.concurrent.future.ThreadsExecutor; +import org.eclipse.osgi.framework.eventmgr.CopyOnWriteIdentityMap; +import org.eclipse.osgi.framework.eventmgr.EventManager; +import org.eclipse.osgi.framework.eventmgr.ListenerQueue; import org.eclipse.osgi.util.NLS; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; -import org.osgi.service.event.EventConstants; import org.osgi.service.event.TopicPermission; import org.osgi.service.log.LogService; public class DistributedEventAdmin extends BaseSharedObject implements EventAdmin { - private BundleContext context; - private String topic; - private ID topicID; private LogTracker log; - private EventHandlerTracker eventHandlerTracker; - private IExecutor executor; - - public DistributedEventAdmin(BundleContext context, String topic, - IExecutor executor) { - this.context = context; - Assert.isNotNull(this.context); - this.topic = topic; - Assert.isNotNull(this.topic); - this.topicID = IDFactory.getDefault().createStringID(getTopic()); + private EventManager eventManager; + + public DistributedEventAdmin(BundleContext context, String topic) { + Assert.isNotNull(context); + Assert.isNotNull(topic); this.log = new LogTracker(context, System.out); - this.executor = (executor == null) ? new ThreadsExecutor() : executor; this.eventHandlerTracker = new EventHandlerTracker(context, log); - this.eventHandlerTracker.open(); - } - - public DistributedEventAdmin(BundleContext context, String topic) { - this(context, topic, null); - } - - public String getTopic() { - return topic; } - public ID getTopicID() { - return topicID; + public void start() { + log.open(); + ThreadGroup eventGroup = new ThreadGroup("Distributed EventAdmin"); //$NON-NLS-1$ + eventGroup.setDaemon(true); + eventManager = new EventManager("Distributed EventAdmin Async Event Dispatcher Thread", eventGroup); + eventHandlerTracker.open(); } - - public void addToContainer(ISharedObjectContainer soContainer) - throws SharedObjectAddException { - soContainer.getSharedObjectManager().addSharedObject(topicID, this, - null); - } - - public void removeFromContainer(ISharedObjectContainer soContainer) { - soContainer.getSharedObjectManager().removeSharedObject(getTopicID()); - } - - public ServiceRegistration register(Properties props) { - if (props == null) - props = new Properties(); - props.put(EventConstants.EVENT_TOPIC, getTopic()); - return this.context.registerService(EventAdmin.class.getName(), this, - props); - } - - public ServiceRegistration register() { - return register(null); - } - - public void dispose() { - if (this.eventHandlerTracker != null) { - this.eventHandlerTracker.close(); - } - if (this.log != null) { - this.log.close(); - } - topic = null; - topicID = null; - executor = null; + + public void stop() { + eventHandlerTracker.close(); + eventManager.close(); + eventManager = null; + log.close(); } public void postEvent(final Event event) { @@ -118,7 +67,7 @@ public class DistributedEventAdmin extends BaseSharedObject implements } catch (IOException e) { logError(NLS.bind( "IOException posting distributed event {0} to {1}", event, - topic), e); + event.getTopic()), e); } localDispatch(event, true); } @@ -138,15 +87,14 @@ public class DistributedEventAdmin extends BaseSharedObject implements } protected void localDispatch(Event event, boolean isAsync) { - IExecutor exec = executor; - if (exec == null) + EventManager currentManager = eventManager; + if (currentManager == null) { return; + } if (event == null) { log.log(LogService.LOG_ERROR, "Null event passed to EventAdmin was ignored."); - // continue from here will result in an NPE below; the spec for - // EventAdmin does not allow for null here } String eventTopic = event.getTopic(); @@ -154,7 +102,7 @@ public class DistributedEventAdmin extends BaseSharedObject implements try { SecurityManager sm = System.getSecurityManager(); if (sm != null) - sm.checkPermission(new TopicPermission(topic, + sm.checkPermission(new TopicPermission(eventTopic, TopicPermission.PUBLISH)); } catch (SecurityException e) { String msg = NLS @@ -162,62 +110,36 @@ public class DistributedEventAdmin extends BaseSharedObject implements "Caller bundle does not have TopicPermission to publish topic {0}", eventTopic); logError(msg, e); - // must throw a security exception here according to the EventAdmin - // spec throw e; } Set eventHandlerWrappers = eventHandlerTracker.getHandlers(eventTopic); SecurityManager sm = System.getSecurityManager(); - Permission perm = (sm == null) ? null : new TopicPermission(topic, + Permission perm = (sm == null) ? null : new TopicPermission(eventTopic, TopicPermission.SUBSCRIBE); - // Now synchronously call every eventhandler - if (isAsync) - fireAsync(exec, eventHandlerWrappers, event, perm); - else - callSync(eventHandlerWrappers, event, perm); - + CopyOnWriteIdentityMap listeners = new CopyOnWriteIdentityMap(); + Iterator iter = eventHandlerWrappers.iterator(); + while (iter.hasNext()) { + EventHandlerWrapper wrapper = (EventHandlerWrapper) iter.next(); + listeners.put(wrapper, perm); + } + + ListenerQueue listenerQueue = new ListenerQueue(currentManager); + listenerQueue.queueListeners(listeners.entrySet(), eventHandlerTracker); + if (isAsync) { + listenerQueue.dispatchEventAsynchronous(0, event); + } + else { + listenerQueue.dispatchEventSynchronous(0, event); + } } void handlePostEvent(DistributedEventAdminMessage event) { localDispatch(event.getEvent(), true); } - protected void callSync(Set eventHandlerWrappers, final Event event, - final Permission perm) { - for (Iterator i = eventHandlerWrappers.iterator(); i.hasNext();) { - final EventHandlerWrapper wrapper = (EventHandlerWrapper) i.next(); - SafeRunner.run(new ISafeRunnable() { - public void handleException(Throwable exception) { - logError( - NLS - .bind( - "Exception while dispatching event {0} to handler {1}", - event, wrapper), exception); - } - - public void run() throws Exception { - wrapper.handleEvent(event, perm); - } - }); - } - } - - protected void fireAsync(IExecutor exec, Set eventHandlerWrappers, - final Event event, final Permission perm) { - for (Iterator i = eventHandlerWrappers.iterator(); i.hasNext();) { - final EventHandlerWrapper wrapper = (EventHandlerWrapper) i.next(); - exec.execute(new IProgressRunnable() { - public Object run(IProgressMonitor arg0) throws Exception { - wrapper.handleEvent(event, perm); - return null; - } - }, null); - } - } - protected void logError(String message, Throwable exception) { if (log != null) { log.log(LogService.LOG_ERROR, message, exception); |