Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpnehrer2005-05-24 21:34:13 +0000
committerpnehrer2005-05-24 21:34:13 +0000
commitd5458aa477ce600b10cbf0b2f18a0ab453c4e48e (patch)
treea39382d26eab7d91096217eefe11cf5930337c18
parentf8421ee28241a9e15b7e84c8dff48152c657778c (diff)
downloadorg.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.tar.gz
org.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.tar.xz
org.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.zip
Checkpoint.
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/.options2
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF8
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/build.properties2
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/notes.txt22
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/plugin.xml15
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/schema/servicemanager.exsd (renamed from framework/bundles/org.eclipse.ecf.datashare/schema/manager.exsd)2
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd117
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java19
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java30
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java23
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java45
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java34
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java331
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java34
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java34
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java395
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Message.java (renamed from framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Update.java)38
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java329
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java23
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java22
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java42
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java34
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java23
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java47
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java37
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java59
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java150
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java104
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java71
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java105
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java367
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java91
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java5
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java69
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java70
35 files changed, 2458 insertions, 341 deletions
diff --git a/framework/bundles/org.eclipse.ecf.datashare/.options b/framework/bundles/org.eclipse.ecf.datashare/.options
new file mode 100644
index 000000000..b9beb4172
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/.options
@@ -0,0 +1,2 @@
+org.eclipse.ecf.datashare/debug=true
+org.eclipse.ecf.datashare/ConsistentMulticaster=true
diff --git a/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF b/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF
index a1197e513..e79bfa983 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF
+++ b/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF
@@ -4,9 +4,13 @@ Bundle-Name: ECF DataShare Plug-in
Bundle-SymbolicName: org.eclipse.ecf.datashare
Bundle-Version: 1.0.0
Bundle-ClassPath: datashare.jar
-Bundle-Activator: org.eclipse.ecf.internal.datashare.DataSharePlugin
+Bundle-Activator: org.eclipse.ecf.internal.datashare.DataSharePlugin$EclipsePlugin
Bundle-Vendor: Eclipse.org
Bundle-Localization: plugin
Require-Bundle: org.eclipse.core.runtime,
- org.eclipse.ecf
+ org.eclipse.ecf;visibility:=reexport
Eclipse-AutoStart: true
+Export-Package: org.eclipse.ecf.datashare,
+ org.eclipse.ecf.datashare.multicast,
+ org.eclipse.ecf.datashare.util,
+ org.eclipse.ecf.internal.datashare
diff --git a/framework/bundles/org.eclipse.ecf.datashare/build.properties b/framework/bundles/org.eclipse.ecf.datashare/build.properties
index 0b4cfbcea..4578b1dd7 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/build.properties
+++ b/framework/bundles/org.eclipse.ecf.datashare/build.properties
@@ -3,4 +3,4 @@ output.datashare.jar = bin/
bin.includes = META-INF/,\
datashare.jar,\
schema/,\
- build.properties
+ plugin.xml
diff --git a/framework/bundles/org.eclipse.ecf.datashare/notes.txt b/framework/bundles/org.eclipse.ecf.datashare/notes.txt
new file mode 100644
index 000000000..d929e526d
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/notes.txt
@@ -0,0 +1,22 @@
+Service
+- create/obtain Agents
+- manage subscriptions
+
+Agent
+- create remote replicas when/where needed
+- stay alive only when connected
+- client facade
+- coordinate sub-components to implement the service
+
+Synchronizer
+- pause/resume messages (commits)
+
+Coordinator
+- ensures there's exactly one coordinator replica (when needed)
+
+Multicaster
+- implements 1/2/3-phase "commit" protocol
+
+Disposer (Garbage Collector)
+- dispose Agents with no subscriptions
+
diff --git a/framework/bundles/org.eclipse.ecf.datashare/plugin.xml b/framework/bundles/org.eclipse.ecf.datashare/plugin.xml
index 19c6ead29..a62303377 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/plugin.xml
+++ b/framework/bundles/org.eclipse.ecf.datashare/plugin.xml
@@ -1,6 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<?eclipse version="3.0"?>
<plugin>
- <extension-point id="manager" name="DataShare Manager" schema="schema/manager.exsd"/>
+ <extension-point id="servicemanager" name="DataShare Service Manager" schema="schema/servicemanager.exsd"/>
+ <extension-point id="updateprovider" name="DataShare Update Provider" schema="schema/updateprovider.exsd"/>
+ <extension
+ point="org.eclipse.ecf.datashare.servicemanager">
+ <manager
+ class="org.eclipse.ecf.internal.datashare.ServiceManager"
+ name="org.eclipse.ecf.datashare.default"/>
+ </extension>
+ <extension
+ point="org.eclipse.ecf.datashare.updateprovider">
+ <updateProvider
+ factory="org.eclipse.ecf.datashare.util.TrackedSetUpdateProvider$Factory"
+ id="org.eclipse.ecf.datashare.util.TrackedSet"/>
+ </extension>
</plugin>
diff --git a/framework/bundles/org.eclipse.ecf.datashare/schema/manager.exsd b/framework/bundles/org.eclipse.ecf.datashare/schema/servicemanager.exsd
index e1a5e2aec..5444bd54a 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/schema/manager.exsd
+++ b/framework/bundles/org.eclipse.ecf.datashare/schema/servicemanager.exsd
@@ -3,7 +3,7 @@
<schema targetNamespace="org.eclipse.ecf.datashare">
<annotation>
<appInfo>
- <meta.schema plugin="org.eclipse.ecf.datashare" id="manager" name="Data Graph Sharing Manager"/>
+ <meta.schema plugin="org.eclipse.ecf.datashare" id="manager" name="DataShare Service Manager"/>
</appInfo>
<documentation>
Allows plugins to provide custom implementations of the Data Graph Sharing facility by registering uniquely named Data Graph Sharing Managers (classes that implement &lt;code&gt;org.eclipse.ecf.sdo.IDataGraphSharingManager&lt;/code&gt;), which are responsible for producing per-container instances of the Data Graph Sharing service implementation (&lt;code&gt;org.eclipse.ecf.sdo.IDataGraphSharing&lt;/code&gt;).
diff --git a/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd b/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd
new file mode 100644
index 000000000..2c189fbfa
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd
@@ -0,0 +1,117 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!-- Schema file written by PDE -->
+<schema targetNamespace="org.eclipse.ecf.datashare">
+<annotation>
+ <appInfo>
+ <meta.schema plugin="org.eclipse.ecf.datashare" id="updateprovider" name="DataShare Update Provider"/>
+ </appInfo>
+ <documentation>
+ [Enter description of this extension point.]
+ </documentation>
+ </annotation>
+
+ <element name="extension">
+ <complexType>
+ <sequence>
+ <element ref="updateProvider" minOccurs="1" maxOccurs="unbounded"/>
+ </sequence>
+ <attribute name="point" type="string" use="required">
+ <annotation>
+ <documentation>
+
+ </documentation>
+ </annotation>
+ </attribute>
+ <attribute name="id" type="string">
+ <annotation>
+ <documentation>
+
+ </documentation>
+ </annotation>
+ </attribute>
+ <attribute name="name" type="string">
+ <annotation>
+ <documentation>
+
+ </documentation>
+ <appInfo>
+ <meta.attribute translatable="true"/>
+ </appInfo>
+ </annotation>
+ </attribute>
+ </complexType>
+ </element>
+
+ <element name="updateProvider">
+ <annotation>
+ <appInfo>
+ <meta.element labelAttribute="id"/>
+ </appInfo>
+ </annotation>
+ <complexType>
+ <attribute name="id" type="string" use="required">
+ <annotation>
+ <documentation>
+
+ </documentation>
+ </annotation>
+ </attribute>
+ <attribute name="factory" type="string" use="required">
+ <annotation>
+ <documentation>
+
+ </documentation>
+ <appInfo>
+ <meta.attribute kind="java" basedOn="org.eclipse.ecf.datashare.IUpdateProvider"/>
+ </appInfo>
+ </annotation>
+ </attribute>
+ </complexType>
+ </element>
+
+ <annotation>
+ <appInfo>
+ <meta.section type="since"/>
+ </appInfo>
+ <documentation>
+ [Enter the first release in which this extension point appears.]
+ </documentation>
+ </annotation>
+
+ <annotation>
+ <appInfo>
+ <meta.section type="examples"/>
+ </appInfo>
+ <documentation>
+ [Enter extension point usage example here.]
+ </documentation>
+ </annotation>
+
+ <annotation>
+ <appInfo>
+ <meta.section type="apiInfo"/>
+ </appInfo>
+ <documentation>
+ [Enter API information here.]
+ </documentation>
+ </annotation>
+
+ <annotation>
+ <appInfo>
+ <meta.section type="implementation"/>
+ </appInfo>
+ <documentation>
+ [Enter information about supplied implementation of this extension point.]
+ </documentation>
+ </annotation>
+
+ <annotation>
+ <appInfo>
+ <meta.section type="copyright"/>
+ </appInfo>
+ <documentation>
+
+ </documentation>
+ </annotation>
+
+</schema>
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java
index afa10f171..8da643270 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java
@@ -22,7 +22,6 @@ public interface IDataShareService {
/**
* Publishes the given data graph under the given id.
- *
* @param dataGraph
* local data graph instance to share
* @param id
@@ -30,36 +29,28 @@ public interface IDataShareService {
* @param provider
* update provider compatible with the given data graph's
* implementation
- * @param consumer
- * application-specific update consumer
* @param callback
* optional callback used to notify the caller about publication
* status
- * @return shared data graph
+ *
* @throws ECFException
*/
- ISharedData publish(Object dataGraph, ID id, IUpdateProvider provider,
- IUpdateListener consumer, IPublicationCallback callback)
+ void publish(Object dataGraph, ID id, IUpdateProvider provider,
+ IPublicationCallback callback)
throws ECFException;
/**
* Subscribes to a data graph with the given id.
- *
* @param id
* identifier of a previously-published data graph
- * @param provider
- * update provider compatible with the given data graph's
- * implementation
- * @param consumer
- * application-specific update consumer
* @param callback
* optional callback used to notify the caller about subscription
* status
+ *
* @return shared data graph
* @throws ECFException
*/
- ISharedData subscribe(ID id, IUpdateProvider provider,
- IUpdateListener consumer, ISubscriptionCallback callback)
+ ISharedData subscribe(ID id, ISubscriptionCallback callback)
throws ECFException;
/**
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java
index 2c3c2a5f2..ea258a562 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java
@@ -10,8 +10,6 @@
*******************************************************************************/
package org.eclipse.ecf.datashare;
-import java.io.IOException;
-
import org.eclipse.ecf.core.util.ECFException;
/**
@@ -30,7 +28,9 @@ import org.eclipse.ecf.core.util.ECFException;
* @author pnehrer
*/
public interface IUpdateProvider {
-
+
+ IUpdateProviderFactory getFactory();
+
/**
* Creates an update from the given data graph, which will be forwarded to
* other group members. The implementor may use the graph's Change Summary
@@ -42,7 +42,7 @@ public interface IUpdateProvider {
* @throws ECFException
* when an update cannot be created
*/
- byte[] createUpdate(ISharedData graph) throws ECFException;
+ Object createUpdate(ISharedData graph) throws ECFException;
/**
* Applies the remote update to the given data graph. The implementor is
@@ -56,26 +56,4 @@ public interface IUpdateProvider {
* when this update cannot be applied
*/
void applyUpdate(ISharedData graph, Object data) throws ECFException;
-
- /**
- * Serializes the given data graph.
- *
- * @param graph
- * data graph instance to serialize
- * @return serialized data graph
- * @throws IOException
- */
- Object serializeData(Object graph) throws IOException;
-
- /**
- * Deserializes the given data graph.
- *
- * @param data
- * serialized data graph
- * @return deserialized instance of data graph
- * @throws IOException
- * @throws ClassNotFoundException
- */
- Object deserializeData(Object data) throws IOException,
- ClassNotFoundException;
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java
new file mode 100644
index 000000000..bbb23679e
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare;
+
+import java.util.Map;
+
+/**
+ * @author pnehrer
+ */
+public interface IUpdateProviderFactory {
+
+ String getID();
+
+ IUpdateProvider createProvider(Map params);
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java
new file mode 100644
index 000000000..2015adc2b
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare;
+
+import java.util.Hashtable;
+import java.util.Map;
+
+/**
+ * @author pnehrer
+ */
+public class UpdateProviderRegistry {
+
+ private static final Hashtable providers = new Hashtable();
+
+ private UpdateProviderRegistry() {
+ }
+
+ public static IUpdateProvider createProvider(String id, Map params) {
+ IUpdateProviderFactory f = (IUpdateProviderFactory) providers.get(id);
+ if (f == null)
+ return null;
+ else
+ return f.createProvider(params);
+ }
+
+ public static void registerFactory(String id, IUpdateProviderFactory f) {
+ providers.put(id, f);
+ }
+
+ public static void unregisterFactory(String id) {
+ providers.remove(id);
+ }
+
+ public static void unregisterAllFactories() {
+ providers.clear();
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java
new file mode 100644
index 000000000..1fb62f8fc
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+public class Abort implements Serializable {
+
+ private static final long serialVersionUID = 3258410616858294325L;
+
+ final Version version;
+
+ public Abort(Version version) {
+ this.version = version;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Abort[version=");
+ buf.append(version).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java
new file mode 100644
index 000000000..a8d4e01fd
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java
@@ -0,0 +1,331 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.eclipse.ecf.core.ISharedObject;
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.SharedObjectInitException;
+import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectDeactivatedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectMessageEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.internal.datashare.DataSharePlugin;
+
+/**
+ * @author pnehrer
+ */
+public abstract class AbstractMulticaster implements ISharedObject {
+
+ public static final short NEW = 0;
+
+ public static final short READY = 1;
+
+ public static final short PAUSED = 2;
+
+ public static final short DISPOSED = 3;
+
+ public class Testable {
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public short getState() {
+ return state;
+ }
+
+ public String getStateCode() {
+ return getStateStr();
+ }
+ }
+
+ protected ISharedObjectConfig config;
+
+ protected ID sharedObjectID;
+
+ protected ISharedObjectContext context;
+
+ protected ID localContainerID;
+
+ protected ID groupID;
+
+ protected Version version;
+
+ protected short state = NEW;
+
+ protected final HashSet pauses = new HashSet();
+
+ protected HashSet pauseRequests;
+
+ protected Testable testable;
+
+ public abstract boolean sendMessage(Object message) throws ECFException;
+
+ public synchronized void pause() throws ECFException, IllegalStateException {
+ if (pauses.contains(localContainerID))
+ throw new IllegalStateException();
+
+ while (state == NEW) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new ECFException(e);
+ }
+ }
+
+ if (state == DISPOSED)
+ throw new IllegalStateException();
+
+ boolean wasEmpty = pauses.isEmpty();
+ pauses.add(localContainerID);
+ pauseRequests = new HashSet(Arrays.asList(context.getGroupMemberIDs()));
+ pauseRequests.remove(localContainerID);
+ try {
+ context.sendMessage(null, new Pause());
+ synchronized (pauses) {
+ pauses.wait(1000);
+ }
+
+ if (!pauseRequests.isEmpty())
+ throw new ECFException("Failed to pause.");
+ } catch (IOException e) {
+ pauses.remove(localContainerID);
+ throw new ECFException(e);
+ } catch (InterruptedException e) {
+ pauses.remove(localContainerID);
+ throw new ECFException(e);
+ } finally {
+ if (wasEmpty && !pauses.isEmpty())
+ notify();
+ }
+ }
+
+ public synchronized void resume() throws ECFException {
+ if (state == DISPOSED)
+ throw new IllegalStateException();
+
+ if (!pauses.contains(localContainerID))
+ throw new IllegalStateException();
+
+ try {
+ context.sendMessage(null, new Resume());
+ pauses.remove(localContainerID);
+ } catch (IOException e) {
+ throw new ECFException(e);
+ } finally {
+ if (pauses.isEmpty())
+ notify();
+ }
+ }
+
+ protected abstract void receiveMessage(Object message);
+
+ protected synchronized boolean waitToSend() {
+ while (state != READY || !pauses.isEmpty()) {
+ if (state == DISPOSED)
+ return false;
+
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ DataSharePlugin.log(e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ protected void traceEntry(String method) {
+ StringBuffer buf = new StringBuffer("> ");
+ buf.append(getStateStr());
+ buf.append(' ');
+ buf.append(localContainerID);
+ buf.append(": ");
+ buf.append(method);
+ DataSharePlugin.getTraceLog().println(buf);
+ }
+
+ protected void traceExit(String method) {
+ StringBuffer buf = new StringBuffer("< ");
+ buf.append(getStateStr());
+ buf.append(' ');
+ buf.append(localContainerID);
+ buf.append(": ");
+ buf.append(method);
+ DataSharePlugin.getTraceLog().println(buf);
+ }
+
+ protected String getStateStr() {
+ switch (state) {
+ case NEW:
+ return "NEW";
+ case READY:
+ return "RDY";
+ case DISPOSED:
+ return "DSP";
+ default:
+ return "UNK";
+ }
+ }
+
+ public synchronized Testable getTestable() {
+ if (testable == null)
+ testable = new Testable();
+
+ return testable;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig)
+ */
+ public synchronized void init(ISharedObjectConfig config)
+ throws SharedObjectInitException {
+ this.config = config;
+ sharedObjectID = config.getSharedObjectID();
+ Map params = config.getProperties();
+ if (params != null) {
+ Object param = params.get("version");
+ if (param instanceof Version)
+ version = (Version) param;
+ }
+
+ if (version == null)
+ version = new Version(sharedObjectID, 0);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public void handleEvent(Event event) {
+ if (event instanceof ISharedObjectActivatedEvent)
+ handleActivated((ISharedObjectActivatedEvent) event);
+ else if (event instanceof ISharedObjectDeactivatedEvent)
+ handleDeactivated((ISharedObjectDeactivatedEvent) event);
+ else if (event instanceof ISharedObjectContainerDepartedEvent)
+ handleDeparted((ISharedObjectContainerDepartedEvent) event);
+ else if (event instanceof ISharedObjectMessageEvent) {
+ ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event;
+ if (e.getData() instanceof Message)
+ handleMessage(e.getRemoteContainerID(), (Message) e.getData());
+ else if (e.getData() instanceof Pause)
+ handlePause(e.getRemoteContainerID(), (Pause) e.getData());
+ else if (e.getData() instanceof Paused)
+ handlePaused(e.getRemoteContainerID(), (Paused) e.getData());
+ else if (e.getData() instanceof Resume)
+ handleResume(e.getRemoteContainerID(), (Resume) e.getData());
+ }
+ }
+
+ protected void handleActivated(ISharedObjectActivatedEvent event) {
+ if (event.getActivatedID().equals(sharedObjectID)) {
+ context = config.getContext();
+ localContainerID = context.getLocalContainerID();
+ groupID = context.getGroupID();
+ if (groupID == null)
+ try {
+ context.sendDispose(localContainerID);
+ } catch (IOException e) {
+ DataSharePlugin.log(e);
+ }
+ else {
+ synchronized (this) {
+ state = READY;
+ notifyAll();
+ }
+ }
+ }
+ }
+
+ protected void handleDeactivated(ISharedObjectDeactivatedEvent event) {
+ if (event.getDeactivatedID().equals(sharedObjectID)) {
+ synchronized (this) {
+ state = DISPOSED;
+ notifyAll();
+ }
+ }
+ }
+
+ protected void handleDeparted(ISharedObjectContainerDepartedEvent event) {
+ if (event.getDepartedContainerID().equals(localContainerID))
+ context.getSharedObjectManager().removeSharedObject(sharedObjectID);
+ }
+
+ protected synchronized void handlePause(ID remoteContainerID, Pause pause) {
+ if (pauses.isEmpty())
+ notify();
+
+ pauses.add(remoteContainerID);
+ }
+
+ protected synchronized void handlePaused(ID remoteContainerID, Paused paused) {
+ if (pauses.contains(localContainerID)
+ && pauseRequests != null
+ && pauseRequests.remove(remoteContainerID)
+ && pauseRequests.isEmpty())
+ synchronized (pauses) {
+ pauses.notify();
+ }
+ }
+
+ protected synchronized void handleResume(ID remoteContainerID, Resume resume) {
+ if (pauses.remove(remoteContainerID) && pauses.isEmpty())
+ notify();
+ }
+
+ protected void handleMessage(ID remoteContainerID, Message message) {
+ synchronized (this) {
+ version = message.getVersion();
+ }
+
+ receiveMessage(message.getData());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[])
+ */
+ public void handleEvents(Event[] events) {
+ for (int i = 0; i < events.length; ++i)
+ handleEvent(events[i]);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
+ */
+ public void dispose(ID containerID) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java
new file mode 100644
index 000000000..c61ebcec3
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+public class Ack implements Serializable {
+
+ private static final long serialVersionUID = 3832621776860952377L;
+
+ final Version version;
+
+ public Ack(Version version) {
+ this.version = version;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Ack[version=");
+ buf.append(version).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java
new file mode 100644
index 000000000..d66c30b6b
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+public class Commit implements Serializable {
+
+ private static final long serialVersionUID = 3258126938529740848L;
+
+ final Version version;
+
+ public Commit(Version version) {
+ this.version = version;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Commit[version=");
+ buf.append(version).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java
new file mode 100644
index 000000000..c9883c5f0
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java
@@ -0,0 +1,395 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectInitException;
+import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectMessageEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.internal.datashare.DataSharePlugin;
+
+/**
+ * @author pnehrer
+ */
+public class ConsistentMulticaster extends AbstractMulticaster implements
+ Timeout.Listener {
+
+ public static final short SEND = 3;
+
+ public static final short RECEIVE = 4;
+
+ public static final long DEFAULT_TIMEOUT = 1000;
+
+ public static final String TRACE_TAG = "ConsistentMulticaster";
+
+ private long sendTimeout;
+
+ private long receiveTimeout;
+
+ private Version nextVersion;
+
+ private HashSet requests;
+
+ private final Timer timer = new Timer();
+
+ private final HashMap timeouts = new HashMap();
+
+ private final HashMap commits = new HashMap();
+
+ private boolean granted;
+
+ public synchronized boolean sendMessage(Object message) throws ECFException {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "sendMessage[message=" + message + "]");
+
+ try {
+ if (!waitToSend())
+ return false;
+
+ state = SEND;
+ nextVersion = new Version(localContainerID,
+ version.getSequence() + 1);
+ ArrayList others = new ArrayList(Arrays.asList(context
+ .getGroupMemberIDs()));
+ others.remove(localContainerID);
+ requests = new HashSet(others);
+ granted = true;
+ try {
+ context.sendMessage(null, new Request(nextVersion));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+
+ if (!granted || !requests.isEmpty()) {
+ context.sendMessage(null, new Abort(nextVersion));
+ return false;
+ }
+
+ requests.addAll(others);
+ context.sendMessage(null, new Message(nextVersion, message));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+
+ if (!requests.isEmpty()) {
+ context.sendMessage(null, new Abort(nextVersion));
+ return false;
+ }
+
+ context.sendMessage(null, new Commit(nextVersion));
+ version = nextVersion;
+ return true;
+ } catch (IOException e) {
+ throw new ECFException(e);
+ } catch (InterruptedException e) {
+ throw new ECFException(e);
+ } finally {
+ state = READY;
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ public String getStateStr() {
+ switch (state) {
+ case SEND:
+ return "SND";
+ case RECEIVE:
+ return "RCV";
+ default:
+ return super.getStateStr();
+ }
+ }
+
+ protected void receiveMessage(Object message) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig)
+ */
+ public synchronized void init(ISharedObjectConfig config)
+ throws SharedObjectInitException {
+ super.init(config);
+
+ sendTimeout = DEFAULT_TIMEOUT;
+ receiveTimeout = DEFAULT_TIMEOUT;
+
+ Map params = config.getProperties();
+ if (params != null) {
+ Object param = params.get("sendTimeout");
+ if (param instanceof Long)
+ sendTimeout = ((Long) param).longValue();
+
+ param = params.get("receiveTimeout");
+ if (param instanceof Long)
+ receiveTimeout = ((Long) param).longValue();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public void handleEvent(Event event) {
+ super.handleEvent(event);
+ if (event instanceof ISharedObjectMessageEvent) {
+ ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event;
+ if (e.getData() instanceof Request)
+ handleRequest(e.getRemoteContainerID(), (Request) e.getData());
+ else if (e.getData() instanceof Reply)
+ handleReply(e.getRemoteContainerID(), (Reply) e.getData());
+ else if (e.getData() instanceof Abort)
+ handleAbort(e.getRemoteContainerID(), (Abort) e.getData());
+ else if (e.getData() instanceof Ack)
+ handleAck(e.getRemoteContainerID(), (Ack) e.getData());
+ else if (e.getData() instanceof Commit)
+ handleCommit(e.getRemoteContainerID(), (Commit) e.getData());
+ }
+ }
+
+ protected void handleDeparted(ISharedObjectContainerDepartedEvent event) {
+ super.handleDeparted(event);
+ if (!event.getDepartedContainerID().equals(localContainerID)) {
+ synchronized (this) {
+ if (state == SEND) {
+ requests.remove(event.getDepartedContainerID());
+ if (requests.isEmpty())
+ notify();
+ } else if (state == RECEIVE) {
+ Timeout[] t = (Timeout[]) timeouts.values().toArray(
+ new Timeout[timeouts.size()]);
+ for (int i = 0; i < t.length; ++i) {
+ if (t[i].getVersion().getSenderID().equals(
+ event.getDepartedContainerID())
+ && t[i].cancel())
+ timeouts.remove(t[i].getVersion());
+ }
+
+ for (Iterator i = commits.keySet().iterator(); i.hasNext();) {
+ Version version = (Version) i.next();
+ if (version.getSenderID().equals(
+ event.getDepartedContainerID()))
+ i.remove();
+ }
+
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+ }
+ }
+ }
+ }
+
+ private synchronized void handleRequest(ID remoteContainerID,
+ Request request) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleRequest[remoteContainerID="
+ + remoteContainerID + ";request=" + request + "]");
+
+ try {
+ if ((state == READY || state == RECEIVE)
+ && version.getSequence() + 1 == request.getVersion()
+ .getSequence()) {
+ state = RECEIVE;
+ Timeout timeout = new Timeout(this, request.getVersion());
+ timeouts.put(request.getVersion(), timeout);
+ timer.schedule(timeout, receiveTimeout);
+ context.sendMessage(remoteContainerID, new Reply(request
+ .getVersion(), true));
+ } else if (state != DISPOSED)
+ context.sendMessage(remoteContainerID, new Reply(request
+ .getVersion(), false));
+ } catch (IOException e) {
+ DataSharePlugin.log(e);
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleReply(ID remoteContainerID, Reply reply) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleReply[remoteContainerID="
+ + remoteContainerID + ";reply=" + reply + "]");
+
+ try {
+ if (state == SEND && reply.getVersion().equals(nextVersion)) {
+ if (!reply.isGranted()) {
+ granted = false;
+ notify();
+ }
+
+ requests.remove(remoteContainerID);
+ if (requests.isEmpty())
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ protected synchronized void handleMessage(ID remoteContainerID,
+ Message message) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleMessage[remoteContainerID="
+ + remoteContainerID + ";message=" + message + "]");
+
+ try {
+ Timeout timeout;
+ if (((timeout = (Timeout) timeouts.get(message.getVersion())) != null)
+ && timeout.cancel()) {
+ commits.put(message.getVersion(), message.getData());
+ timeout = new Timeout(this, message.getVersion());
+ timeouts.put(message.getVersion(), timeout);
+ timer.schedule(timeout, 1000);
+ context.sendMessage(remoteContainerID, new Ack(message
+ .getVersion()));
+ }
+ } catch (IOException e) {
+ DataSharePlugin.log(e);
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleAbort(ID remoteContainerID, Abort abort) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleAbort[remoteContainerID="
+ + remoteContainerID + ";abort=" + abort + "]");
+
+ try {
+ Timeout timeout = (Timeout) timeouts.remove(abort.getVersion());
+ if (timeout != null && timeout.cancel()) {
+ commits.remove(abort.getVersion());
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleAck(ID remoteContainerID, Ack ack) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleAck[remoteContainerID="
+ + remoteContainerID + ";ack=" + ack + "]");
+ try {
+ if (state == SEND && ack.getVersion().equals(nextVersion)) {
+ requests.remove(remoteContainerID);
+ if (requests.isEmpty())
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleCommit(ID remoteContainerID, Commit commit) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleCommit[remoteContainerID="
+ + remoteContainerID + ";commit=" + commit + "]");
+
+ try {
+ Timeout timeout;
+ if (((timeout = (Timeout) timeouts.get(commit.getVersion())) != null)
+ && timeout.cancel()) {
+ version = commit.getVersion();
+ timeouts.remove(version);
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+
+ receiveMessage(commits.remove(version));
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ public synchronized void timeout(Version version) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "timeout[version=" + version + "]");
+
+ try {
+ timeouts.remove(version);
+ if (timeouts.isEmpty()) {
+ commits.remove(version);
+ state = READY;
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[])
+ */
+ public void handleEvents(Event[] events) {
+ for (int i = 0; i < events.length; ++i)
+ handleEvent(events[i]);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
+ */
+ public void dispose(ID containerID) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Update.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Message.java
index 85579ba98..74b293d15 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Update.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Message.java
@@ -8,22 +8,19 @@
* Contributors:
* Peter Nehrer - initial API and implementation
*******************************************************************************/
-package org.eclipse.ecf.internal.datashare;
+package org.eclipse.ecf.datashare.multicast;
import java.io.Serializable;
-/**
- * @author pnehrer
- */
-public class Update implements Serializable {
+public class Message implements Serializable {
- private static final long serialVersionUID = 3256439205344260914L;
+ private static final long serialVersionUID = 3257281414121993014L;
- private final Version version;
+ final Version version;
- private final Object data;
+ final Object data;
- public Update(Version version, Object data) {
+ public Message(Version version, Object data) {
this.version = version;
this.data = data;
}
@@ -36,25 +33,10 @@ public class Update implements Serializable {
return data;
}
- public boolean equals(Object other) {
- if (other instanceof Update) {
- Update o = (Update) other;
- return version.equals(o.version) && data.equals(o.data);
- } else
- return false;
- }
-
- public int hashCode() {
- int c = 17;
- c = 37 * c + version.hashCode();
- c = 37 * c + data.hashCode();
- return c;
- }
-
public String toString() {
- StringBuffer buf = new StringBuffer("Update[");
- buf.append("version=").append(version).append(";");
- buf.append("data=").append(data).append("]");
+ StringBuffer buf = new StringBuffer("Message[version=");
+ buf.append(version).append(";data=");
+ buf.append(data).append(']');
return buf.toString();
}
-}
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java
new file mode 100644
index 000000000..d992c14e7
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java
@@ -0,0 +1,329 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Timer;
+
+import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectInitException;
+import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectMessageEvent;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.internal.datashare.DataSharePlugin;
+
+/**
+ * @author pnehrer
+ */
+public class OrderedMulticaster extends AbstractMulticaster implements
+ Timeout.Listener {
+
+ public static final short SEND = 3;
+
+ public static final short RECEIVE = 4;
+
+ public static final long DEFAULT_TIMEOUT = 1000;
+
+ public static final String TRACE_TAG = "OrderedMulticaster";
+
+ private long sendTimeout;
+
+ private long receiveTimeout;
+
+ private Version nextVersion;
+
+ private HashSet requests;
+
+ private final Timer timer = new Timer();
+
+ private final HashMap timeouts = new HashMap();
+
+ private boolean granted;
+
+ public synchronized boolean sendMessage(Object message) throws ECFException {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "sendMessage[message=" + message + "]");
+
+ try {
+ if (!waitToSend())
+ return false;
+
+ state = SEND;
+ nextVersion = new Version(localContainerID,
+ version.getSequence() + 1);
+ ArrayList others = new ArrayList(Arrays.asList(context
+ .getGroupMemberIDs()));
+ others.remove(localContainerID);
+ requests = new HashSet(others);
+ granted = true;
+ try {
+ context.sendMessage(null, new Request(nextVersion));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+
+ if (!granted || !requests.isEmpty()) {
+ context.sendMessage(null, new Abort(nextVersion));
+ return false;
+ }
+
+ context.sendMessage(null, new Message(nextVersion, message));
+ version = nextVersion;
+ return true;
+ } catch (IOException e) {
+ throw new ECFException(e);
+ } catch (InterruptedException e) {
+ throw new ECFException(e);
+ } finally {
+ state = READY;
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ public String getStateStr() {
+ switch (state) {
+ case SEND:
+ return "SND";
+ case RECEIVE:
+ return "RCV";
+ default:
+ return super.getStateStr();
+ }
+ }
+
+ protected void receiveMessage(Object message) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig)
+ */
+ public synchronized void init(ISharedObjectConfig config)
+ throws SharedObjectInitException {
+ super.init(config);
+
+ sendTimeout = DEFAULT_TIMEOUT;
+ receiveTimeout = DEFAULT_TIMEOUT;
+
+ Map params = config.getProperties();
+ if (params != null) {
+ Object param = params.get("sendTimeout");
+ if (param instanceof Long)
+ sendTimeout = ((Long) param).longValue();
+
+ param = params.get("receiveTimeout");
+ if (param instanceof Long)
+ receiveTimeout = ((Long) param).longValue();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public void handleEvent(Event event) {
+ super.handleEvent(event);
+ if (event instanceof ISharedObjectMessageEvent) {
+ ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event;
+ if (e.getData() instanceof Request)
+ handleRequest(e.getRemoteContainerID(), (Request) e.getData());
+ else if (e.getData() instanceof Reply)
+ handleReply(e.getRemoteContainerID(), (Reply) e.getData());
+ else if (e.getData() instanceof Abort)
+ handleAbort(e.getRemoteContainerID(), (Abort) e.getData());
+ }
+ }
+
+ protected void handleDeparted(ISharedObjectContainerDepartedEvent event) {
+ super.handleDeparted(event);
+ if (!event.getDepartedContainerID().equals(localContainerID)) {
+ synchronized (this) {
+ if (state == SEND) {
+ requests.remove(event.getDepartedContainerID());
+ if (requests.isEmpty())
+ notify();
+ } else if (state == RECEIVE) {
+ Timeout[] t = (Timeout[]) timeouts.values().toArray(
+ new Timeout[timeouts.size()]);
+ for (int i = 0; i < t.length; ++i) {
+ if (t[i].getVersion().getSenderID().equals(
+ event.getDepartedContainerID())
+ && t[i].cancel())
+ timeouts.remove(t[i].getVersion());
+ }
+
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+ }
+ }
+ }
+ }
+
+ private synchronized void handleRequest(ID remoteContainerID,
+ Request request) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleRequest[remoteContainerID="
+ + remoteContainerID + ";request=" + request + "]");
+
+ try {
+ if ((state == READY || state == RECEIVE)
+ && version.getSequence() + 1 == request.getVersion()
+ .getSequence()) {
+ Timeout timeout = new Timeout(this, request.getVersion());
+ timeouts.put(request.getVersion(), timeout);
+ timer.schedule(timeout, receiveTimeout);
+ context.sendMessage(remoteContainerID, new Reply(request
+ .getVersion(), true));
+ if (state == READY)
+ notify();
+
+ state = RECEIVE;
+ } else if (state != DISPOSED)
+ context.sendMessage(remoteContainerID, new Reply(request
+ .getVersion(), false));
+ } catch (IOException e) {
+ DataSharePlugin.log(e);
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleReply(ID remoteContainerID, Reply reply) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleReply[remoteContainerID="
+ + remoteContainerID + ";reply=" + reply + "]");
+
+ try {
+ if (state == SEND && reply.getVersion().equals(nextVersion)) {
+ if (!reply.isGranted()) {
+ granted = false;
+ notify();
+ }
+
+ requests.remove(remoteContainerID);
+ if (requests.isEmpty())
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ protected synchronized void handleMessage(ID remoteContainerID,
+ Message message) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleMessage[remoteContainerID="
+ + remoteContainerID + ";message=" + message + "]");
+
+ try {
+ Timeout timeout;
+ if (((timeout = (Timeout) timeouts.get(message.getVersion())) != null)
+ && timeout.cancel()) {
+ version = message.getVersion();
+ timeouts.remove(version);
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+
+ receiveMessage(message.getData());
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ private synchronized void handleAbort(ID remoteContainerID, Abort abort) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "handleAbort[remoteContainerID="
+ + remoteContainerID + ";abort=" + abort + "]");
+
+ try {
+ Timeout timeout = (Timeout) timeouts.remove(abort.getVersion());
+ if (timeout != null && timeout.cancel()) {
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ public synchronized void timeout(Version version) {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "timeout[version=" + version + "]");
+
+ try {
+ timeouts.remove(version);
+ if (timeouts.isEmpty()) {
+ state = READY;
+ notify();
+ }
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[])
+ */
+ public void handleEvents(Event[] events) {
+ for (int i = 0; i < events.length; ++i)
+ handleEvent(events[i]);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
+ */
+ public void dispose(ID containerID) {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
+ */
+ public Object getAdapter(Class clazz) {
+ return null;
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java
new file mode 100644
index 000000000..4bf3ae694
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+/**
+ * @author pnehrer
+ *
+ */
+public class Pause implements Serializable {
+
+ private static final long serialVersionUID = 3258417222501347637L;
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java
new file mode 100644
index 000000000..7c7f6d070
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+/**
+ * @author pnehrer
+ */
+public class Paused implements Serializable {
+
+ private static final long serialVersionUID = 3978705103752868913L;
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java
new file mode 100644
index 000000000..19839bed2
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+public class Reply implements Serializable {
+
+ private static final long serialVersionUID = 3689632497314837046L;
+
+ final Version version;
+
+ final boolean granted;
+
+ public Reply(Version version, boolean granted) {
+ this.version = version;
+ this.granted = granted;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public boolean isGranted() {
+ return granted;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Reply[version=");
+ buf.append(version).append(";granted=");
+ buf.append(granted).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java
new file mode 100644
index 000000000..e02d87511
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+public class Request implements Serializable {
+
+ private static final long serialVersionUID = 3257003237730365493L;
+
+ final Version version;
+
+ public Request(Version version) {
+ this.version = version;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Request[version=");
+ buf.append(version).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java
new file mode 100644
index 000000000..5046854df
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+/**
+ * @author pnehrer
+ *
+ */
+public class Resume implements Serializable {
+
+ private static final long serialVersionUID = 3258133557074540854L;
+
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java
new file mode 100644
index 000000000..8d4095b67
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.IOException;
+
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.internal.datashare.DataSharePlugin;
+
+/**
+ * @author pnehrer
+ */
+public class SimpleMulticaster extends AbstractMulticaster {
+
+ public static final String TRACE_TAG = "SimpleMulticaster";
+
+ public synchronized boolean sendMessage(Object message) throws ECFException {
+ String method = null;
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceEntry(method = "sendMessage[message=" + message + "]");
+
+ try {
+ if (!waitToSend())
+ return false;
+
+ version = new Version(localContainerID, version.getSequence() + 1);
+ context.sendMessage(null, new Message(version, message));
+ return true;
+ } catch (IOException e) {
+ throw new ECFException(e);
+ } finally {
+ if (DataSharePlugin.isTracing(TRACE_TAG))
+ traceExit(method);
+ }
+ }
+
+ protected void receiveMessage(Object message) {
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java
new file mode 100644
index 000000000..3ae5fd113
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.util.TimerTask;
+
+class Timeout extends TimerTask {
+
+ interface Listener {
+ void timeout(Version version);
+ }
+
+ private final Listener listener;
+
+ final Version version;
+
+ public Timeout(Listener listener, Version version) {
+ this.listener = listener;
+ this.version = version;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public void run() {
+ listener.timeout(version);
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java
new file mode 100644
index 000000000..53753aa56
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+import java.io.Serializable;
+
+import org.eclipse.ecf.core.identity.ID;
+
+public class Version implements Serializable {
+
+ private static final long serialVersionUID = 3762538901495101236L;
+
+ private final ID senderID;
+
+ private final long sequence;
+
+ public Version(ID senderID, long sequence) {
+ this.senderID = senderID;
+ this.sequence = sequence;
+ }
+
+ public ID getSenderID() {
+ return senderID;
+ }
+
+ public long getSequence() {
+ return sequence;
+ }
+
+ public boolean equals(Object other) {
+ if (other instanceof Version) {
+ Version o = (Version) other;
+ return senderID.equals(o.senderID) && sequence == o.sequence;
+ } else
+ return false;
+ }
+
+ public int hashCode() {
+ int c = 17;
+ c = 37 * c + senderID.hashCode();
+ c = 37 * c + (int) sequence;
+ return c;
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("Version[senderID=");
+ buf.append(senderID).append(";sequence=");
+ buf.append(sequence).append(']');
+ return buf.toString();
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java
new file mode 100644
index 000000000..28a8b6401
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.util;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.EventListener;
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author pnehrer
+ */
+public class NotifyingSet implements Serializable {
+
+ private static final long serialVersionUID = 3258134639355967536L;
+
+ public static interface IChangeListener extends EventListener {
+
+ void changed(ChangeEvent e);
+ }
+
+ public static class ChangeDelta implements Serializable {
+
+ private static final long serialVersionUID = 3618133446644806960L;
+
+ public static final short ADD = 0;
+
+ public static final short REMOVE = 1;
+
+ protected transient final NotifyingSet source;
+
+ protected final Object member;
+
+ private final short kind;
+
+ protected ChangeDelta(NotifyingSet source, Object member, short kind) {
+ this.source = source;
+ this.member = member;
+ this.kind = kind;
+ }
+
+ public final Object getMember() {
+ return member;
+ }
+
+ public final short getKind() {
+ return kind;
+ }
+ }
+
+ public static class ChangeEvent extends EventObject {
+
+ private static final long serialVersionUID = 3834307341121041721L;
+
+ private final ChangeDelta[] deltas;
+
+ protected ChangeEvent(NotifyingSet source, ChangeDelta[] deltas) {
+ super(source);
+ this.deltas = deltas;
+ }
+
+ protected ChangeEvent(NotifyingSet source, ChangeDelta delta) {
+ this(source, new ChangeDelta[] { delta });
+ }
+
+ public NotifyingSet getTrackedSet() {
+ return (NotifyingSet) source;
+ }
+
+ public ChangeDelta[] getChangeDeltas() {
+ return deltas;
+ }
+ }
+
+ private final HashSet set;
+
+ private transient Set listeners = Collections
+ .synchronizedSet(new HashSet());
+
+ public NotifyingSet() {
+ set = new HashSet();
+ }
+
+ public NotifyingSet(Set set) {
+ this.set = new HashSet(set);
+ }
+
+ public void addChangeListener(IChangeListener l) {
+ listeners.add(l);
+ }
+
+ public void removeChangeListener(IChangeListener l) {
+ listeners.remove(l);
+ }
+
+ public synchronized boolean add(Object object) {
+ boolean result = set.add(object);
+ if (result)
+ fireChangeEvent(new ChangeEvent(this, new ChangeDelta(this, object,
+ ChangeDelta.ADD)));
+
+ return result;
+ }
+
+ public synchronized boolean remove(Object object) {
+ boolean result = set.remove(object);
+ if (result)
+ fireChangeEvent(new ChangeEvent(this, new ChangeDelta(this, object,
+ ChangeDelta.REMOVE)));
+
+ return result;
+ }
+
+ public synchronized boolean contains(Object object) {
+ return set.contains(object);
+ }
+
+ public synchronized int size() {
+ return set.size();
+ }
+
+ public synchronized Object[] toArray() {
+ return set.toArray();
+ }
+
+ private void fireChangeEvent(ChangeEvent e) {
+ IChangeListener[] l = (IChangeListener[]) listeners
+ .toArray(new IChangeListener[listeners.size()]);
+ for (int i = 0; i < l.length; ++i)
+ l[i].changed(e);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
+ ClassNotFoundException {
+ in.defaultReadObject();
+ listeners = Collections.synchronizedSet(new HashSet());
+ }
+} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java
new file mode 100644
index 000000000..f4f8831e1
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java
@@ -0,0 +1,104 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.util;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * @author pnehrer
+ */
+public class TrackedSet extends NotifyingSet implements
+ NotifyingSet.IChangeListener {
+
+ private static final long serialVersionUID = 3258134660897847607L;
+
+ private transient LinkedList changes = new LinkedList();
+
+ /**
+ *
+ */
+ public TrackedSet() {
+ addChangeListener(this);
+ }
+
+ /**
+ * @param set
+ */
+ public TrackedSet(Set set) {
+ super(set);
+ addChangeListener(this);
+ }
+
+ public synchronized ChangeDelta[] getChanges() {
+ return (ChangeDelta[]) changes.toArray(new ChangeDelta[changes.size()]);
+ }
+
+ /**
+ * @param deltas
+ */
+ public synchronized void apply(ChangeDelta[] deltas) {
+ removeChangeListener(this);
+ LinkedList oldChanges = new LinkedList(changes);
+ while (!changes.isEmpty())
+ undo((ChangeDelta) changes.removeLast());
+
+ for (int i = 0; i < deltas.length; ++i)
+ apply(deltas[i]);
+
+ addChangeListener(this);
+ for (Iterator i = oldChanges.iterator(); i.hasNext();)
+ apply((ChangeDelta) i.next());
+ }
+
+ private void undo(ChangeDelta delta) {
+ switch (delta.getKind()) {
+ case ChangeDelta.ADD:
+ remove(delta.getMember());
+ break;
+ case ChangeDelta.REMOVE:
+ add(delta.getMember());
+ break;
+ }
+ }
+
+ private void apply(ChangeDelta delta) {
+ switch (delta.getKind()) {
+ case ChangeDelta.ADD:
+ add(delta.getMember());
+ break;
+ case ChangeDelta.REMOVE:
+ remove(delta.getMember());
+ break;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.util.NotifyingSet.IChangeListener#changed(org.eclipse.ecf.datashare.util.NotifyingSet.ChangeEvent)
+ */
+ public synchronized void changed(ChangeEvent e) {
+ ChangeDelta[] deltas = e.getChangeDeltas();
+ for (int i = 0; i < deltas.length; ++i)
+ changes.add(deltas[i]);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException,
+ ClassNotFoundException {
+ in.defaultReadObject();
+ changes = new LinkedList();
+ addChangeListener(this);
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java
new file mode 100644
index 000000000..83403da32
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.util;
+
+import java.util.Map;
+
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.datashare.ISharedData;
+import org.eclipse.ecf.datashare.IUpdateProvider;
+import org.eclipse.ecf.datashare.IUpdateProviderFactory;
+
+/**
+ * @author pnehrer
+ */
+public class TrackedSetUpdateProvider implements IUpdateProvider {
+
+ private final Factory factory;
+
+ private TrackedSetUpdateProvider(Factory factory) {
+ this.factory = factory;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IUpdateProvider#getFactory()
+ */
+ public IUpdateProviderFactory getFactory() {
+ return factory;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IUpdateProvider#createUpdate(org.eclipse.ecf.datashare.ISharedData)
+ */
+ public Object createUpdate(ISharedData graph) throws ECFException {
+ return ((TrackedSet) graph.getData()).getChanges();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IUpdateProvider#applyUpdate(org.eclipse.ecf.datashare.ISharedData,
+ * java.lang.Object)
+ */
+ public void applyUpdate(ISharedData graph, Object data) throws ECFException {
+ ((TrackedSet) graph.getData()).apply((NotifyingSet.ChangeDelta[]) data);
+ }
+
+ public static class Factory implements IUpdateProviderFactory {
+
+ public static final String ID = "org.eclipse.ecf.datashare.util.TrackedSet";
+
+ public String getID() {
+ return ID;
+ }
+
+ public IUpdateProvider createProvider(Map params) {
+ return new TrackedSetUpdateProvider(this);
+ }
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
index 55d55a9ef..07063c7d6 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
@@ -25,10 +25,18 @@ import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.core.util.Event;
import org.eclipse.ecf.datashare.IPublicationCallback;
import org.eclipse.ecf.datashare.ISharedData;
-import org.eclipse.ecf.datashare.IUpdateListener;
import org.eclipse.ecf.datashare.IUpdateProvider;
+import org.eclipse.ecf.datashare.UpdateProviderRegistry;
/**
+ * <p>
+ * State chart:
+ * </p>
+ * <p>
+ * 1. DataShareService.publish(Object, ID, IUpdateProvider,
+ * IPublicationCallback) -> PUBLISHED
+ * </p>
+ *
* @author pnehrer
*/
public class Agent implements ISharedData, ISharedObject {
@@ -41,18 +49,28 @@ public class Agent implements ISharedData, ISharedObject {
private IUpdateProvider updateProvider;
- private IUpdateListener updateListener;
-
private IPublicationCallback pubCallback;
- private Version version;
-
+ /**
+ * Default constructor; necessary for replication.
+ */
public Agent() {
}
- public Agent(Object sharedData, IBootstrap bootstrap) {
+ /**
+ * Publisher's constructor; fully initializes the instance.
+ *
+ * @param sharedData
+ * @param bootstrap
+ * @param updateProvider
+ * @param pubCallback
+ */
+ public Agent(Object sharedData, IBootstrap bootstrap,
+ IUpdateProvider updateProvider, IPublicationCallback pubCallback) {
this.sharedData = sharedData;
this.bootstrap = bootstrap;
+ this.updateProvider = updateProvider;
+ this.pubCallback = pubCallback;
}
public synchronized ID getID() {
@@ -64,27 +82,25 @@ public class Agent implements ISharedData, ISharedObject {
}
public synchronized void commit() throws ECFException {
- byte[] buf = updateProvider.createUpdate(this);
- if (buf != null) {
- Version newVersion = version.getNext(config.getSharedObjectID());
- Update update = new Update(newVersion, buf);
- version = newVersion;
- try {
- config.getContext().sendMessage(null, update);
- } catch (IOException e) {
- throw new ECFException(e);
- }
+ // lock on transaction (wait till there's none)
+ // send prepare
+ // collect replies
+ // send commit/abort
+ Object update = updateProvider.createUpdate(this);
+ if (update != null) {
+// Version newVersion = version.getNext(config.getSharedObjectID());
+// Commit msg = new Commit(newVersion, update);
+// version = newVersion;
+// try {
+// config.getContext().sendMessage(null, msg);
+// } catch (IOException e) {
+// throw new ECFException(e);
+// }
}
}
public synchronized void dispose() {
- if (config != null)
- try {
- config.getContext().sendDispose(
- config.getContext().getLocalContainerID());
- } catch (IOException e) {
- handleError(e);
- }
+ // TODO Finish implementing.
}
/*
@@ -97,15 +113,26 @@ public class Agent implements ISharedData, ISharedObject {
this.config = config;
Map params = config.getProperties();
if (params != null) {
- sharedData = params.get("sharedData");
- version = (Version) params.get("version");
- IBootstrapMemento m = (IBootstrapMemento) params.get("bootstrap");
- if (m != null)
- bootstrap = m.createBootstrap();
+ Object param = params.get("sharedData");
+ if (param != null)
+ sharedData = param;
+
+ param = params.get("version");
+// if (param != null)
+// version = (Version) param;
+
+ param = params.get("bootstrap");
+ if (param != null)
+ bootstrap = ((IBootstrapMemento) param).createBootstrap();
+
+ param = params.get("updateProvider");
+ if (param != null)
+ updateProvider = UpdateProviderRegistry.createProvider(
+ (String) param, null); // TODO what about params?
}
- if (version == null)
- version = new Version(config.getSharedObjectID());
+// if (version == null)
+// version = new Version(config.getSharedObjectID());
bootstrap.setAgent(this);
bootstrap.init(config);
@@ -124,8 +151,6 @@ public class Agent implements ISharedData, ISharedObject {
handleActivated();
} else if (event instanceof ISharedObjectMessageEvent) {
ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event;
- if (e.getData() instanceof Update)
- handleUpdate((Update) e.getData());
}
}
@@ -135,8 +160,10 @@ public class Agent implements ISharedData, ISharedObject {
try {
Map params = new HashMap(3);
params.put("sharedData", sharedData);
- params.put("version", version);
+// params.put("version", version);
params.put("bootstrap", bootstrap.createMemento());
+ params.put("updateProvider", updateProvider.getFactory()
+ .getID());
config.getContext().sendCreate(
null,
new SharedObjectDescription(config.getSharedObjectID(),
@@ -152,20 +179,12 @@ public class Agent implements ISharedData, ISharedObject {
}
}
- private void handleUpdate(Update update) {
- try {
- updateProvider.applyUpdate(this, update.getData());
- version = update.getVersion();
- } catch (ECFException e) {
- handleError(e);
- }
- }
-
public void doBootstrap(ID containerID) {
Map params = new HashMap(3);
params.put("sharedData", sharedData);
- params.put("version", version);
+// params.put("version", version);
params.put("bootstrap", bootstrap.createMemento());
+ params.put("updateProvider", updateProvider.getFactory().getID());
try {
config.getContext().sendCreate(
containerID,
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java
index 9cb1f57ca..93eed4509 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java
@@ -24,166 +24,227 @@ import org.eclipse.core.runtime.Plugin;
import org.eclipse.core.runtime.Status;
import org.eclipse.ecf.datashare.DataShareServiceFactory;
import org.eclipse.ecf.datashare.IDataShareServiceManager;
+import org.eclipse.ecf.datashare.IUpdateProviderFactory;
+import org.eclipse.ecf.datashare.UpdateProviderRegistry;
import org.osgi.framework.BundleContext;
/**
* @author pnehrer
- *
*/
public class DataSharePlugin {
-
+
public static final String PLUGIN_ID = "org.eclipse.ecf.datashare";
- private static final String TRACE_PREFIX = PLUGIN_ID + "/";
-
- private static EclipsePlugin plugin;
-
- private static boolean tracingEnabled = Boolean.getBoolean(TRACE_PREFIX
- + "debug");
-
- private DataSharePlugin() {
- }
-
- public static void log(Object entry) {
- if (plugin == null) {
- if (entry instanceof Throwable)
- ((Throwable) entry).printStackTrace();
- else
- System.err.println(entry);
- } else {
- plugin.log(entry);
- }
- }
-
- public static boolean isTracing(String tag) {
- if (tracingEnabled) {
- return plugin == null ? Boolean.getBoolean(TRACE_PREFIX + tag)
- : plugin.isTracing(tag);
- } else
- return false;
- }
-
- public static PrintStream getTraceLog() {
- return System.out;
- }
-
- public static class EclipsePlugin extends Plugin {
-
- private static final String MANAGER_EXTENSION_POINT = "manager";
-
- private static final String MANAGER_EXTENSION = "manager";
-
- private static final String ATTR_NAME = "name";
-
- private static final String ATTR_CLASS = "class";
-
- private IRegistryChangeListener registryChangeListener;
-
- public EclipsePlugin() {
- plugin = this;
- tracingEnabled = Platform.inDebugMode();
- }
-
- /**
- * This method is called upon plug-in activation
- */
- public void start(BundleContext context) throws Exception {
- super.start(context);
- registryChangeListener = new IRegistryChangeListener() {
- public void registryChanged(IRegistryChangeEvent event) {
- IExtensionDelta[] deltas = event.getExtensionDeltas(
- getBundle().getSymbolicName(),
- MANAGER_EXTENSION_POINT);
- for (int i = 0; i < deltas.length; ++i) {
- switch (deltas[i].getKind()) {
- case IExtensionDelta.ADDED:
- registerManagers(deltas[i].getExtension()
- .getConfigurationElements());
- break;
-
- case IExtensionDelta.REMOVED:
- IConfigurationElement[] elems = deltas[i]
- .getExtension().getConfigurationElements();
- for (int j = 0; j < elems.length; ++j) {
- if (!MANAGER_EXTENSION.equals(elems[j]
- .getName()))
- continue;
-
- String name = elems[j].getAttribute(ATTR_NAME);
- if (name != null && name.length() > 0)
- DataShareServiceFactory
- .unregisterManager(name);
- }
-
- break;
- }
- }
- }
- };
-
- IExtensionRegistry reg = Platform.getExtensionRegistry();
- IConfigurationElement[] elems = reg.getConfigurationElementsFor(
- getBundle().getSymbolicName(), MANAGER_EXTENSION_POINT);
- registerManagers(elems);
- }
-
- private void registerManagers(IConfigurationElement[] elems) {
- for (int i = 0; i < elems.length; ++i) {
- if (!MANAGER_EXTENSION.equals(elems[i].getName()))
- continue;
-
- String name = elems[i].getAttribute(ATTR_NAME);
- if (name == null || name.length() == 0)
- continue;
-
- IDataShareServiceManager mgr;
- try {
- mgr = (IDataShareServiceManager) elems[i]
- .createExecutableExtension(ATTR_CLASS);
- } catch (Exception ex) {
- continue;
- }
-
- DataShareServiceFactory.registerManager(name, mgr);
- }
- }
-
- /**
- * This method is called when the plug-in is stopped
- */
- public void stop(BundleContext context) throws Exception {
- if (registryChangeListener != null)
- Platform.getExtensionRegistry().removeRegistryChangeListener(
- registryChangeListener);
-
- DataShareServiceFactory.unregisterAllManagers();
+ private static final String TRACE_PREFIX = PLUGIN_ID + "/";
+
+ private static EclipsePlugin plugin;
+
+ private static boolean tracingEnabled = Boolean.getBoolean(TRACE_PREFIX
+ + "debug");
+
+ private DataSharePlugin() {
+ }
+
+ public static void log(Object entry) {
+ if (plugin == null) {
+ if (entry instanceof Throwable)
+ ((Throwable) entry).printStackTrace();
+ else
+ System.err.println(entry);
+ } else {
+ plugin.log(entry);
+ }
+ }
+
+ public static boolean isTracing(String tag) {
+ if (tracingEnabled) {
+ return plugin == null ? Boolean.getBoolean(TRACE_PREFIX + tag)
+ : plugin.isTracing(tag);
+ } else
+ return false;
+ }
+
+ public static PrintStream getTraceLog() {
+ return System.out;
+ }
+
+ public static class EclipsePlugin extends Plugin {
+
+ private static final String MANAGER_EXTENSION_POINT = "servicemanager";
+
+ private static final String MANAGER_EXTENSION = "manager";
+
+ private static final String ATTR_NAME = "name";
+
+ private static final String ATTR_CLASS = "class";
+
+ private static final String PROVIDER_EXTENSION_POINT = "updateprovider";
+
+ private static final String PROVIDER_EXTENSION = "updateProvider";
+
+ private static final String ATTR_ID = "id";
+
+ private static final String ATTR_FACTORY = "factory";
+
+ private IRegistryChangeListener registryChangeListener;
+
+ public EclipsePlugin() {
+ plugin = this;
+ tracingEnabled = Platform.inDebugMode();
+ }
+
+ /**
+ * This method is called upon plug-in activation
+ */
+ public void start(BundleContext context) throws Exception {
+ super.start(context);
+ final String namespace = getBundle().getSymbolicName();
+ registryChangeListener = new IRegistryChangeListener() {
+ public void registryChanged(IRegistryChangeEvent event) {
+ IExtensionDelta[] deltas = event
+ .getExtensionDeltas(namespace);
+ for (int i = 0; i < deltas.length; ++i) {
+ IConfigurationElement[] elems = deltas[i]
+ .getExtension().getConfigurationElements();
+ switch (deltas[i].getKind()) {
+ case IExtensionDelta.ADDED:
+ if (deltas[i].getExtensionPoint()
+ .getSimpleIdentifier().equals(
+ MANAGER_EXTENSION_POINT))
+ registerManagers(elems);
+ else if (deltas[i].getExtensionPoint()
+ .getSimpleIdentifier().equals(
+ PROVIDER_EXTENSION_POINT))
+ registerProviders(elems);
+
+ break;
+
+ case IExtensionDelta.REMOVED:
+ if (deltas[i].getExtensionPoint()
+ .getSimpleIdentifier().equals(
+ MANAGER_EXTENSION_POINT))
+ unregisterManagers(elems);
+ else if (deltas[i].getExtensionPoint()
+ .getSimpleIdentifier().equals(
+ PROVIDER_EXTENSION_POINT))
+ unregisterProviders(elems);
+ break;
+ }
+ }
+ }
+ };
+
+ IExtensionRegistry reg = Platform.getExtensionRegistry();
+ reg.addRegistryChangeListener(registryChangeListener, namespace);
+ IConfigurationElement[] elems = reg.getConfigurationElementsFor(
+ namespace, MANAGER_EXTENSION_POINT);
+ registerManagers(elems);
+ elems = reg.getConfigurationElementsFor(namespace,
+ PROVIDER_EXTENSION_POINT);
+ registerProviders(elems);
+ }
+
+ private void registerManagers(IConfigurationElement[] elems) {
+ for (int i = 0; i < elems.length; ++i) {
+ if (!MANAGER_EXTENSION.equals(elems[i].getName()))
+ continue;
+
+ String name = elems[i].getAttribute(ATTR_NAME);
+ if (name == null || name.length() == 0)
+ continue;
+
+ IDataShareServiceManager mgr;
+ try {
+ mgr = (IDataShareServiceManager) elems[i]
+ .createExecutableExtension(ATTR_CLASS);
+ } catch (Exception ex) {
+ continue;
+ }
+
+ DataShareServiceFactory.registerManager(name, mgr);
+ }
+ }
+
+ private void registerProviders(IConfigurationElement[] elems) {
+ for (int i = 0; i < elems.length; ++i) {
+ if (!PROVIDER_EXTENSION.equals(elems[i].getName()))
+ continue;
+
+ String id = elems[i].getAttribute(ATTR_ID);
+ if (id == null || id.length() == 0)
+ continue;
+
+ IUpdateProviderFactory factory;
+ try {
+ factory = (IUpdateProviderFactory) elems[i]
+ .createExecutableExtension(ATTR_FACTORY);
+ } catch (Exception ex) {
+ continue;
+ }
+
+ UpdateProviderRegistry.registerFactory(id, factory);
+ }
+ }
+
+ private void unregisterManagers(IConfigurationElement[] elems) {
+ for (int i = 0; i < elems.length; ++i) {
+ if (!MANAGER_EXTENSION.equals(elems[i].getName()))
+ continue;
+
+ String name = elems[i].getAttribute(ATTR_NAME);
+ if (name != null && name.length() > 0)
+ DataShareServiceFactory.unregisterManager(name);
+ }
+ }
+
+ private void unregisterProviders(IConfigurationElement[] elems) {
+ for (int i = 0; i < elems.length; ++i) {
+ if (!PROVIDER_EXTENSION.equals(elems[i].getName()))
+ continue;
+
+ String id = elems[i].getAttribute(ATTR_ID);
+ if (id != null && id.length() > 0)
+ UpdateProviderRegistry.unregisterFactory(id);
+ }
+ }
+
+ /**
+ * This method is called when the plug-in is stopped
+ */
+ public void stop(BundleContext context) throws Exception {
+ if (registryChangeListener != null)
+ Platform.getExtensionRegistry().removeRegistryChangeListener(
+ registryChangeListener);
+
+ DataShareServiceFactory.unregisterAllManagers();
+ UpdateProviderRegistry.unregisterAllFactories();
plugin = null;
- super.stop(context);
- }
-
- public void log(Object entry) {
- IStatus status;
- if (entry instanceof IStatus)
- status = (IStatus) entry;
- else if (entry instanceof CoreException)
- status = ((CoreException) entry).getStatus();
- else if (entry instanceof Throwable) {
- Throwable t = (Throwable) entry;
- status = new Status(Status.ERROR,
- getBundle().getSymbolicName(), 0,
- t.getLocalizedMessage() == null ? "Unknown error." : t
- .getLocalizedMessage(), t);
- } else
- status = new Status(Status.WARNING, getBundle()
- .getSymbolicName(), 0, String.valueOf(entry),
- new RuntimeException().fillInStackTrace());
-
- getLog().log(status);
- }
-
- public boolean isTracing(String tag) {
- return Boolean.TRUE.equals(Boolean.valueOf(Platform
- .getDebugOption(TRACE_PREFIX + tag)));
- }
- }
+ super.stop(context);
+ }
+
+ public void log(Object entry) {
+ IStatus status;
+ if (entry instanceof IStatus)
+ status = (IStatus) entry;
+ else if (entry instanceof CoreException)
+ status = ((CoreException) entry).getStatus();
+ else if (entry instanceof Throwable) {
+ Throwable t = (Throwable) entry;
+ status = new Status(Status.ERROR,
+ getBundle().getSymbolicName(), 0,
+ t.getLocalizedMessage() == null ? "Unknown error." : t
+ .getLocalizedMessage(), t);
+ } else
+ status = new Status(Status.WARNING, getBundle()
+ .getSymbolicName(), 0, String.valueOf(entry),
+ new RuntimeException().fillInStackTrace());
+
+ getLog().log(status);
+ }
+
+ public boolean isTracing(String tag) {
+ return Boolean.TRUE.equals(Boolean.valueOf(Platform
+ .getDebugOption(TRACE_PREFIX + tag)));
+ }
+ }
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java
new file mode 100644
index 000000000..a8eda900a
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.internal.datashare;
+
+import org.eclipse.ecf.core.ISharedObjectContainer;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.datashare.IDataShareService;
+import org.eclipse.ecf.datashare.IPublicationCallback;
+import org.eclipse.ecf.datashare.ISharedData;
+import org.eclipse.ecf.datashare.ISubscriptionCallback;
+import org.eclipse.ecf.datashare.IUpdateProvider;
+
+/**
+ * @author pnehrer
+ */
+public class DataShareService implements IDataShareService {
+
+ private ServiceManager mgr;
+
+ private ISharedObjectContainer container;
+
+ public DataShareService(ServiceManager mgr, ISharedObjectContainer container) {
+ this.mgr = mgr;
+ this.container = container;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IDataShareService#publish(java.lang.Object,
+ * org.eclipse.ecf.core.identity.ID,
+ * org.eclipse.ecf.datashare.IUpdateProvider,
+ * org.eclipse.ecf.datashare.IPublicationCallback)
+ */
+ public synchronized void publish(Object dataGraph, ID id,
+ IUpdateProvider provider, IPublicationCallback callback)
+ throws ECFException {
+ Agent agent = (Agent) container.getSharedObjectManager()
+ .getSharedObject(id);
+ if (agent != null)
+ throw new ECFException("Already published!");
+
+ IBootstrap bootstrap = getBootstrap();
+ agent = new Agent(dataGraph, bootstrap, provider, callback);
+ container.getSharedObjectManager().addSharedObject(id, agent, null,
+ null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IDataShareService#subscribe(org.eclipse.ecf.core.identity.ID,
+ * org.eclipse.ecf.datashare.ISubscriptionCallback)
+ */
+ public synchronized ISharedData subscribe(ID id,
+ ISubscriptionCallback callback) throws ECFException {
+ Agent agent = (Agent) container.getSharedObjectManager()
+ .getSharedObject(id);
+ if (agent == null)
+ return null; // TODO should we throw?
+
+ if (callback != null)
+ callback.dataSubscribed(agent, container.getConfig().getID());
+
+ return agent;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IDataShareService#dispose()
+ */
+ public synchronized void dispose() {
+ mgr.dispose(container);
+ mgr = null;
+ container = null;
+ }
+
+ private IBootstrap getBootstrap() {
+ return new ServerBootstrap(); // TODO strategize
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
index 923bf5cf1..3f79fe631 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
@@ -11,10 +11,10 @@
package org.eclipse.ecf.internal.datashare;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -82,7 +82,8 @@ public class LazyElectionBootstrap implements IBootstrap {
}
private void handleJoined(ID containerID) {
- List members = Arrays.asList(config.getContext().getGroupMemberIDs());
+ ArrayList members = new ArrayList(Arrays.asList(config.getContext()
+ .getGroupMemberIDs()));
members.remove(containerID);
members.remove(config.getContext().getLocalContainerID());
if (members.isEmpty())
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java
new file mode 100644
index 000000000..4e0dd9df2
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.internal.datashare;
+
+import java.util.Hashtable;
+
+import org.eclipse.ecf.core.ISharedObjectContainer;
+import org.eclipse.ecf.core.ISharedObjectContainerListener;
+import org.eclipse.ecf.core.events.IContainerEvent;
+import org.eclipse.ecf.core.events.ISharedObjectContainerDisposeEvent;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.datashare.IDataShareService;
+import org.eclipse.ecf.datashare.IDataShareServiceManager;
+
+/**
+ * @author pnehrer
+ */
+public class ServiceManager implements IDataShareServiceManager {
+
+ private final Hashtable instances = new Hashtable();
+
+ private final Hashtable listeners = new Hashtable();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.eclipse.ecf.datashare.IDataShareServiceManager#getInstance(org.eclipse.ecf.core.ISharedObjectContainer)
+ */
+ public synchronized IDataShareService getInstance(
+ ISharedObjectContainer container) throws ECFException {
+ IDataShareService svc = (IDataShareService) instances.get(container);
+ if (svc == null) {
+ svc = new DataShareService(this, container);
+ instances.put(container, svc);
+ DisposeListener listener = new DisposeListener(container);
+ listeners.put(container, listener);
+ container.addListener(listener, null);
+ }
+
+ return svc;
+ }
+
+ public synchronized void dispose(ISharedObjectContainer container) {
+ instances.remove(container);
+ listeners.remove(container);
+ }
+
+ private class DisposeListener implements ISharedObjectContainerListener {
+
+ private final ISharedObjectContainer container;
+
+ public DisposeListener(ISharedObjectContainer container) {
+ this.container = container;
+ }
+
+ public void handleEvent(IContainerEvent evt) {
+ if (evt instanceof ISharedObjectContainerDisposeEvent)
+ dispose(container);
+ }
+ }
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java
deleted file mode 100644
index 2c3f1bcb2..000000000
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Peter Nehrer - initial API and implementation
- *******************************************************************************/
-package org.eclipse.ecf.internal.datashare;
-
-import java.io.Serializable;
-
-import org.eclipse.ecf.core.identity.ID;
-
-/**
- * @author pnehrer
- */
-public class Version implements Serializable {
-
- private static final long serialVersionUID = 3258415036413456951L;
-
- private final long sequence;
-
- private final ID containerID;
-
- public Version(ID sourceID) {
- this(0, sourceID);
- }
-
- private Version(long sequence, ID sourceID) {
- this.sequence = sequence;
- this.containerID = sourceID;
- }
-
- public long getSequence() {
- return sequence;
- }
-
- public ID getContainerID() {
- return containerID;
- }
-
- public Version getNext(ID sourceID) {
- return new Version(sequence + 1, sourceID);
- }
-
- public boolean equals(Object other) {
- if (other instanceof Version) {
- Version o = (Version) other;
- return sequence == o.sequence && containerID.equals(o.containerID);
- } else
- return false;
- }
-
- public int hashCode() {
- int c = 17;
- c = 37 * c + (int) sequence;
- c = 37 * c + containerID.hashCode();
- return c;
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer("Version[");
- buf.append("sequence=").append(sequence).append(";");
- buf.append("containerID=").append(containerID).append("]");
- return buf.toString();
- }
-}

Back to the top