diff options
| author | pnehrer | 2005-05-24 21:34:13 +0000 |
|---|---|---|
| committer | pnehrer | 2005-05-24 21:34:13 +0000 |
| commit | d5458aa477ce600b10cbf0b2f18a0ab453c4e48e (patch) | |
| tree | a39382d26eab7d91096217eefe11cf5930337c18 | |
| parent | f8421ee28241a9e15b7e84c8dff48152c657778c (diff) | |
| download | org.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.tar.gz org.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.tar.xz org.eclipse.ecf-d5458aa477ce600b10cbf0b2f18a0ab453c4e48e.zip | |
Checkpoint.
35 files changed, 2458 insertions, 341 deletions
diff --git a/framework/bundles/org.eclipse.ecf.datashare/.options b/framework/bundles/org.eclipse.ecf.datashare/.options new file mode 100644 index 000000000..b9beb4172 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/.options @@ -0,0 +1,2 @@ +org.eclipse.ecf.datashare/debug=true +org.eclipse.ecf.datashare/ConsistentMulticaster=true diff --git a/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF b/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF index a1197e513..e79bfa983 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF +++ b/framework/bundles/org.eclipse.ecf.datashare/META-INF/MANIFEST.MF @@ -4,9 +4,13 @@ Bundle-Name: ECF DataShare Plug-in Bundle-SymbolicName: org.eclipse.ecf.datashare Bundle-Version: 1.0.0 Bundle-ClassPath: datashare.jar -Bundle-Activator: org.eclipse.ecf.internal.datashare.DataSharePlugin +Bundle-Activator: org.eclipse.ecf.internal.datashare.DataSharePlugin$EclipsePlugin Bundle-Vendor: Eclipse.org Bundle-Localization: plugin Require-Bundle: org.eclipse.core.runtime, - org.eclipse.ecf + org.eclipse.ecf;visibility:=reexport Eclipse-AutoStart: true +Export-Package: org.eclipse.ecf.datashare, + org.eclipse.ecf.datashare.multicast, + org.eclipse.ecf.datashare.util, + org.eclipse.ecf.internal.datashare diff --git a/framework/bundles/org.eclipse.ecf.datashare/build.properties b/framework/bundles/org.eclipse.ecf.datashare/build.properties index 0b4cfbcea..4578b1dd7 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/build.properties +++ b/framework/bundles/org.eclipse.ecf.datashare/build.properties @@ -3,4 +3,4 @@ output.datashare.jar = bin/ bin.includes = META-INF/,\ datashare.jar,\ schema/,\ - build.properties + plugin.xml diff --git a/framework/bundles/org.eclipse.ecf.datashare/notes.txt b/framework/bundles/org.eclipse.ecf.datashare/notes.txt new file mode 100644 index 000000000..d929e526d --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/notes.txt @@ -0,0 +1,22 @@ +Service +- create/obtain Agents +- manage subscriptions + +Agent +- create remote replicas when/where needed +- stay alive only when connected +- client facade +- coordinate sub-components to implement the service + +Synchronizer +- pause/resume messages (commits) + +Coordinator +- ensures there's exactly one coordinator replica (when needed) + +Multicaster +- implements 1/2/3-phase "commit" protocol + +Disposer (Garbage Collector) +- dispose Agents with no subscriptions + diff --git a/framework/bundles/org.eclipse.ecf.datashare/plugin.xml b/framework/bundles/org.eclipse.ecf.datashare/plugin.xml index 19c6ead29..a62303377 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/plugin.xml +++ b/framework/bundles/org.eclipse.ecf.datashare/plugin.xml @@ -1,6 +1,19 @@ <?xml version="1.0" encoding="UTF-8"?> <?eclipse version="3.0"?> <plugin> - <extension-point id="manager" name="DataShare Manager" schema="schema/manager.exsd"/> + <extension-point id="servicemanager" name="DataShare Service Manager" schema="schema/servicemanager.exsd"/> + <extension-point id="updateprovider" name="DataShare Update Provider" schema="schema/updateprovider.exsd"/> + <extension + point="org.eclipse.ecf.datashare.servicemanager"> + <manager + class="org.eclipse.ecf.internal.datashare.ServiceManager" + name="org.eclipse.ecf.datashare.default"/> + </extension> + <extension + point="org.eclipse.ecf.datashare.updateprovider"> + <updateProvider + factory="org.eclipse.ecf.datashare.util.TrackedSetUpdateProvider$Factory" + id="org.eclipse.ecf.datashare.util.TrackedSet"/> + </extension> </plugin> diff --git a/framework/bundles/org.eclipse.ecf.datashare/schema/manager.exsd b/framework/bundles/org.eclipse.ecf.datashare/schema/servicemanager.exsd index e1a5e2aec..5444bd54a 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/schema/manager.exsd +++ b/framework/bundles/org.eclipse.ecf.datashare/schema/servicemanager.exsd @@ -3,7 +3,7 @@ <schema targetNamespace="org.eclipse.ecf.datashare"> <annotation> <appInfo> - <meta.schema plugin="org.eclipse.ecf.datashare" id="manager" name="Data Graph Sharing Manager"/> + <meta.schema plugin="org.eclipse.ecf.datashare" id="manager" name="DataShare Service Manager"/> </appInfo> <documentation> Allows plugins to provide custom implementations of the Data Graph Sharing facility by registering uniquely named Data Graph Sharing Managers (classes that implement <code>org.eclipse.ecf.sdo.IDataGraphSharingManager</code>), which are responsible for producing per-container instances of the Data Graph Sharing service implementation (<code>org.eclipse.ecf.sdo.IDataGraphSharing</code>). diff --git a/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd b/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd new file mode 100644 index 000000000..2c189fbfa --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/schema/updateprovider.exsd @@ -0,0 +1,117 @@ +<?xml version='1.0' encoding='UTF-8'?> +<!-- Schema file written by PDE --> +<schema targetNamespace="org.eclipse.ecf.datashare"> +<annotation> + <appInfo> + <meta.schema plugin="org.eclipse.ecf.datashare" id="updateprovider" name="DataShare Update Provider"/> + </appInfo> + <documentation> + [Enter description of this extension point.] + </documentation> + </annotation> + + <element name="extension"> + <complexType> + <sequence> + <element ref="updateProvider" minOccurs="1" maxOccurs="unbounded"/> + </sequence> + <attribute name="point" type="string" use="required"> + <annotation> + <documentation> + + </documentation> + </annotation> + </attribute> + <attribute name="id" type="string"> + <annotation> + <documentation> + + </documentation> + </annotation> + </attribute> + <attribute name="name" type="string"> + <annotation> + <documentation> + + </documentation> + <appInfo> + <meta.attribute translatable="true"/> + </appInfo> + </annotation> + </attribute> + </complexType> + </element> + + <element name="updateProvider"> + <annotation> + <appInfo> + <meta.element labelAttribute="id"/> + </appInfo> + </annotation> + <complexType> + <attribute name="id" type="string" use="required"> + <annotation> + <documentation> + + </documentation> + </annotation> + </attribute> + <attribute name="factory" type="string" use="required"> + <annotation> + <documentation> + + </documentation> + <appInfo> + <meta.attribute kind="java" basedOn="org.eclipse.ecf.datashare.IUpdateProvider"/> + </appInfo> + </annotation> + </attribute> + </complexType> + </element> + + <annotation> + <appInfo> + <meta.section type="since"/> + </appInfo> + <documentation> + [Enter the first release in which this extension point appears.] + </documentation> + </annotation> + + <annotation> + <appInfo> + <meta.section type="examples"/> + </appInfo> + <documentation> + [Enter extension point usage example here.] + </documentation> + </annotation> + + <annotation> + <appInfo> + <meta.section type="apiInfo"/> + </appInfo> + <documentation> + [Enter API information here.] + </documentation> + </annotation> + + <annotation> + <appInfo> + <meta.section type="implementation"/> + </appInfo> + <documentation> + [Enter information about supplied implementation of this extension point.] + </documentation> + </annotation> + + <annotation> + <appInfo> + <meta.section type="copyright"/> + </appInfo> + <documentation> + + </documentation> + </annotation> + +</schema> diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java index afa10f171..8da643270 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IDataShareService.java @@ -22,7 +22,6 @@ public interface IDataShareService { /** * Publishes the given data graph under the given id. - * * @param dataGraph * local data graph instance to share * @param id @@ -30,36 +29,28 @@ public interface IDataShareService { * @param provider * update provider compatible with the given data graph's * implementation - * @param consumer - * application-specific update consumer * @param callback * optional callback used to notify the caller about publication * status - * @return shared data graph + * * @throws ECFException */ - ISharedData publish(Object dataGraph, ID id, IUpdateProvider provider, - IUpdateListener consumer, IPublicationCallback callback) + void publish(Object dataGraph, ID id, IUpdateProvider provider, + IPublicationCallback callback) throws ECFException; /** * Subscribes to a data graph with the given id. - * * @param id * identifier of a previously-published data graph - * @param provider - * update provider compatible with the given data graph's - * implementation - * @param consumer - * application-specific update consumer * @param callback * optional callback used to notify the caller about subscription * status + * * @return shared data graph * @throws ECFException */ - ISharedData subscribe(ID id, IUpdateProvider provider, - IUpdateListener consumer, ISubscriptionCallback callback) + ISharedData subscribe(ID id, ISubscriptionCallback callback) throws ECFException; /** diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java index 2c3c2a5f2..ea258a562 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProvider.java @@ -10,8 +10,6 @@ *******************************************************************************/ package org.eclipse.ecf.datashare; -import java.io.IOException; - import org.eclipse.ecf.core.util.ECFException; /** @@ -30,7 +28,9 @@ import org.eclipse.ecf.core.util.ECFException; * @author pnehrer */ public interface IUpdateProvider { - + + IUpdateProviderFactory getFactory(); + /** * Creates an update from the given data graph, which will be forwarded to * other group members. The implementor may use the graph's Change Summary @@ -42,7 +42,7 @@ public interface IUpdateProvider { * @throws ECFException * when an update cannot be created */ - byte[] createUpdate(ISharedData graph) throws ECFException; + Object createUpdate(ISharedData graph) throws ECFException; /** * Applies the remote update to the given data graph. The implementor is @@ -56,26 +56,4 @@ public interface IUpdateProvider { * when this update cannot be applied */ void applyUpdate(ISharedData graph, Object data) throws ECFException; - - /** - * Serializes the given data graph. - * - * @param graph - * data graph instance to serialize - * @return serialized data graph - * @throws IOException - */ - Object serializeData(Object graph) throws IOException; - - /** - * Deserializes the given data graph. - * - * @param data - * serialized data graph - * @return deserialized instance of data graph - * @throws IOException - * @throws ClassNotFoundException - */ - Object deserializeData(Object data) throws IOException, - ClassNotFoundException; } diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java new file mode 100644 index 000000000..bbb23679e --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/IUpdateProviderFactory.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare; + +import java.util.Map; + +/** + * @author pnehrer + */ +public interface IUpdateProviderFactory { + + String getID(); + + IUpdateProvider createProvider(Map params); +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java new file mode 100644 index 000000000..2015adc2b --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/UpdateProviderRegistry.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare; + +import java.util.Hashtable; +import java.util.Map; + +/** + * @author pnehrer + */ +public class UpdateProviderRegistry { + + private static final Hashtable providers = new Hashtable(); + + private UpdateProviderRegistry() { + } + + public static IUpdateProvider createProvider(String id, Map params) { + IUpdateProviderFactory f = (IUpdateProviderFactory) providers.get(id); + if (f == null) + return null; + else + return f.createProvider(params); + } + + public static void registerFactory(String id, IUpdateProviderFactory f) { + providers.put(id, f); + } + + public static void unregisterFactory(String id) { + providers.remove(id); + } + + public static void unregisterAllFactories() { + providers.clear(); + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java new file mode 100644 index 000000000..1fb62f8fc --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Abort.java @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +public class Abort implements Serializable { + + private static final long serialVersionUID = 3258410616858294325L; + + final Version version; + + public Abort(Version version) { + this.version = version; + } + + public Version getVersion() { + return version; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Abort[version="); + buf.append(version).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java new file mode 100644 index 000000000..a8d4e01fd --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java @@ -0,0 +1,331 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; + +import org.eclipse.ecf.core.ISharedObject; +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.ISharedObjectContext; +import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent; +import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent; +import org.eclipse.ecf.core.events.ISharedObjectDeactivatedEvent; +import org.eclipse.ecf.core.events.ISharedObjectMessageEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.internal.datashare.DataSharePlugin; + +/** + * @author pnehrer + */ +public abstract class AbstractMulticaster implements ISharedObject { + + public static final short NEW = 0; + + public static final short READY = 1; + + public static final short PAUSED = 2; + + public static final short DISPOSED = 3; + + public class Testable { + + public Version getVersion() { + return version; + } + + public short getState() { + return state; + } + + public String getStateCode() { + return getStateStr(); + } + } + + protected ISharedObjectConfig config; + + protected ID sharedObjectID; + + protected ISharedObjectContext context; + + protected ID localContainerID; + + protected ID groupID; + + protected Version version; + + protected short state = NEW; + + protected final HashSet pauses = new HashSet(); + + protected HashSet pauseRequests; + + protected Testable testable; + + public abstract boolean sendMessage(Object message) throws ECFException; + + public synchronized void pause() throws ECFException, IllegalStateException { + if (pauses.contains(localContainerID)) + throw new IllegalStateException(); + + while (state == NEW) { + try { + wait(); + } catch (InterruptedException e) { + throw new ECFException(e); + } + } + + if (state == DISPOSED) + throw new IllegalStateException(); + + boolean wasEmpty = pauses.isEmpty(); + pauses.add(localContainerID); + pauseRequests = new HashSet(Arrays.asList(context.getGroupMemberIDs())); + pauseRequests.remove(localContainerID); + try { + context.sendMessage(null, new Pause()); + synchronized (pauses) { + pauses.wait(1000); + } + + if (!pauseRequests.isEmpty()) + throw new ECFException("Failed to pause."); + } catch (IOException e) { + pauses.remove(localContainerID); + throw new ECFException(e); + } catch (InterruptedException e) { + pauses.remove(localContainerID); + throw new ECFException(e); + } finally { + if (wasEmpty && !pauses.isEmpty()) + notify(); + } + } + + public synchronized void resume() throws ECFException { + if (state == DISPOSED) + throw new IllegalStateException(); + + if (!pauses.contains(localContainerID)) + throw new IllegalStateException(); + + try { + context.sendMessage(null, new Resume()); + pauses.remove(localContainerID); + } catch (IOException e) { + throw new ECFException(e); + } finally { + if (pauses.isEmpty()) + notify(); + } + } + + protected abstract void receiveMessage(Object message); + + protected synchronized boolean waitToSend() { + while (state != READY || !pauses.isEmpty()) { + if (state == DISPOSED) + return false; + + try { + wait(); + } catch (InterruptedException e) { + DataSharePlugin.log(e); + return false; + } + } + + return true; + } + + protected void traceEntry(String method) { + StringBuffer buf = new StringBuffer("> "); + buf.append(getStateStr()); + buf.append(' '); + buf.append(localContainerID); + buf.append(": "); + buf.append(method); + DataSharePlugin.getTraceLog().println(buf); + } + + protected void traceExit(String method) { + StringBuffer buf = new StringBuffer("< "); + buf.append(getStateStr()); + buf.append(' '); + buf.append(localContainerID); + buf.append(": "); + buf.append(method); + DataSharePlugin.getTraceLog().println(buf); + } + + protected String getStateStr() { + switch (state) { + case NEW: + return "NEW"; + case READY: + return "RDY"; + case DISPOSED: + return "DSP"; + default: + return "UNK"; + } + } + + public synchronized Testable getTestable() { + if (testable == null) + testable = new Testable(); + + return testable; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) + */ + public synchronized void init(ISharedObjectConfig config) + throws SharedObjectInitException { + this.config = config; + sharedObjectID = config.getSharedObjectID(); + Map params = config.getProperties(); + if (params != null) { + Object param = params.get("version"); + if (param instanceof Version) + version = (Version) param; + } + + if (version == null) + version = new Version(sharedObjectID, 0); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) + */ + public void handleEvent(Event event) { + if (event instanceof ISharedObjectActivatedEvent) + handleActivated((ISharedObjectActivatedEvent) event); + else if (event instanceof ISharedObjectDeactivatedEvent) + handleDeactivated((ISharedObjectDeactivatedEvent) event); + else if (event instanceof ISharedObjectContainerDepartedEvent) + handleDeparted((ISharedObjectContainerDepartedEvent) event); + else if (event instanceof ISharedObjectMessageEvent) { + ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event; + if (e.getData() instanceof Message) + handleMessage(e.getRemoteContainerID(), (Message) e.getData()); + else if (e.getData() instanceof Pause) + handlePause(e.getRemoteContainerID(), (Pause) e.getData()); + else if (e.getData() instanceof Paused) + handlePaused(e.getRemoteContainerID(), (Paused) e.getData()); + else if (e.getData() instanceof Resume) + handleResume(e.getRemoteContainerID(), (Resume) e.getData()); + } + } + + protected void handleActivated(ISharedObjectActivatedEvent event) { + if (event.getActivatedID().equals(sharedObjectID)) { + context = config.getContext(); + localContainerID = context.getLocalContainerID(); + groupID = context.getGroupID(); + if (groupID == null) + try { + context.sendDispose(localContainerID); + } catch (IOException e) { + DataSharePlugin.log(e); + } + else { + synchronized (this) { + state = READY; + notifyAll(); + } + } + } + } + + protected void handleDeactivated(ISharedObjectDeactivatedEvent event) { + if (event.getDeactivatedID().equals(sharedObjectID)) { + synchronized (this) { + state = DISPOSED; + notifyAll(); + } + } + } + + protected void handleDeparted(ISharedObjectContainerDepartedEvent event) { + if (event.getDepartedContainerID().equals(localContainerID)) + context.getSharedObjectManager().removeSharedObject(sharedObjectID); + } + + protected synchronized void handlePause(ID remoteContainerID, Pause pause) { + if (pauses.isEmpty()) + notify(); + + pauses.add(remoteContainerID); + } + + protected synchronized void handlePaused(ID remoteContainerID, Paused paused) { + if (pauses.contains(localContainerID) + && pauseRequests != null + && pauseRequests.remove(remoteContainerID) + && pauseRequests.isEmpty()) + synchronized (pauses) { + pauses.notify(); + } + } + + protected synchronized void handleResume(ID remoteContainerID, Resume resume) { + if (pauses.remove(remoteContainerID) && pauses.isEmpty()) + notify(); + } + + protected void handleMessage(ID remoteContainerID, Message message) { + synchronized (this) { + version = message.getVersion(); + } + + receiveMessage(message.getData()); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) + */ + public void handleEvents(Event[] events) { + for (int i = 0; i < events.length; ++i) + handleEvent(events[i]); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) + */ + public void dispose(ID containerID) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java new file mode 100644 index 000000000..c61ebcec3 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Ack.java @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +public class Ack implements Serializable { + + private static final long serialVersionUID = 3832621776860952377L; + + final Version version; + + public Ack(Version version) { + this.version = version; + } + + public Version getVersion() { + return version; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Ack[version="); + buf.append(version).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java new file mode 100644 index 000000000..d66c30b6b --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Commit.java @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +public class Commit implements Serializable { + + private static final long serialVersionUID = 3258126938529740848L; + + final Version version; + + public Commit(Version version) { + this.version = version; + } + + public Version getVersion() { + return version; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Commit[version="); + buf.append(version).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java new file mode 100644 index 000000000..c9883c5f0 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java @@ -0,0 +1,395 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; + +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent; +import org.eclipse.ecf.core.events.ISharedObjectMessageEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.internal.datashare.DataSharePlugin; + +/** + * @author pnehrer + */ +public class ConsistentMulticaster extends AbstractMulticaster implements + Timeout.Listener { + + public static final short SEND = 3; + + public static final short RECEIVE = 4; + + public static final long DEFAULT_TIMEOUT = 1000; + + public static final String TRACE_TAG = "ConsistentMulticaster"; + + private long sendTimeout; + + private long receiveTimeout; + + private Version nextVersion; + + private HashSet requests; + + private final Timer timer = new Timer(); + + private final HashMap timeouts = new HashMap(); + + private final HashMap commits = new HashMap(); + + private boolean granted; + + public synchronized boolean sendMessage(Object message) throws ECFException { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "sendMessage[message=" + message + "]"); + + try { + if (!waitToSend()) + return false; + + state = SEND; + nextVersion = new Version(localContainerID, + version.getSequence() + 1); + ArrayList others = new ArrayList(Arrays.asList(context + .getGroupMemberIDs())); + others.remove(localContainerID); + requests = new HashSet(others); + granted = true; + try { + context.sendMessage(null, new Request(nextVersion)); + wait(sendTimeout); + if (state != SEND) + return false; + + if (!granted || !requests.isEmpty()) { + context.sendMessage(null, new Abort(nextVersion)); + return false; + } + + requests.addAll(others); + context.sendMessage(null, new Message(nextVersion, message)); + wait(sendTimeout); + if (state != SEND) + return false; + + if (!requests.isEmpty()) { + context.sendMessage(null, new Abort(nextVersion)); + return false; + } + + context.sendMessage(null, new Commit(nextVersion)); + version = nextVersion; + return true; + } catch (IOException e) { + throw new ECFException(e); + } catch (InterruptedException e) { + throw new ECFException(e); + } finally { + state = READY; + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + public String getStateStr() { + switch (state) { + case SEND: + return "SND"; + case RECEIVE: + return "RCV"; + default: + return super.getStateStr(); + } + } + + protected void receiveMessage(Object message) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) + */ + public synchronized void init(ISharedObjectConfig config) + throws SharedObjectInitException { + super.init(config); + + sendTimeout = DEFAULT_TIMEOUT; + receiveTimeout = DEFAULT_TIMEOUT; + + Map params = config.getProperties(); + if (params != null) { + Object param = params.get("sendTimeout"); + if (param instanceof Long) + sendTimeout = ((Long) param).longValue(); + + param = params.get("receiveTimeout"); + if (param instanceof Long) + receiveTimeout = ((Long) param).longValue(); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) + */ + public void handleEvent(Event event) { + super.handleEvent(event); + if (event instanceof ISharedObjectMessageEvent) { + ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event; + if (e.getData() instanceof Request) + handleRequest(e.getRemoteContainerID(), (Request) e.getData()); + else if (e.getData() instanceof Reply) + handleReply(e.getRemoteContainerID(), (Reply) e.getData()); + else if (e.getData() instanceof Abort) + handleAbort(e.getRemoteContainerID(), (Abort) e.getData()); + else if (e.getData() instanceof Ack) + handleAck(e.getRemoteContainerID(), (Ack) e.getData()); + else if (e.getData() instanceof Commit) + handleCommit(e.getRemoteContainerID(), (Commit) e.getData()); + } + } + + protected void handleDeparted(ISharedObjectContainerDepartedEvent event) { + super.handleDeparted(event); + if (!event.getDepartedContainerID().equals(localContainerID)) { + synchronized (this) { + if (state == SEND) { + requests.remove(event.getDepartedContainerID()); + if (requests.isEmpty()) + notify(); + } else if (state == RECEIVE) { + Timeout[] t = (Timeout[]) timeouts.values().toArray( + new Timeout[timeouts.size()]); + for (int i = 0; i < t.length; ++i) { + if (t[i].getVersion().getSenderID().equals( + event.getDepartedContainerID()) + && t[i].cancel()) + timeouts.remove(t[i].getVersion()); + } + + for (Iterator i = commits.keySet().iterator(); i.hasNext();) { + Version version = (Version) i.next(); + if (version.getSenderID().equals( + event.getDepartedContainerID())) + i.remove(); + } + + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + } + } + } + } + + private synchronized void handleRequest(ID remoteContainerID, + Request request) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleRequest[remoteContainerID=" + + remoteContainerID + ";request=" + request + "]"); + + try { + if ((state == READY || state == RECEIVE) + && version.getSequence() + 1 == request.getVersion() + .getSequence()) { + state = RECEIVE; + Timeout timeout = new Timeout(this, request.getVersion()); + timeouts.put(request.getVersion(), timeout); + timer.schedule(timeout, receiveTimeout); + context.sendMessage(remoteContainerID, new Reply(request + .getVersion(), true)); + } else if (state != DISPOSED) + context.sendMessage(remoteContainerID, new Reply(request + .getVersion(), false)); + } catch (IOException e) { + DataSharePlugin.log(e); + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleReply(ID remoteContainerID, Reply reply) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleReply[remoteContainerID=" + + remoteContainerID + ";reply=" + reply + "]"); + + try { + if (state == SEND && reply.getVersion().equals(nextVersion)) { + if (!reply.isGranted()) { + granted = false; + notify(); + } + + requests.remove(remoteContainerID); + if (requests.isEmpty()) + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + protected synchronized void handleMessage(ID remoteContainerID, + Message message) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleMessage[remoteContainerID=" + + remoteContainerID + ";message=" + message + "]"); + + try { + Timeout timeout; + if (((timeout = (Timeout) timeouts.get(message.getVersion())) != null) + && timeout.cancel()) { + commits.put(message.getVersion(), message.getData()); + timeout = new Timeout(this, message.getVersion()); + timeouts.put(message.getVersion(), timeout); + timer.schedule(timeout, 1000); + context.sendMessage(remoteContainerID, new Ack(message + .getVersion())); + } + } catch (IOException e) { + DataSharePlugin.log(e); + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleAbort(ID remoteContainerID, Abort abort) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleAbort[remoteContainerID=" + + remoteContainerID + ";abort=" + abort + "]"); + + try { + Timeout timeout = (Timeout) timeouts.remove(abort.getVersion()); + if (timeout != null && timeout.cancel()) { + commits.remove(abort.getVersion()); + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleAck(ID remoteContainerID, Ack ack) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleAck[remoteContainerID=" + + remoteContainerID + ";ack=" + ack + "]"); + try { + if (state == SEND && ack.getVersion().equals(nextVersion)) { + requests.remove(remoteContainerID); + if (requests.isEmpty()) + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleCommit(ID remoteContainerID, Commit commit) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleCommit[remoteContainerID=" + + remoteContainerID + ";commit=" + commit + "]"); + + try { + Timeout timeout; + if (((timeout = (Timeout) timeouts.get(commit.getVersion())) != null) + && timeout.cancel()) { + version = commit.getVersion(); + timeouts.remove(version); + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + + receiveMessage(commits.remove(version)); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + public synchronized void timeout(Version version) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "timeout[version=" + version + "]"); + + try { + timeouts.remove(version); + if (timeouts.isEmpty()) { + commits.remove(version); + state = READY; + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) + */ + public void handleEvents(Event[] events) { + for (int i = 0; i < events.length; ++i) + handleEvent(events[i]); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) + */ + public void dispose(ID containerID) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Update.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Message.java index 85579ba98..74b293d15 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Update.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Message.java @@ -8,22 +8,19 @@ * Contributors: * Peter Nehrer - initial API and implementation *******************************************************************************/ -package org.eclipse.ecf.internal.datashare; +package org.eclipse.ecf.datashare.multicast; import java.io.Serializable; -/** - * @author pnehrer - */ -public class Update implements Serializable { +public class Message implements Serializable { - private static final long serialVersionUID = 3256439205344260914L; + private static final long serialVersionUID = 3257281414121993014L; - private final Version version; + final Version version; - private final Object data; + final Object data; - public Update(Version version, Object data) { + public Message(Version version, Object data) { this.version = version; this.data = data; } @@ -36,25 +33,10 @@ public class Update implements Serializable { return data; } - public boolean equals(Object other) { - if (other instanceof Update) { - Update o = (Update) other; - return version.equals(o.version) && data.equals(o.data); - } else - return false; - } - - public int hashCode() { - int c = 17; - c = 37 * c + version.hashCode(); - c = 37 * c + data.hashCode(); - return c; - } - public String toString() { - StringBuffer buf = new StringBuffer("Update["); - buf.append("version=").append(version).append(";"); - buf.append("data=").append(data).append("]"); + StringBuffer buf = new StringBuffer("Message[version="); + buf.append(version).append(";data="); + buf.append(data).append(']'); return buf.toString(); } -} +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java new file mode 100644 index 000000000..d992c14e7 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java @@ -0,0 +1,329 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Timer; + +import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.SharedObjectInitException; +import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent; +import org.eclipse.ecf.core.events.ISharedObjectMessageEvent; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.internal.datashare.DataSharePlugin; + +/** + * @author pnehrer + */ +public class OrderedMulticaster extends AbstractMulticaster implements + Timeout.Listener { + + public static final short SEND = 3; + + public static final short RECEIVE = 4; + + public static final long DEFAULT_TIMEOUT = 1000; + + public static final String TRACE_TAG = "OrderedMulticaster"; + + private long sendTimeout; + + private long receiveTimeout; + + private Version nextVersion; + + private HashSet requests; + + private final Timer timer = new Timer(); + + private final HashMap timeouts = new HashMap(); + + private boolean granted; + + public synchronized boolean sendMessage(Object message) throws ECFException { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "sendMessage[message=" + message + "]"); + + try { + if (!waitToSend()) + return false; + + state = SEND; + nextVersion = new Version(localContainerID, + version.getSequence() + 1); + ArrayList others = new ArrayList(Arrays.asList(context + .getGroupMemberIDs())); + others.remove(localContainerID); + requests = new HashSet(others); + granted = true; + try { + context.sendMessage(null, new Request(nextVersion)); + wait(sendTimeout); + if (state != SEND) + return false; + + if (!granted || !requests.isEmpty()) { + context.sendMessage(null, new Abort(nextVersion)); + return false; + } + + context.sendMessage(null, new Message(nextVersion, message)); + version = nextVersion; + return true; + } catch (IOException e) { + throw new ECFException(e); + } catch (InterruptedException e) { + throw new ECFException(e); + } finally { + state = READY; + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + public String getStateStr() { + switch (state) { + case SEND: + return "SND"; + case RECEIVE: + return "RCV"; + default: + return super.getStateStr(); + } + } + + protected void receiveMessage(Object message) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#init(org.eclipse.ecf.core.ISharedObjectConfig) + */ + public synchronized void init(ISharedObjectConfig config) + throws SharedObjectInitException { + super.init(config); + + sendTimeout = DEFAULT_TIMEOUT; + receiveTimeout = DEFAULT_TIMEOUT; + + Map params = config.getProperties(); + if (params != null) { + Object param = params.get("sendTimeout"); + if (param instanceof Long) + sendTimeout = ((Long) param).longValue(); + + param = params.get("receiveTimeout"); + if (param instanceof Long) + receiveTimeout = ((Long) param).longValue(); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvent(org.eclipse.ecf.core.util.Event) + */ + public void handleEvent(Event event) { + super.handleEvent(event); + if (event instanceof ISharedObjectMessageEvent) { + ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event; + if (e.getData() instanceof Request) + handleRequest(e.getRemoteContainerID(), (Request) e.getData()); + else if (e.getData() instanceof Reply) + handleReply(e.getRemoteContainerID(), (Reply) e.getData()); + else if (e.getData() instanceof Abort) + handleAbort(e.getRemoteContainerID(), (Abort) e.getData()); + } + } + + protected void handleDeparted(ISharedObjectContainerDepartedEvent event) { + super.handleDeparted(event); + if (!event.getDepartedContainerID().equals(localContainerID)) { + synchronized (this) { + if (state == SEND) { + requests.remove(event.getDepartedContainerID()); + if (requests.isEmpty()) + notify(); + } else if (state == RECEIVE) { + Timeout[] t = (Timeout[]) timeouts.values().toArray( + new Timeout[timeouts.size()]); + for (int i = 0; i < t.length; ++i) { + if (t[i].getVersion().getSenderID().equals( + event.getDepartedContainerID()) + && t[i].cancel()) + timeouts.remove(t[i].getVersion()); + } + + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + } + } + } + } + + private synchronized void handleRequest(ID remoteContainerID, + Request request) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleRequest[remoteContainerID=" + + remoteContainerID + ";request=" + request + "]"); + + try { + if ((state == READY || state == RECEIVE) + && version.getSequence() + 1 == request.getVersion() + .getSequence()) { + Timeout timeout = new Timeout(this, request.getVersion()); + timeouts.put(request.getVersion(), timeout); + timer.schedule(timeout, receiveTimeout); + context.sendMessage(remoteContainerID, new Reply(request + .getVersion(), true)); + if (state == READY) + notify(); + + state = RECEIVE; + } else if (state != DISPOSED) + context.sendMessage(remoteContainerID, new Reply(request + .getVersion(), false)); + } catch (IOException e) { + DataSharePlugin.log(e); + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleReply(ID remoteContainerID, Reply reply) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleReply[remoteContainerID=" + + remoteContainerID + ";reply=" + reply + "]"); + + try { + if (state == SEND && reply.getVersion().equals(nextVersion)) { + if (!reply.isGranted()) { + granted = false; + notify(); + } + + requests.remove(remoteContainerID); + if (requests.isEmpty()) + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + protected synchronized void handleMessage(ID remoteContainerID, + Message message) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleMessage[remoteContainerID=" + + remoteContainerID + ";message=" + message + "]"); + + try { + Timeout timeout; + if (((timeout = (Timeout) timeouts.get(message.getVersion())) != null) + && timeout.cancel()) { + version = message.getVersion(); + timeouts.remove(version); + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + + receiveMessage(message.getData()); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + private synchronized void handleAbort(ID remoteContainerID, Abort abort) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "handleAbort[remoteContainerID=" + + remoteContainerID + ";abort=" + abort + "]"); + + try { + Timeout timeout = (Timeout) timeouts.remove(abort.getVersion()); + if (timeout != null && timeout.cancel()) { + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + public synchronized void timeout(Version version) { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "timeout[version=" + version + "]"); + + try { + timeouts.remove(version); + if (timeouts.isEmpty()) { + state = READY; + notify(); + } + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[]) + */ + public void handleEvents(Event[] events) { + for (int i = 0; i < events.length; ++i) + handleEvent(events[i]); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID) + */ + public void dispose(ID containerID) { + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) + */ + public Object getAdapter(Class clazz) { + return null; + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java new file mode 100644 index 000000000..4bf3ae694 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Pause.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +/** + * @author pnehrer + * + */ +public class Pause implements Serializable { + + private static final long serialVersionUID = 3258417222501347637L; + +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java new file mode 100644 index 000000000..7c7f6d070 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Paused.java @@ -0,0 +1,22 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +/** + * @author pnehrer + */ +public class Paused implements Serializable { + + private static final long serialVersionUID = 3978705103752868913L; + +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java new file mode 100644 index 000000000..19839bed2 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Reply.java @@ -0,0 +1,42 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +public class Reply implements Serializable { + + private static final long serialVersionUID = 3689632497314837046L; + + final Version version; + + final boolean granted; + + public Reply(Version version, boolean granted) { + this.version = version; + this.granted = granted; + } + + public Version getVersion() { + return version; + } + + public boolean isGranted() { + return granted; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Reply[version="); + buf.append(version).append(";granted="); + buf.append(granted).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java new file mode 100644 index 000000000..e02d87511 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Request.java @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +public class Request implements Serializable { + + private static final long serialVersionUID = 3257003237730365493L; + + final Version version; + + public Request(Version version) { + this.version = version; + } + + public Version getVersion() { + return version; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Request[version="); + buf.append(version).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java new file mode 100644 index 000000000..5046854df --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Resume.java @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +/** + * @author pnehrer + * + */ +public class Resume implements Serializable { + + private static final long serialVersionUID = 3258133557074540854L; + +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java new file mode 100644 index 000000000..8d4095b67 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.IOException; + +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.internal.datashare.DataSharePlugin; + +/** + * @author pnehrer + */ +public class SimpleMulticaster extends AbstractMulticaster { + + public static final String TRACE_TAG = "SimpleMulticaster"; + + public synchronized boolean sendMessage(Object message) throws ECFException { + String method = null; + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceEntry(method = "sendMessage[message=" + message + "]"); + + try { + if (!waitToSend()) + return false; + + version = new Version(localContainerID, version.getSequence() + 1); + context.sendMessage(null, new Message(version, message)); + return true; + } catch (IOException e) { + throw new ECFException(e); + } finally { + if (DataSharePlugin.isTracing(TRACE_TAG)) + traceExit(method); + } + } + + protected void receiveMessage(Object message) { + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java new file mode 100644 index 000000000..3ae5fd113 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Timeout.java @@ -0,0 +1,37 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.util.TimerTask; + +class Timeout extends TimerTask { + + interface Listener { + void timeout(Version version); + } + + private final Listener listener; + + final Version version; + + public Timeout(Listener listener, Version version) { + this.listener = listener; + this.version = version; + } + + public Version getVersion() { + return version; + } + + public void run() { + listener.timeout(version); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java new file mode 100644 index 000000000..53753aa56 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Version.java @@ -0,0 +1,59 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.multicast; + +import java.io.Serializable; + +import org.eclipse.ecf.core.identity.ID; + +public class Version implements Serializable { + + private static final long serialVersionUID = 3762538901495101236L; + + private final ID senderID; + + private final long sequence; + + public Version(ID senderID, long sequence) { + this.senderID = senderID; + this.sequence = sequence; + } + + public ID getSenderID() { + return senderID; + } + + public long getSequence() { + return sequence; + } + + public boolean equals(Object other) { + if (other instanceof Version) { + Version o = (Version) other; + return senderID.equals(o.senderID) && sequence == o.sequence; + } else + return false; + } + + public int hashCode() { + int c = 17; + c = 37 * c + senderID.hashCode(); + c = 37 * c + (int) sequence; + return c; + } + + public String toString() { + StringBuffer buf = new StringBuffer("Version[senderID="); + buf.append(senderID).append(";sequence="); + buf.append(sequence).append(']'); + return buf.toString(); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java new file mode 100644 index 000000000..28a8b6401 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/NotifyingSet.java @@ -0,0 +1,150 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.util; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.Collections; +import java.util.EventListener; +import java.util.EventObject; +import java.util.HashSet; +import java.util.Set; + +/** + * @author pnehrer + */ +public class NotifyingSet implements Serializable { + + private static final long serialVersionUID = 3258134639355967536L; + + public static interface IChangeListener extends EventListener { + + void changed(ChangeEvent e); + } + + public static class ChangeDelta implements Serializable { + + private static final long serialVersionUID = 3618133446644806960L; + + public static final short ADD = 0; + + public static final short REMOVE = 1; + + protected transient final NotifyingSet source; + + protected final Object member; + + private final short kind; + + protected ChangeDelta(NotifyingSet source, Object member, short kind) { + this.source = source; + this.member = member; + this.kind = kind; + } + + public final Object getMember() { + return member; + } + + public final short getKind() { + return kind; + } + } + + public static class ChangeEvent extends EventObject { + + private static final long serialVersionUID = 3834307341121041721L; + + private final ChangeDelta[] deltas; + + protected ChangeEvent(NotifyingSet source, ChangeDelta[] deltas) { + super(source); + this.deltas = deltas; + } + + protected ChangeEvent(NotifyingSet source, ChangeDelta delta) { + this(source, new ChangeDelta[] { delta }); + } + + public NotifyingSet getTrackedSet() { + return (NotifyingSet) source; + } + + public ChangeDelta[] getChangeDeltas() { + return deltas; + } + } + + private final HashSet set; + + private transient Set listeners = Collections + .synchronizedSet(new HashSet()); + + public NotifyingSet() { + set = new HashSet(); + } + + public NotifyingSet(Set set) { + this.set = new HashSet(set); + } + + public void addChangeListener(IChangeListener l) { + listeners.add(l); + } + + public void removeChangeListener(IChangeListener l) { + listeners.remove(l); + } + + public synchronized boolean add(Object object) { + boolean result = set.add(object); + if (result) + fireChangeEvent(new ChangeEvent(this, new ChangeDelta(this, object, + ChangeDelta.ADD))); + + return result; + } + + public synchronized boolean remove(Object object) { + boolean result = set.remove(object); + if (result) + fireChangeEvent(new ChangeEvent(this, new ChangeDelta(this, object, + ChangeDelta.REMOVE))); + + return result; + } + + public synchronized boolean contains(Object object) { + return set.contains(object); + } + + public synchronized int size() { + return set.size(); + } + + public synchronized Object[] toArray() { + return set.toArray(); + } + + private void fireChangeEvent(ChangeEvent e) { + IChangeListener[] l = (IChangeListener[]) listeners + .toArray(new IChangeListener[listeners.size()]); + for (int i = 0; i < l.length; ++i) + l[i].changed(e); + } + + private void readObject(ObjectInputStream in) throws IOException, + ClassNotFoundException { + in.defaultReadObject(); + listeners = Collections.synchronizedSet(new HashSet()); + } +}
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java new file mode 100644 index 000000000..f4f8831e1 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSet.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.util; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Set; + +/** + * @author pnehrer + */ +public class TrackedSet extends NotifyingSet implements + NotifyingSet.IChangeListener { + + private static final long serialVersionUID = 3258134660897847607L; + + private transient LinkedList changes = new LinkedList(); + + /** + * + */ + public TrackedSet() { + addChangeListener(this); + } + + /** + * @param set + */ + public TrackedSet(Set set) { + super(set); + addChangeListener(this); + } + + public synchronized ChangeDelta[] getChanges() { + return (ChangeDelta[]) changes.toArray(new ChangeDelta[changes.size()]); + } + + /** + * @param deltas + */ + public synchronized void apply(ChangeDelta[] deltas) { + removeChangeListener(this); + LinkedList oldChanges = new LinkedList(changes); + while (!changes.isEmpty()) + undo((ChangeDelta) changes.removeLast()); + + for (int i = 0; i < deltas.length; ++i) + apply(deltas[i]); + + addChangeListener(this); + for (Iterator i = oldChanges.iterator(); i.hasNext();) + apply((ChangeDelta) i.next()); + } + + private void undo(ChangeDelta delta) { + switch (delta.getKind()) { + case ChangeDelta.ADD: + remove(delta.getMember()); + break; + case ChangeDelta.REMOVE: + add(delta.getMember()); + break; + } + } + + private void apply(ChangeDelta delta) { + switch (delta.getKind()) { + case ChangeDelta.ADD: + add(delta.getMember()); + break; + case ChangeDelta.REMOVE: + remove(delta.getMember()); + break; + } + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.util.NotifyingSet.IChangeListener#changed(org.eclipse.ecf.datashare.util.NotifyingSet.ChangeEvent) + */ + public synchronized void changed(ChangeEvent e) { + ChangeDelta[] deltas = e.getChangeDeltas(); + for (int i = 0; i < deltas.length; ++i) + changes.add(deltas[i]); + } + + private void readObject(ObjectInputStream in) throws IOException, + ClassNotFoundException { + in.defaultReadObject(); + changes = new LinkedList(); + addChangeListener(this); + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java new file mode 100644 index 000000000..83403da32 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/util/TrackedSetUpdateProvider.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.datashare.util; + +import java.util.Map; + +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.datashare.ISharedData; +import org.eclipse.ecf.datashare.IUpdateProvider; +import org.eclipse.ecf.datashare.IUpdateProviderFactory; + +/** + * @author pnehrer + */ +public class TrackedSetUpdateProvider implements IUpdateProvider { + + private final Factory factory; + + private TrackedSetUpdateProvider(Factory factory) { + this.factory = factory; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IUpdateProvider#getFactory() + */ + public IUpdateProviderFactory getFactory() { + return factory; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IUpdateProvider#createUpdate(org.eclipse.ecf.datashare.ISharedData) + */ + public Object createUpdate(ISharedData graph) throws ECFException { + return ((TrackedSet) graph.getData()).getChanges(); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IUpdateProvider#applyUpdate(org.eclipse.ecf.datashare.ISharedData, + * java.lang.Object) + */ + public void applyUpdate(ISharedData graph, Object data) throws ECFException { + ((TrackedSet) graph.getData()).apply((NotifyingSet.ChangeDelta[]) data); + } + + public static class Factory implements IUpdateProviderFactory { + + public static final String ID = "org.eclipse.ecf.datashare.util.TrackedSet"; + + public String getID() { + return ID; + } + + public IUpdateProvider createProvider(Map params) { + return new TrackedSetUpdateProvider(this); + } + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java index 55d55a9ef..07063c7d6 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java @@ -25,10 +25,18 @@ import org.eclipse.ecf.core.util.ECFException; import org.eclipse.ecf.core.util.Event; import org.eclipse.ecf.datashare.IPublicationCallback; import org.eclipse.ecf.datashare.ISharedData; -import org.eclipse.ecf.datashare.IUpdateListener; import org.eclipse.ecf.datashare.IUpdateProvider; +import org.eclipse.ecf.datashare.UpdateProviderRegistry; /** + * <p> + * State chart: + * </p> + * <p> + * 1. DataShareService.publish(Object, ID, IUpdateProvider, + * IPublicationCallback) -> PUBLISHED + * </p> + * * @author pnehrer */ public class Agent implements ISharedData, ISharedObject { @@ -41,18 +49,28 @@ public class Agent implements ISharedData, ISharedObject { private IUpdateProvider updateProvider; - private IUpdateListener updateListener; - private IPublicationCallback pubCallback; - private Version version; - + /** + * Default constructor; necessary for replication. + */ public Agent() { } - public Agent(Object sharedData, IBootstrap bootstrap) { + /** + * Publisher's constructor; fully initializes the instance. + * + * @param sharedData + * @param bootstrap + * @param updateProvider + * @param pubCallback + */ + public Agent(Object sharedData, IBootstrap bootstrap, + IUpdateProvider updateProvider, IPublicationCallback pubCallback) { this.sharedData = sharedData; this.bootstrap = bootstrap; + this.updateProvider = updateProvider; + this.pubCallback = pubCallback; } public synchronized ID getID() { @@ -64,27 +82,25 @@ public class Agent implements ISharedData, ISharedObject { } public synchronized void commit() throws ECFException { - byte[] buf = updateProvider.createUpdate(this); - if (buf != null) { - Version newVersion = version.getNext(config.getSharedObjectID()); - Update update = new Update(newVersion, buf); - version = newVersion; - try { - config.getContext().sendMessage(null, update); - } catch (IOException e) { - throw new ECFException(e); - } + // lock on transaction (wait till there's none) + // send prepare + // collect replies + // send commit/abort + Object update = updateProvider.createUpdate(this); + if (update != null) { +// Version newVersion = version.getNext(config.getSharedObjectID()); +// Commit msg = new Commit(newVersion, update); +// version = newVersion; +// try { +// config.getContext().sendMessage(null, msg); +// } catch (IOException e) { +// throw new ECFException(e); +// } } } public synchronized void dispose() { - if (config != null) - try { - config.getContext().sendDispose( - config.getContext().getLocalContainerID()); - } catch (IOException e) { - handleError(e); - } + // TODO Finish implementing. } /* @@ -97,15 +113,26 @@ public class Agent implements ISharedData, ISharedObject { this.config = config; Map params = config.getProperties(); if (params != null) { - sharedData = params.get("sharedData"); - version = (Version) params.get("version"); - IBootstrapMemento m = (IBootstrapMemento) params.get("bootstrap"); - if (m != null) - bootstrap = m.createBootstrap(); + Object param = params.get("sharedData"); + if (param != null) + sharedData = param; + + param = params.get("version"); +// if (param != null) +// version = (Version) param; + + param = params.get("bootstrap"); + if (param != null) + bootstrap = ((IBootstrapMemento) param).createBootstrap(); + + param = params.get("updateProvider"); + if (param != null) + updateProvider = UpdateProviderRegistry.createProvider( + (String) param, null); // TODO what about params? } - if (version == null) - version = new Version(config.getSharedObjectID()); +// if (version == null) +// version = new Version(config.getSharedObjectID()); bootstrap.setAgent(this); bootstrap.init(config); @@ -124,8 +151,6 @@ public class Agent implements ISharedData, ISharedObject { handleActivated(); } else if (event instanceof ISharedObjectMessageEvent) { ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event; - if (e.getData() instanceof Update) - handleUpdate((Update) e.getData()); } } @@ -135,8 +160,10 @@ public class Agent implements ISharedData, ISharedObject { try { Map params = new HashMap(3); params.put("sharedData", sharedData); - params.put("version", version); +// params.put("version", version); params.put("bootstrap", bootstrap.createMemento()); + params.put("updateProvider", updateProvider.getFactory() + .getID()); config.getContext().sendCreate( null, new SharedObjectDescription(config.getSharedObjectID(), @@ -152,20 +179,12 @@ public class Agent implements ISharedData, ISharedObject { } } - private void handleUpdate(Update update) { - try { - updateProvider.applyUpdate(this, update.getData()); - version = update.getVersion(); - } catch (ECFException e) { - handleError(e); - } - } - public void doBootstrap(ID containerID) { Map params = new HashMap(3); params.put("sharedData", sharedData); - params.put("version", version); +// params.put("version", version); params.put("bootstrap", bootstrap.createMemento()); + params.put("updateProvider", updateProvider.getFactory().getID()); try { config.getContext().sendCreate( containerID, diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java index 9cb1f57ca..93eed4509 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataSharePlugin.java @@ -24,166 +24,227 @@ import org.eclipse.core.runtime.Plugin; import org.eclipse.core.runtime.Status; import org.eclipse.ecf.datashare.DataShareServiceFactory; import org.eclipse.ecf.datashare.IDataShareServiceManager; +import org.eclipse.ecf.datashare.IUpdateProviderFactory; +import org.eclipse.ecf.datashare.UpdateProviderRegistry; import org.osgi.framework.BundleContext; /** * @author pnehrer - * */ public class DataSharePlugin { - + public static final String PLUGIN_ID = "org.eclipse.ecf.datashare"; - private static final String TRACE_PREFIX = PLUGIN_ID + "/"; - - private static EclipsePlugin plugin; - - private static boolean tracingEnabled = Boolean.getBoolean(TRACE_PREFIX - + "debug"); - - private DataSharePlugin() { - } - - public static void log(Object entry) { - if (plugin == null) { - if (entry instanceof Throwable) - ((Throwable) entry).printStackTrace(); - else - System.err.println(entry); - } else { - plugin.log(entry); - } - } - - public static boolean isTracing(String tag) { - if (tracingEnabled) { - return plugin == null ? Boolean.getBoolean(TRACE_PREFIX + tag) - : plugin.isTracing(tag); - } else - return false; - } - - public static PrintStream getTraceLog() { - return System.out; - } - - public static class EclipsePlugin extends Plugin { - - private static final String MANAGER_EXTENSION_POINT = "manager"; - - private static final String MANAGER_EXTENSION = "manager"; - - private static final String ATTR_NAME = "name"; - - private static final String ATTR_CLASS = "class"; - - private IRegistryChangeListener registryChangeListener; - - public EclipsePlugin() { - plugin = this; - tracingEnabled = Platform.inDebugMode(); - } - - /** - * This method is called upon plug-in activation - */ - public void start(BundleContext context) throws Exception { - super.start(context); - registryChangeListener = new IRegistryChangeListener() { - public void registryChanged(IRegistryChangeEvent event) { - IExtensionDelta[] deltas = event.getExtensionDeltas( - getBundle().getSymbolicName(), - MANAGER_EXTENSION_POINT); - for (int i = 0; i < deltas.length; ++i) { - switch (deltas[i].getKind()) { - case IExtensionDelta.ADDED: - registerManagers(deltas[i].getExtension() - .getConfigurationElements()); - break; - - case IExtensionDelta.REMOVED: - IConfigurationElement[] elems = deltas[i] - .getExtension().getConfigurationElements(); - for (int j = 0; j < elems.length; ++j) { - if (!MANAGER_EXTENSION.equals(elems[j] - .getName())) - continue; - - String name = elems[j].getAttribute(ATTR_NAME); - if (name != null && name.length() > 0) - DataShareServiceFactory - .unregisterManager(name); - } - - break; - } - } - } - }; - - IExtensionRegistry reg = Platform.getExtensionRegistry(); - IConfigurationElement[] elems = reg.getConfigurationElementsFor( - getBundle().getSymbolicName(), MANAGER_EXTENSION_POINT); - registerManagers(elems); - } - - private void registerManagers(IConfigurationElement[] elems) { - for (int i = 0; i < elems.length; ++i) { - if (!MANAGER_EXTENSION.equals(elems[i].getName())) - continue; - - String name = elems[i].getAttribute(ATTR_NAME); - if (name == null || name.length() == 0) - continue; - - IDataShareServiceManager mgr; - try { - mgr = (IDataShareServiceManager) elems[i] - .createExecutableExtension(ATTR_CLASS); - } catch (Exception ex) { - continue; - } - - DataShareServiceFactory.registerManager(name, mgr); - } - } - - /** - * This method is called when the plug-in is stopped - */ - public void stop(BundleContext context) throws Exception { - if (registryChangeListener != null) - Platform.getExtensionRegistry().removeRegistryChangeListener( - registryChangeListener); - - DataShareServiceFactory.unregisterAllManagers(); + private static final String TRACE_PREFIX = PLUGIN_ID + "/"; + + private static EclipsePlugin plugin; + + private static boolean tracingEnabled = Boolean.getBoolean(TRACE_PREFIX + + "debug"); + + private DataSharePlugin() { + } + + public static void log(Object entry) { + if (plugin == null) { + if (entry instanceof Throwable) + ((Throwable) entry).printStackTrace(); + else + System.err.println(entry); + } else { + plugin.log(entry); + } + } + + public static boolean isTracing(String tag) { + if (tracingEnabled) { + return plugin == null ? Boolean.getBoolean(TRACE_PREFIX + tag) + : plugin.isTracing(tag); + } else + return false; + } + + public static PrintStream getTraceLog() { + return System.out; + } + + public static class EclipsePlugin extends Plugin { + + private static final String MANAGER_EXTENSION_POINT = "servicemanager"; + + private static final String MANAGER_EXTENSION = "manager"; + + private static final String ATTR_NAME = "name"; + + private static final String ATTR_CLASS = "class"; + + private static final String PROVIDER_EXTENSION_POINT = "updateprovider"; + + private static final String PROVIDER_EXTENSION = "updateProvider"; + + private static final String ATTR_ID = "id"; + + private static final String ATTR_FACTORY = "factory"; + + private IRegistryChangeListener registryChangeListener; + + public EclipsePlugin() { + plugin = this; + tracingEnabled = Platform.inDebugMode(); + } + + /** + * This method is called upon plug-in activation + */ + public void start(BundleContext context) throws Exception { + super.start(context); + final String namespace = getBundle().getSymbolicName(); + registryChangeListener = new IRegistryChangeListener() { + public void registryChanged(IRegistryChangeEvent event) { + IExtensionDelta[] deltas = event + .getExtensionDeltas(namespace); + for (int i = 0; i < deltas.length; ++i) { + IConfigurationElement[] elems = deltas[i] + .getExtension().getConfigurationElements(); + switch (deltas[i].getKind()) { + case IExtensionDelta.ADDED: + if (deltas[i].getExtensionPoint() + .getSimpleIdentifier().equals( + MANAGER_EXTENSION_POINT)) + registerManagers(elems); + else if (deltas[i].getExtensionPoint() + .getSimpleIdentifier().equals( + PROVIDER_EXTENSION_POINT)) + registerProviders(elems); + + break; + + case IExtensionDelta.REMOVED: + if (deltas[i].getExtensionPoint() + .getSimpleIdentifier().equals( + MANAGER_EXTENSION_POINT)) + unregisterManagers(elems); + else if (deltas[i].getExtensionPoint() + .getSimpleIdentifier().equals( + PROVIDER_EXTENSION_POINT)) + unregisterProviders(elems); + break; + } + } + } + }; + + IExtensionRegistry reg = Platform.getExtensionRegistry(); + reg.addRegistryChangeListener(registryChangeListener, namespace); + IConfigurationElement[] elems = reg.getConfigurationElementsFor( + namespace, MANAGER_EXTENSION_POINT); + registerManagers(elems); + elems = reg.getConfigurationElementsFor(namespace, + PROVIDER_EXTENSION_POINT); + registerProviders(elems); + } + + private void registerManagers(IConfigurationElement[] elems) { + for (int i = 0; i < elems.length; ++i) { + if (!MANAGER_EXTENSION.equals(elems[i].getName())) + continue; + + String name = elems[i].getAttribute(ATTR_NAME); + if (name == null || name.length() == 0) + continue; + + IDataShareServiceManager mgr; + try { + mgr = (IDataShareServiceManager) elems[i] + .createExecutableExtension(ATTR_CLASS); + } catch (Exception ex) { + continue; + } + + DataShareServiceFactory.registerManager(name, mgr); + } + } + + private void registerProviders(IConfigurationElement[] elems) { + for (int i = 0; i < elems.length; ++i) { + if (!PROVIDER_EXTENSION.equals(elems[i].getName())) + continue; + + String id = elems[i].getAttribute(ATTR_ID); + if (id == null || id.length() == 0) + continue; + + IUpdateProviderFactory factory; + try { + factory = (IUpdateProviderFactory) elems[i] + .createExecutableExtension(ATTR_FACTORY); + } catch (Exception ex) { + continue; + } + + UpdateProviderRegistry.registerFactory(id, factory); + } + } + + private void unregisterManagers(IConfigurationElement[] elems) { + for (int i = 0; i < elems.length; ++i) { + if (!MANAGER_EXTENSION.equals(elems[i].getName())) + continue; + + String name = elems[i].getAttribute(ATTR_NAME); + if (name != null && name.length() > 0) + DataShareServiceFactory.unregisterManager(name); + } + } + + private void unregisterProviders(IConfigurationElement[] elems) { + for (int i = 0; i < elems.length; ++i) { + if (!PROVIDER_EXTENSION.equals(elems[i].getName())) + continue; + + String id = elems[i].getAttribute(ATTR_ID); + if (id != null && id.length() > 0) + UpdateProviderRegistry.unregisterFactory(id); + } + } + + /** + * This method is called when the plug-in is stopped + */ + public void stop(BundleContext context) throws Exception { + if (registryChangeListener != null) + Platform.getExtensionRegistry().removeRegistryChangeListener( + registryChangeListener); + + DataShareServiceFactory.unregisterAllManagers(); + UpdateProviderRegistry.unregisterAllFactories(); plugin = null; - super.stop(context); - } - - public void log(Object entry) { - IStatus status; - if (entry instanceof IStatus) - status = (IStatus) entry; - else if (entry instanceof CoreException) - status = ((CoreException) entry).getStatus(); - else if (entry instanceof Throwable) { - Throwable t = (Throwable) entry; - status = new Status(Status.ERROR, - getBundle().getSymbolicName(), 0, - t.getLocalizedMessage() == null ? "Unknown error." : t - .getLocalizedMessage(), t); - } else - status = new Status(Status.WARNING, getBundle() - .getSymbolicName(), 0, String.valueOf(entry), - new RuntimeException().fillInStackTrace()); - - getLog().log(status); - } - - public boolean isTracing(String tag) { - return Boolean.TRUE.equals(Boolean.valueOf(Platform - .getDebugOption(TRACE_PREFIX + tag))); - } - } + super.stop(context); + } + + public void log(Object entry) { + IStatus status; + if (entry instanceof IStatus) + status = (IStatus) entry; + else if (entry instanceof CoreException) + status = ((CoreException) entry).getStatus(); + else if (entry instanceof Throwable) { + Throwable t = (Throwable) entry; + status = new Status(Status.ERROR, + getBundle().getSymbolicName(), 0, + t.getLocalizedMessage() == null ? "Unknown error." : t + .getLocalizedMessage(), t); + } else + status = new Status(Status.WARNING, getBundle() + .getSymbolicName(), 0, String.valueOf(entry), + new RuntimeException().fillInStackTrace()); + + getLog().log(status); + } + + public boolean isTracing(String tag) { + return Boolean.TRUE.equals(Boolean.valueOf(Platform + .getDebugOption(TRACE_PREFIX + tag))); + } + } } diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java new file mode 100644 index 000000000..a8eda900a --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java @@ -0,0 +1,91 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.internal.datashare; + +import org.eclipse.ecf.core.ISharedObjectContainer; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.datashare.IDataShareService; +import org.eclipse.ecf.datashare.IPublicationCallback; +import org.eclipse.ecf.datashare.ISharedData; +import org.eclipse.ecf.datashare.ISubscriptionCallback; +import org.eclipse.ecf.datashare.IUpdateProvider; + +/** + * @author pnehrer + */ +public class DataShareService implements IDataShareService { + + private ServiceManager mgr; + + private ISharedObjectContainer container; + + public DataShareService(ServiceManager mgr, ISharedObjectContainer container) { + this.mgr = mgr; + this.container = container; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IDataShareService#publish(java.lang.Object, + * org.eclipse.ecf.core.identity.ID, + * org.eclipse.ecf.datashare.IUpdateProvider, + * org.eclipse.ecf.datashare.IPublicationCallback) + */ + public synchronized void publish(Object dataGraph, ID id, + IUpdateProvider provider, IPublicationCallback callback) + throws ECFException { + Agent agent = (Agent) container.getSharedObjectManager() + .getSharedObject(id); + if (agent != null) + throw new ECFException("Already published!"); + + IBootstrap bootstrap = getBootstrap(); + agent = new Agent(dataGraph, bootstrap, provider, callback); + container.getSharedObjectManager().addSharedObject(id, agent, null, + null); + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IDataShareService#subscribe(org.eclipse.ecf.core.identity.ID, + * org.eclipse.ecf.datashare.ISubscriptionCallback) + */ + public synchronized ISharedData subscribe(ID id, + ISubscriptionCallback callback) throws ECFException { + Agent agent = (Agent) container.getSharedObjectManager() + .getSharedObject(id); + if (agent == null) + return null; // TODO should we throw? + + if (callback != null) + callback.dataSubscribed(agent, container.getConfig().getID()); + + return agent; + } + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IDataShareService#dispose() + */ + public synchronized void dispose() { + mgr.dispose(container); + mgr = null; + container = null; + } + + private IBootstrap getBootstrap() { + return new ServerBootstrap(); // TODO strategize + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java index 923bf5cf1..3f79fe631 100644 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java @@ -11,10 +11,10 @@ package org.eclipse.ecf.internal.datashare; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Random; @@ -82,7 +82,8 @@ public class LazyElectionBootstrap implements IBootstrap { } private void handleJoined(ID containerID) { - List members = Arrays.asList(config.getContext().getGroupMemberIDs()); + ArrayList members = new ArrayList(Arrays.asList(config.getContext() + .getGroupMemberIDs())); members.remove(containerID); members.remove(config.getContext().getLocalContainerID()); if (members.isEmpty()) diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java new file mode 100644 index 000000000..4e0dd9df2 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServiceManager.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * Copyright (c) 2005 Peter Nehrer and Composent, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Peter Nehrer - initial API and implementation + *******************************************************************************/ +package org.eclipse.ecf.internal.datashare; + +import java.util.Hashtable; + +import org.eclipse.ecf.core.ISharedObjectContainer; +import org.eclipse.ecf.core.ISharedObjectContainerListener; +import org.eclipse.ecf.core.events.IContainerEvent; +import org.eclipse.ecf.core.events.ISharedObjectContainerDisposeEvent; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.datashare.IDataShareService; +import org.eclipse.ecf.datashare.IDataShareServiceManager; + +/** + * @author pnehrer + */ +public class ServiceManager implements IDataShareServiceManager { + + private final Hashtable instances = new Hashtable(); + + private final Hashtable listeners = new Hashtable(); + + /* + * (non-Javadoc) + * + * @see org.eclipse.ecf.datashare.IDataShareServiceManager#getInstance(org.eclipse.ecf.core.ISharedObjectContainer) + */ + public synchronized IDataShareService getInstance( + ISharedObjectContainer container) throws ECFException { + IDataShareService svc = (IDataShareService) instances.get(container); + if (svc == null) { + svc = new DataShareService(this, container); + instances.put(container, svc); + DisposeListener listener = new DisposeListener(container); + listeners.put(container, listener); + container.addListener(listener, null); + } + + return svc; + } + + public synchronized void dispose(ISharedObjectContainer container) { + instances.remove(container); + listeners.remove(container); + } + + private class DisposeListener implements ISharedObjectContainerListener { + + private final ISharedObjectContainer container; + + public DisposeListener(ISharedObjectContainer container) { + this.container = container; + } + + public void handleEvent(IContainerEvent evt) { + if (evt instanceof ISharedObjectContainerDisposeEvent) + dispose(container); + } + } +} diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java deleted file mode 100644 index 2c3f1bcb2..000000000 --- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Version.java +++ /dev/null @@ -1,70 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2005 Peter Nehrer and Composent, Inc. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Peter Nehrer - initial API and implementation - *******************************************************************************/ -package org.eclipse.ecf.internal.datashare; - -import java.io.Serializable; - -import org.eclipse.ecf.core.identity.ID; - -/** - * @author pnehrer - */ -public class Version implements Serializable { - - private static final long serialVersionUID = 3258415036413456951L; - - private final long sequence; - - private final ID containerID; - - public Version(ID sourceID) { - this(0, sourceID); - } - - private Version(long sequence, ID sourceID) { - this.sequence = sequence; - this.containerID = sourceID; - } - - public long getSequence() { - return sequence; - } - - public ID getContainerID() { - return containerID; - } - - public Version getNext(ID sourceID) { - return new Version(sequence + 1, sourceID); - } - - public boolean equals(Object other) { - if (other instanceof Version) { - Version o = (Version) other; - return sequence == o.sequence && containerID.equals(o.containerID); - } else - return false; - } - - public int hashCode() { - int c = 17; - c = 37 * c + (int) sequence; - c = 37 * c + containerID.hashCode(); - return c; - } - - public String toString() { - StringBuffer buf = new StringBuffer("Version["); - buf.append("sequence=").append(sequence).append(";"); - buf.append("containerID=").append(containerID).append("]"); - return buf.toString(); - } -} |
