From fdddb4768224869602b37929e1e27dcda22ed44c Mon Sep 17 00:00:00 2001 From: pnehrer Date: Thu, 10 Aug 2006 03:23:05 +0000 Subject: Refactored subscription agent to deliver replica instance. --- .../org/eclipse/ecf/example/pubsub/PubSubView.java | 2 +- .../ecf/example/pubsub/SubscriptionView.java | 5 +- .../org/eclipse/ecf/pubsub/IPublishedService.java | 6 +- .../org/eclipse/ecf/pubsub/ISubscribedService.java | 19 +++++ .../src/org/eclipse/ecf/pubsub/ISubscriber.java | 16 +++++ .../src/org/eclipse/ecf/pubsub/ISubscription.java | 2 + .../PublishedServiceDirectoryChangeEvent.java | 2 +- .../eclipse/ecf/pubsub/impl/SubscribeMessage.java | 38 ++++++++++ .../eclipse/ecf/pubsub/impl/SubscriptionAgent.java | 82 +++++++-------------- .../ecf/pubsub/impl/UnsubscribeMessage.java | 8 +++ .../eclipse/ecf/pubsub/model/impl/AgentBase.java | 2 + .../eclipse/ecf/pubsub/model/impl/LocalAgent.java | 84 ++++++++++++---------- .../eclipse/ecf/pubsub/model/impl/RemoteAgent.java | 63 +++++++++++++++- 13 files changed, 226 insertions(+), 103 deletions(-) create mode 100644 examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java create mode 100644 examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java create mode 100644 examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java create mode 100644 examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java (limited to 'examples/bundles') diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java index 97baed678..11182d5fd 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java @@ -256,7 +256,7 @@ public class PubSubView extends ViewPart { return; } - view.setSubscription(container, subscription); + view.setSubscription(subscription); } }); } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java index 653a3e22f..3437c0fda 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java @@ -13,7 +13,6 @@ package org.eclipse.ecf.example.pubsub; import java.io.PrintWriter; import java.io.StringWriter; -import org.eclipse.ecf.core.ISharedObjectContainer; import org.eclipse.ecf.pubsub.ISubscription; import org.eclipse.ecf.pubsub.model.IReplicaModel; import org.eclipse.swt.SWT; @@ -32,10 +31,10 @@ public class SubscriptionView extends ViewPart implements IAppendableListListene protected Text text; - public synchronized void setSubscription(ISharedObjectContainer container, ISubscription subscription) { + public synchronized void setSubscription(ISubscription subscription) { this.subscription = subscription; setPartName("Subscription: " + subscription.getID()); - Object object = container.getSharedObjectManager().getSharedObject(subscription.getID()); + Object object = subscription.getSubscribedService(); if (object instanceof IReplicaModel) { Object data = ((IReplicaModel) object).getData(); if (data instanceof AppendableList) { diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java index 22dd1f324..5f4a65f13 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java @@ -19,9 +19,5 @@ public interface IPublishedService extends IIdentifiable { Map getProperties(); - void subscribe(ID containerID); - - // TODO this is a bit unsafe -- does not prevent duplicate unsubscription - // -- perhaps we should create a token (ID) in subscribe() and require it here? - void unsubscribe(ID containerID); + void subscribe(ID containerID, ID requestorID); } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java new file mode 100644 index 000000000..c3718d280 --- /dev/null +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java @@ -0,0 +1,19 @@ +/** + * Copyright (c) 2006 Ecliptical Software Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Ecliptical Software Inc. - initial API and implementation + */ +package org.eclipse.ecf.pubsub; + +import org.eclipse.ecf.core.IIdentifiable; +import org.eclipse.ecf.core.identity.ID; + +public interface ISubscribedService extends IIdentifiable { + + void unsubscribe(ID requestorID); +} diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java new file mode 100644 index 000000000..7ae7cbce9 --- /dev/null +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java @@ -0,0 +1,16 @@ +/** + * Copyright (c) 2006 Ecliptical Software Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Ecliptical Software Inc. - initial API and implementation + */ +package org.eclipse.ecf.pubsub; + +public interface ISubscriber { + + void subscribed(ISubscribedService svc); +} diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java index 25c3abb1e..40d13414d 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java @@ -17,5 +17,7 @@ public interface ISubscription extends IIdentifiable { ID getHomeContainerID(); + ISubscribedService getSubscribedService(); + void dispose(); } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java index 720f7f400..08d2e195e 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java @@ -31,7 +31,7 @@ public class PublishedServiceDirectoryChangeEvent extends EventObject { this.services = services; } - public IPublishedServiceDirectory getManager() { + public IPublishedServiceDirectory getDirectory() { return (IPublishedServiceDirectory) source; } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java new file mode 100644 index 000000000..e324184c2 --- /dev/null +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java @@ -0,0 +1,38 @@ +package org.eclipse.ecf.pubsub.impl; + +import java.io.Serializable; + +import org.eclipse.ecf.core.identity.ID; + +public class SubscribeMessage implements Serializable { + + private static final long serialVersionUID = -8507642983243509135L; + + private final ID requestorID; + + public SubscribeMessage(ID requestorID) { + this.requestorID = requestorID; + } + + public ID getRequestorID() { + return requestorID; + } + + public int hashCode() { + return requestorID.hashCode(); + } + + public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj == null) + return false; + + if (getClass() != obj.getClass()) + return false; + + SubscribeMessage other = (SubscribeMessage) obj; + return requestorID.equals(other.requestorID); + } +} diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java index ce29aa2b0..384d3e830 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java @@ -21,17 +21,18 @@ import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.ISharedObjectManager; import org.eclipse.ecf.core.ReplicaSharedObjectDescription; import org.eclipse.ecf.core.SharedObjectInitException; -import org.eclipse.ecf.core.events.IContainerDisconnectedEvent; import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent; import org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent; import org.eclipse.ecf.core.events.ISharedObjectDeactivatedEvent; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.Event; import org.eclipse.ecf.pubsub.IPublishedService; +import org.eclipse.ecf.pubsub.ISubscribedService; +import org.eclipse.ecf.pubsub.ISubscriber; import org.eclipse.ecf.pubsub.ISubscription; import org.eclipse.ecf.pubsub.ISubscriptionCallback; -public class SubscriptionAgent extends PlatformObject implements ISharedObject { +public class SubscriptionAgent extends PlatformObject implements ISharedObject, ISubscriber { protected static final Object CONTAINER_ID_KEY = new Integer(0); @@ -47,10 +48,18 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { protected ISubscriptionCallback callback; - protected boolean subscribed; + protected ISubscribedService svc; protected boolean disposed; + public synchronized void subscribed(ISubscribedService svc) { + if (disposed) + throw new IllegalStateException("Already disposed."); + + this.svc = svc; + callback.subscribed(new Subscription()); + } + public void init(ISharedObjectConfig config) throws SharedObjectInitException { Map props = config.getProperties(); @@ -76,18 +85,10 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { ISharedObjectActivatedEvent e = (ISharedObjectActivatedEvent) event; if (e.getActivatedID().equals(config.getSharedObjectID())) activated(); - else - activated(e.getActivatedID()); } else if (event instanceof ISharedObjectDeactivatedEvent) { ISharedObjectDeactivatedEvent e = (ISharedObjectDeactivatedEvent) event; if (e.getDeactivatedID().equals(config.getSharedObjectID())) deactivated(); - } else if (event instanceof IContainerDisconnectedEvent) { - IContainerDisconnectedEvent e = (IContainerDisconnectedEvent) event; - if (e.getTargetID().equals(e.getLocalContainerID())) - disconnected(); - else - disconnected(e.getTargetID()); } else if (event instanceof ISharedObjectCreateResponseEvent) received((ISharedObjectCreateResponseEvent) event); } @@ -101,7 +102,6 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { if (isPrimary()) { try { ctx.sendCreate(containerID, createReplicaDescription()); - // TODO set timer to time out if no response received within some bound } catch (IOException e) { callback.requestFailed(e); ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); @@ -116,55 +116,33 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { ID homeContainerID = config.getHomeContainerID(); if (so instanceof IPublishedService) { IPublishedService svc = (IPublishedService) so; - svc.subscribe(homeContainerID); - subscribed = true; + svc.subscribe(homeContainerID, config.getSharedObjectID()); } else { ctx.sendCreateResponse(homeContainerID, new IllegalArgumentException("Not an IPublishedService."), -1); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); - + } finally { ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); } } - - protected void activated(ID sharedObjectID) { - if (isPrimary() && sharedObjectID.equals(this.sharedObjectID)) - callback.subscribed(new Subscription()); - } protected void deactivated() { if (isPrimary()) { synchronized (this) { disposed = true; - } - - return; - } - - if (subscribed) { - ISharedObject so = config.getContext().getSharedObjectManager().getSharedObject(sharedObjectID); - if (so instanceof IPublishedService) { - IPublishedService svc = (IPublishedService) so; - svc.unsubscribe(config.getHomeContainerID()); - subscribed = false; + if (svc != null) + svc.unsubscribe(config.getSharedObjectID()); } } } - protected void disconnected() { - config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); - } - - protected void disconnected(ID containerID) { - if (containerID.equals(config.getHomeContainerID()) || containerID.equals(this.containerID)) - config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); - } - protected void received(ISharedObjectCreateResponseEvent e) { - if (e.getRemoteContainerID().equals(containerID) && e.getSenderSharedObjectID().equals(config.getSharedObjectID())) + if (e.getRemoteContainerID().equals(containerID) && e.getSenderSharedObjectID().equals(config.getSharedObjectID())) { callback.requestFailed(e.getException()); + config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); + } } protected ReplicaSharedObjectDescription createReplicaDescription() { @@ -180,6 +158,8 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { public void dispose(ID containerID) { config = null; + callback = null; + svc = null; } protected class Subscription implements ISubscription { @@ -191,24 +171,16 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject { public ID getHomeContainerID() { return containerID; } + + public ISubscribedService getSubscribedService() { + return svc; + } public void dispose() { synchronized (SubscriptionAgent.this) { - if (disposed) - return; - - disposed = true; - } - - ISharedObjectContext ctx = config.getContext(); - try { - ctx.sendDispose(containerID); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + if (!disposed) + config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); } - - ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); } } } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java new file mode 100644 index 000000000..09514696b --- /dev/null +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java @@ -0,0 +1,8 @@ +package org.eclipse.ecf.pubsub.impl; + +import java.io.Serializable; + +public class UnsubscribeMessage implements Serializable { + + private static final long serialVersionUID = -3516868138602600800L; +} diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java index ea9378cb5..34c0a8957 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java @@ -32,6 +32,8 @@ public abstract class AgentBase extends PlatformObject implements ISharedObject, public static final Object MODEL_UPDATER_KEY = new Integer(1); + protected static final Object REQUESTOR_ID = new Integer(2); + protected ISharedObjectConfig config; protected Object data; diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java index 148f00686..7e187c4c1 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java @@ -11,8 +11,10 @@ package org.eclipse.ecf.pubsub.model.impl; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import org.eclipse.ecf.core.ISharedObjectContext; @@ -21,11 +23,13 @@ import org.eclipse.ecf.core.SharedObjectInitException; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.example.pubsub.SerializationUtil; import org.eclipse.ecf.pubsub.IPublishedService; +import org.eclipse.ecf.pubsub.impl.SubscribeMessage; +import org.eclipse.ecf.pubsub.impl.UnsubscribeMessage; import org.eclipse.ecf.pubsub.model.IMasterModel; public class LocalAgent extends AgentBase implements IPublishedService, IMasterModel { - protected Map subscriptions; + protected Collection subscriptions; private final Object subscriptionMutex = new Object(); @@ -41,52 +45,57 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM return Collections.EMPTY_MAP; } - public void subscribe(ID containerID) { + public void subscribe(ID containerID, ID requestorID) { synchronized (subscriptionMutex) { if (subscriptions == null) - subscriptions = new HashMap(); - - Integer refCount = (Integer) subscriptions.get(containerID); - if (refCount == null) - refCount = new Integer(0); - - refCount = new Integer(refCount.intValue() + 1); - subscriptions.put(containerID, refCount); + subscriptions = new HashSet(); + + ISharedObjectContext ctx = config.getContext(); + try { + if (subscriptions.add(containerID)) { + ctx.sendCreate(containerID, createRemoteAgentDescription(requestorID)); + } else { + SubscribeMessage msg = new SubscribeMessage(requestorID); + ctx.sendMessage(containerID, SerializationUtil.serialize(msg)); + } + } catch (IOException e) { + // TODO Log me! + e.printStackTrace(); + } } + } + + protected void received(ID containerID, Object data) { + if (!(data instanceof byte[])) + return; - ISharedObjectContext ctx = config.getContext(); + Object msg; try { - ctx.sendCreate(containerID, createRemoteAgentDescription()); + msg = SerializationUtil.deserialize((byte[]) data); } catch (IOException e) { - // TODO Log me! + // TODO Auto-generated catch block e.printStackTrace(); + return; + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return; + } + + if (!(msg instanceof UnsubscribeMessage)) + return; + + synchronized (subscriptionMutex) { + if (subscriptions != null) + subscriptions.remove(containerID); } } - public void unsubscribe(ID containerID) { - boolean disposeReplica = false; + protected void disconnected(ID containerID) { synchronized (subscriptionMutex) { - if (subscriptions != null) { - Integer refCount = (Integer) subscriptions.get(containerID); - if (refCount != null) { - refCount = new Integer(refCount.intValue() - 1); - if (refCount.intValue() <= 0) { - subscriptions.remove(containerID); - disposeReplica = true; - } else { - subscriptions.put(containerID, refCount); - } - } - } + if (subscriptions != null) + subscriptions.remove(containerID); } - - if (disposeReplica) - try { - config.getContext().sendDispose(containerID); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } } protected void deactivated() { @@ -99,8 +108,8 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM } } - protected ReplicaSharedObjectDescription createRemoteAgentDescription() { - Map props = new HashMap(2); + protected ReplicaSharedObjectDescription createRemoteAgentDescription(ID requestorID) { + Map props = new HashMap(3); try { props.put(INITIAL_DATA_KEY, SerializationUtil.serialize(data)); } catch (IOException e) { @@ -109,6 +118,7 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM } props.put(MODEL_UPDATER_KEY, updaterID); + props.put(REQUESTOR_ID, requestorID); return new ReplicaSharedObjectDescription(RemoteAgent.class, config.getSharedObjectID(), config.getHomeContainerID(), props); } } diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java index 40f0f20b0..16b99b924 100644 --- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java +++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java @@ -11,18 +11,58 @@ package org.eclipse.ecf.pubsub.model.impl; import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; import org.eclipse.core.runtime.CoreException; import org.eclipse.core.runtime.IConfigurationElement; import org.eclipse.core.runtime.IExtensionRegistry; import org.eclipse.core.runtime.Platform; +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.SharedObjectInitException; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.example.pubsub.SerializationUtil; +import org.eclipse.ecf.pubsub.ISubscribedService; +import org.eclipse.ecf.pubsub.ISubscriber; +import org.eclipse.ecf.pubsub.impl.SubscribeMessage; +import org.eclipse.ecf.pubsub.impl.UnsubscribeMessage; import org.eclipse.ecf.pubsub.model.IModelUpdater; import org.eclipse.ecf.pubsub.model.IReplicaModel; -public class RemoteAgent extends AgentBase implements IReplicaModel { +public class RemoteAgent extends AgentBase implements IReplicaModel, ISubscribedService { + + private Collection subscribers; + + private final Object subscriptionMutex = new Object(); + + public void unsubscribe(ID requestorID) { + synchronized (subscriptionMutex) { + if (subscribers == null) + return; + + subscribers.remove(requestorID); + if (subscribers.isEmpty()) { + ISharedObjectContext ctx = config.getContext(); + try { + ctx.sendMessage(config.getHomeContainerID(), new UnsubscribeMessage()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); + } + } + } + + public void init(ISharedObjectConfig config) throws SharedObjectInitException { + super.init(config); + subscribers = new HashSet(); + ID requestorID = (ID) config.getProperties().get(REQUESTOR_ID); + if (requestorID != null) + subscribers.add(requestorID); + } protected void initializeData(Object data) throws SharedObjectInitException { try { @@ -58,6 +98,15 @@ public class RemoteAgent extends AgentBase implements IReplicaModel { throw new SharedObjectInitException("Could not find specified Model Updater."); } + protected void activated() { + ID requestorID = (ID) config.getProperties().get(REQUESTOR_ID); + if (requestorID != null) { + Object svc = config.getContext().getSharedObjectManager().getSharedObject(requestorID); + if (svc instanceof ISubscriber) + ((ISubscriber) svc).subscribed(this); + } + } + protected void disconnected() { config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID()); } @@ -68,6 +117,18 @@ public class RemoteAgent extends AgentBase implements IReplicaModel { } protected void received(ID containerID, Object data) { + if (!(data instanceof byte[])) + return; + + if (data instanceof SubscribeMessage) { + SubscribeMessage msg = (SubscribeMessage) data; + synchronized (subscriptionMutex) { + subscribers.add(msg.getRequestorID()); + } + + return; + } + try { updater.update(this.data, SerializationUtil.deserialize((byte[]) data)); } catch (IOException e) { -- cgit v1.2.3