Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'examples/bundles/org.eclipse.ecf.example.pubsub')
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java2
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java5
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java6
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java19
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java16
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java2
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java2
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java38
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java82
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java8
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java2
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java84
-rw-r--r--examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java63
13 files changed, 226 insertions, 103 deletions
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java
index 97baed678..11182d5fd 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/PubSubView.java
@@ -256,7 +256,7 @@ public class PubSubView extends ViewPart {
return;
}
- view.setSubscription(container, subscription);
+ view.setSubscription(subscription);
}
});
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java
index 653a3e22f..3437c0fda 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/example/pubsub/SubscriptionView.java
@@ -13,7 +13,6 @@ package org.eclipse.ecf.example.pubsub;
import java.io.PrintWriter;
import java.io.StringWriter;
-import org.eclipse.ecf.core.ISharedObjectContainer;
import org.eclipse.ecf.pubsub.ISubscription;
import org.eclipse.ecf.pubsub.model.IReplicaModel;
import org.eclipse.swt.SWT;
@@ -32,10 +31,10 @@ public class SubscriptionView extends ViewPart implements IAppendableListListene
protected Text text;
- public synchronized void setSubscription(ISharedObjectContainer container, ISubscription subscription) {
+ public synchronized void setSubscription(ISubscription subscription) {
this.subscription = subscription;
setPartName("Subscription: " + subscription.getID());
- Object object = container.getSharedObjectManager().getSharedObject(subscription.getID());
+ Object object = subscription.getSubscribedService();
if (object instanceof IReplicaModel) {
Object data = ((IReplicaModel) object).getData();
if (data instanceof AppendableList) {
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java
index 22dd1f324..5f4a65f13 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/IPublishedService.java
@@ -19,9 +19,5 @@ public interface IPublishedService extends IIdentifiable {
Map getProperties();
- void subscribe(ID containerID);
-
- // TODO this is a bit unsafe -- does not prevent duplicate unsubscription
- // -- perhaps we should create a token (ID) in subscribe() and require it here?
- void unsubscribe(ID containerID);
+ void subscribe(ID containerID, ID requestorID);
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java
new file mode 100644
index 000000000..c3718d280
--- /dev/null
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscribedService.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright (c) 2006 Ecliptical Software Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Ecliptical Software Inc. - initial API and implementation
+ */
+package org.eclipse.ecf.pubsub;
+
+import org.eclipse.ecf.core.IIdentifiable;
+import org.eclipse.ecf.core.identity.ID;
+
+public interface ISubscribedService extends IIdentifiable {
+
+ void unsubscribe(ID requestorID);
+}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java
new file mode 100644
index 000000000..7ae7cbce9
--- /dev/null
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscriber.java
@@ -0,0 +1,16 @@
+/**
+ * Copyright (c) 2006 Ecliptical Software Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Ecliptical Software Inc. - initial API and implementation
+ */
+package org.eclipse.ecf.pubsub;
+
+public interface ISubscriber {
+
+ void subscribed(ISubscribedService svc);
+}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java
index 25c3abb1e..40d13414d 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/ISubscription.java
@@ -17,5 +17,7 @@ public interface ISubscription extends IIdentifiable {
ID getHomeContainerID();
+ ISubscribedService getSubscribedService();
+
void dispose();
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java
index 720f7f400..08d2e195e 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/PublishedServiceDirectoryChangeEvent.java
@@ -31,7 +31,7 @@ public class PublishedServiceDirectoryChangeEvent extends EventObject {
this.services = services;
}
- public IPublishedServiceDirectory getManager() {
+ public IPublishedServiceDirectory getDirectory() {
return (IPublishedServiceDirectory) source;
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java
new file mode 100644
index 000000000..e324184c2
--- /dev/null
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscribeMessage.java
@@ -0,0 +1,38 @@
+package org.eclipse.ecf.pubsub.impl;
+
+import java.io.Serializable;
+
+import org.eclipse.ecf.core.identity.ID;
+
+public class SubscribeMessage implements Serializable {
+
+ private static final long serialVersionUID = -8507642983243509135L;
+
+ private final ID requestorID;
+
+ public SubscribeMessage(ID requestorID) {
+ this.requestorID = requestorID;
+ }
+
+ public ID getRequestorID() {
+ return requestorID;
+ }
+
+ public int hashCode() {
+ return requestorID.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ if (obj == null)
+ return false;
+
+ if (getClass() != obj.getClass())
+ return false;
+
+ SubscribeMessage other = (SubscribeMessage) obj;
+ return requestorID.equals(other.requestorID);
+ }
+}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java
index ce29aa2b0..384d3e830 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/SubscriptionAgent.java
@@ -21,17 +21,18 @@ import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.ISharedObjectManager;
import org.eclipse.ecf.core.ReplicaSharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
-import org.eclipse.ecf.core.events.IContainerDisconnectedEvent;
import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
import org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent;
import org.eclipse.ecf.core.events.ISharedObjectDeactivatedEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.Event;
import org.eclipse.ecf.pubsub.IPublishedService;
+import org.eclipse.ecf.pubsub.ISubscribedService;
+import org.eclipse.ecf.pubsub.ISubscriber;
import org.eclipse.ecf.pubsub.ISubscription;
import org.eclipse.ecf.pubsub.ISubscriptionCallback;
-public class SubscriptionAgent extends PlatformObject implements ISharedObject {
+public class SubscriptionAgent extends PlatformObject implements ISharedObject, ISubscriber {
protected static final Object CONTAINER_ID_KEY = new Integer(0);
@@ -47,10 +48,18 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
protected ISubscriptionCallback callback;
- protected boolean subscribed;
+ protected ISubscribedService svc;
protected boolean disposed;
+ public synchronized void subscribed(ISubscribedService svc) {
+ if (disposed)
+ throw new IllegalStateException("Already disposed.");
+
+ this.svc = svc;
+ callback.subscribed(new Subscription());
+ }
+
public void init(ISharedObjectConfig config) throws SharedObjectInitException {
Map props = config.getProperties();
@@ -76,18 +85,10 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
ISharedObjectActivatedEvent e = (ISharedObjectActivatedEvent) event;
if (e.getActivatedID().equals(config.getSharedObjectID()))
activated();
- else
- activated(e.getActivatedID());
} else if (event instanceof ISharedObjectDeactivatedEvent) {
ISharedObjectDeactivatedEvent e = (ISharedObjectDeactivatedEvent) event;
if (e.getDeactivatedID().equals(config.getSharedObjectID()))
deactivated();
- } else if (event instanceof IContainerDisconnectedEvent) {
- IContainerDisconnectedEvent e = (IContainerDisconnectedEvent) event;
- if (e.getTargetID().equals(e.getLocalContainerID()))
- disconnected();
- else
- disconnected(e.getTargetID());
} else if (event instanceof ISharedObjectCreateResponseEvent)
received((ISharedObjectCreateResponseEvent) event);
}
@@ -101,7 +102,6 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
if (isPrimary()) {
try {
ctx.sendCreate(containerID, createReplicaDescription());
- // TODO set timer to time out if no response received within some bound
} catch (IOException e) {
callback.requestFailed(e);
ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
@@ -116,55 +116,33 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
ID homeContainerID = config.getHomeContainerID();
if (so instanceof IPublishedService) {
IPublishedService svc = (IPublishedService) so;
- svc.subscribe(homeContainerID);
- subscribed = true;
+ svc.subscribe(homeContainerID, config.getSharedObjectID());
} else {
ctx.sendCreateResponse(homeContainerID, new IllegalArgumentException("Not an IPublishedService."), -1);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
-
+ } finally {
ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
}
}
-
- protected void activated(ID sharedObjectID) {
- if (isPrimary() && sharedObjectID.equals(this.sharedObjectID))
- callback.subscribed(new Subscription());
- }
protected void deactivated() {
if (isPrimary()) {
synchronized (this) {
disposed = true;
- }
-
- return;
- }
-
- if (subscribed) {
- ISharedObject so = config.getContext().getSharedObjectManager().getSharedObject(sharedObjectID);
- if (so instanceof IPublishedService) {
- IPublishedService svc = (IPublishedService) so;
- svc.unsubscribe(config.getHomeContainerID());
- subscribed = false;
+ if (svc != null)
+ svc.unsubscribe(config.getSharedObjectID());
}
}
}
- protected void disconnected() {
- config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
- }
-
- protected void disconnected(ID containerID) {
- if (containerID.equals(config.getHomeContainerID()) || containerID.equals(this.containerID))
- config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
- }
-
protected void received(ISharedObjectCreateResponseEvent e) {
- if (e.getRemoteContainerID().equals(containerID) && e.getSenderSharedObjectID().equals(config.getSharedObjectID()))
+ if (e.getRemoteContainerID().equals(containerID) && e.getSenderSharedObjectID().equals(config.getSharedObjectID())) {
callback.requestFailed(e.getException());
+ config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
+ }
}
protected ReplicaSharedObjectDescription createReplicaDescription() {
@@ -180,6 +158,8 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
public void dispose(ID containerID) {
config = null;
+ callback = null;
+ svc = null;
}
protected class Subscription implements ISubscription {
@@ -191,24 +171,16 @@ public class SubscriptionAgent extends PlatformObject implements ISharedObject {
public ID getHomeContainerID() {
return containerID;
}
+
+ public ISubscribedService getSubscribedService() {
+ return svc;
+ }
public void dispose() {
synchronized (SubscriptionAgent.this) {
- if (disposed)
- return;
-
- disposed = true;
- }
-
- ISharedObjectContext ctx = config.getContext();
- try {
- ctx.sendDispose(containerID);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if (!disposed)
+ config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
}
-
- ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
}
}
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java
new file mode 100644
index 000000000..09514696b
--- /dev/null
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/impl/UnsubscribeMessage.java
@@ -0,0 +1,8 @@
+package org.eclipse.ecf.pubsub.impl;
+
+import java.io.Serializable;
+
+public class UnsubscribeMessage implements Serializable {
+
+ private static final long serialVersionUID = -3516868138602600800L;
+}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java
index ea9378cb5..34c0a8957 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/AgentBase.java
@@ -32,6 +32,8 @@ public abstract class AgentBase extends PlatformObject implements ISharedObject,
public static final Object MODEL_UPDATER_KEY = new Integer(1);
+ protected static final Object REQUESTOR_ID = new Integer(2);
+
protected ISharedObjectConfig config;
protected Object data;
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java
index 148f00686..7e187c4c1 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/LocalAgent.java
@@ -11,8 +11,10 @@
package org.eclipse.ecf.pubsub.model.impl;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import org.eclipse.ecf.core.ISharedObjectContext;
@@ -21,11 +23,13 @@ import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.example.pubsub.SerializationUtil;
import org.eclipse.ecf.pubsub.IPublishedService;
+import org.eclipse.ecf.pubsub.impl.SubscribeMessage;
+import org.eclipse.ecf.pubsub.impl.UnsubscribeMessage;
import org.eclipse.ecf.pubsub.model.IMasterModel;
public class LocalAgent extends AgentBase implements IPublishedService, IMasterModel {
- protected Map subscriptions;
+ protected Collection subscriptions;
private final Object subscriptionMutex = new Object();
@@ -41,52 +45,57 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM
return Collections.EMPTY_MAP;
}
- public void subscribe(ID containerID) {
+ public void subscribe(ID containerID, ID requestorID) {
synchronized (subscriptionMutex) {
if (subscriptions == null)
- subscriptions = new HashMap();
-
- Integer refCount = (Integer) subscriptions.get(containerID);
- if (refCount == null)
- refCount = new Integer(0);
-
- refCount = new Integer(refCount.intValue() + 1);
- subscriptions.put(containerID, refCount);
+ subscriptions = new HashSet();
+
+ ISharedObjectContext ctx = config.getContext();
+ try {
+ if (subscriptions.add(containerID)) {
+ ctx.sendCreate(containerID, createRemoteAgentDescription(requestorID));
+ } else {
+ SubscribeMessage msg = new SubscribeMessage(requestorID);
+ ctx.sendMessage(containerID, SerializationUtil.serialize(msg));
+ }
+ } catch (IOException e) {
+ // TODO Log me!
+ e.printStackTrace();
+ }
}
+ }
+
+ protected void received(ID containerID, Object data) {
+ if (!(data instanceof byte[]))
+ return;
- ISharedObjectContext ctx = config.getContext();
+ Object msg;
try {
- ctx.sendCreate(containerID, createRemoteAgentDescription());
+ msg = SerializationUtil.deserialize((byte[]) data);
} catch (IOException e) {
- // TODO Log me!
+ // TODO Auto-generated catch block
e.printStackTrace();
+ return;
+ } catch (ClassNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return;
+ }
+
+ if (!(msg instanceof UnsubscribeMessage))
+ return;
+
+ synchronized (subscriptionMutex) {
+ if (subscriptions != null)
+ subscriptions.remove(containerID);
}
}
- public void unsubscribe(ID containerID) {
- boolean disposeReplica = false;
+ protected void disconnected(ID containerID) {
synchronized (subscriptionMutex) {
- if (subscriptions != null) {
- Integer refCount = (Integer) subscriptions.get(containerID);
- if (refCount != null) {
- refCount = new Integer(refCount.intValue() - 1);
- if (refCount.intValue() <= 0) {
- subscriptions.remove(containerID);
- disposeReplica = true;
- } else {
- subscriptions.put(containerID, refCount);
- }
- }
- }
+ if (subscriptions != null)
+ subscriptions.remove(containerID);
}
-
- if (disposeReplica)
- try {
- config.getContext().sendDispose(containerID);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
}
protected void deactivated() {
@@ -99,8 +108,8 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM
}
}
- protected ReplicaSharedObjectDescription createRemoteAgentDescription() {
- Map props = new HashMap(2);
+ protected ReplicaSharedObjectDescription createRemoteAgentDescription(ID requestorID) {
+ Map props = new HashMap(3);
try {
props.put(INITIAL_DATA_KEY, SerializationUtil.serialize(data));
} catch (IOException e) {
@@ -109,6 +118,7 @@ public class LocalAgent extends AgentBase implements IPublishedService, IMasterM
}
props.put(MODEL_UPDATER_KEY, updaterID);
+ props.put(REQUESTOR_ID, requestorID);
return new ReplicaSharedObjectDescription(RemoteAgent.class, config.getSharedObjectID(), config.getHomeContainerID(), props);
}
}
diff --git a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java
index 40f0f20b0..16b99b924 100644
--- a/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java
+++ b/examples/bundles/org.eclipse.ecf.example.pubsub/src/org/eclipse/ecf/pubsub/model/impl/RemoteAgent.java
@@ -11,18 +11,58 @@
package org.eclipse.ecf.pubsub.model.impl;
import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IConfigurationElement;
import org.eclipse.core.runtime.IExtensionRegistry;
import org.eclipse.core.runtime.Platform;
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.example.pubsub.SerializationUtil;
+import org.eclipse.ecf.pubsub.ISubscribedService;
+import org.eclipse.ecf.pubsub.ISubscriber;
+import org.eclipse.ecf.pubsub.impl.SubscribeMessage;
+import org.eclipse.ecf.pubsub.impl.UnsubscribeMessage;
import org.eclipse.ecf.pubsub.model.IModelUpdater;
import org.eclipse.ecf.pubsub.model.IReplicaModel;
-public class RemoteAgent extends AgentBase implements IReplicaModel {
+public class RemoteAgent extends AgentBase implements IReplicaModel, ISubscribedService {
+
+ private Collection subscribers;
+
+ private final Object subscriptionMutex = new Object();
+
+ public void unsubscribe(ID requestorID) {
+ synchronized (subscriptionMutex) {
+ if (subscribers == null)
+ return;
+
+ subscribers.remove(requestorID);
+ if (subscribers.isEmpty()) {
+ ISharedObjectContext ctx = config.getContext();
+ try {
+ ctx.sendMessage(config.getHomeContainerID(), new UnsubscribeMessage());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ ctx.getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
+ }
+ }
+ }
+
+ public void init(ISharedObjectConfig config) throws SharedObjectInitException {
+ super.init(config);
+ subscribers = new HashSet();
+ ID requestorID = (ID) config.getProperties().get(REQUESTOR_ID);
+ if (requestorID != null)
+ subscribers.add(requestorID);
+ }
protected void initializeData(Object data) throws SharedObjectInitException {
try {
@@ -58,6 +98,15 @@ public class RemoteAgent extends AgentBase implements IReplicaModel {
throw new SharedObjectInitException("Could not find specified Model Updater.");
}
+ protected void activated() {
+ ID requestorID = (ID) config.getProperties().get(REQUESTOR_ID);
+ if (requestorID != null) {
+ Object svc = config.getContext().getSharedObjectManager().getSharedObject(requestorID);
+ if (svc instanceof ISubscriber)
+ ((ISubscriber) svc).subscribed(this);
+ }
+ }
+
protected void disconnected() {
config.getContext().getSharedObjectManager().removeSharedObject(config.getSharedObjectID());
}
@@ -68,6 +117,18 @@ public class RemoteAgent extends AgentBase implements IReplicaModel {
}
protected void received(ID containerID, Object data) {
+ if (!(data instanceof byte[]))
+ return;
+
+ if (data instanceof SubscribeMessage) {
+ SubscribeMessage msg = (SubscribeMessage) data;
+ synchronized (subscriptionMutex) {
+ subscribers.add(msg.getRequestorID());
+ }
+
+ return;
+ }
+
try {
updater.update(this.data, SerializationUtil.deserialize((byte[]) data));
} catch (IOException e) {

Back to the top