Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpnehrer2005-05-27 03:42:00 +0000
committerpnehrer2005-05-27 03:42:00 +0000
commit82f5f06dbad96686dfeae08c885eceb91fe82a66 (patch)
tree8e17250fa0da115ac98149772a82e4bf55c56dab
parentea2a2d0d1c708693654a943151a6f5825297ccde (diff)
downloadorg.eclipse.ecf-82f5f06dbad96686dfeae08c885eceb91fe82a66.tar.gz
org.eclipse.ecf-82f5f06dbad96686dfeae08c885eceb91fe82a66.tar.xz
org.eclipse.ecf-82f5f06dbad96686dfeae08c885eceb91fe82a66.zip
Checkpoint.
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/.options2
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/notes.txt4
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java75
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Activated.java (renamed from framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrapMemento.java)7
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java33
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/IMessageListener.java19
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java48
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java3
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java171
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java9
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/EagerElectionBootstrap.java38
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrap.java3
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java20
-rw-r--r--framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServerBootstrap.java20
14 files changed, 278 insertions, 174 deletions
diff --git a/framework/bundles/org.eclipse.ecf.datashare/.options b/framework/bundles/org.eclipse.ecf.datashare/.options
index b9beb4172..c244a4637 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/.options
+++ b/framework/bundles/org.eclipse.ecf.datashare/.options
@@ -1,2 +1,4 @@
org.eclipse.ecf.datashare/debug=true
+org.eclipse.ecf.datashare/SimpleMulticaster=true
+org.eclipse.ecf.datashare/OrderedMulticaster=true
org.eclipse.ecf.datashare/ConsistentMulticaster=true
diff --git a/framework/bundles/org.eclipse.ecf.datashare/notes.txt b/framework/bundles/org.eclipse.ecf.datashare/notes.txt
index d929e526d..62d05300a 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/notes.txt
+++ b/framework/bundles/org.eclipse.ecf.datashare/notes.txt
@@ -8,14 +8,12 @@ Agent
- client facade
- coordinate sub-components to implement the service
-Synchronizer
-- pause/resume messages (commits)
-
Coordinator
- ensures there's exactly one coordinator replica (when needed)
Multicaster
- implements 1/2/3-phase "commit" protocol
+- pause/resume messages (commits)
Disposer (Garbage Collector)
- dispose Agents with no subscriptions
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java
index a8d4e01fd..ac73a4e05 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/AbstractMulticaster.java
@@ -12,12 +12,15 @@ package org.eclipse.ecf.datashare.multicast;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConfig;
import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
@@ -37,9 +40,7 @@ public abstract class AbstractMulticaster implements ISharedObject {
public static final short READY = 1;
- public static final short PAUSED = 2;
-
- public static final short DISPOSED = 3;
+ public static final short DISPOSED = 2;
public class Testable {
@@ -56,6 +57,8 @@ public abstract class AbstractMulticaster implements ISharedObject {
}
}
+ protected final HashSet listeners = new HashSet();
+
protected ISharedObjectConfig config;
protected ID sharedObjectID;
@@ -74,8 +77,22 @@ public abstract class AbstractMulticaster implements ISharedObject {
protected HashSet pauseRequests;
+ protected final HashSet groupMembers = new HashSet();
+
protected Testable testable;
+ public void addMessageListener(IMessageListener l) {
+ synchronized (listeners) {
+ listeners.add(l);
+ }
+ }
+
+ public void removeMessageListener(IMessageListener l) {
+ synchronized (listeners) {
+ listeners.remove(l);
+ }
+ }
+
public abstract boolean sendMessage(Object message) throws ECFException;
public synchronized void pause() throws ECFException, IllegalStateException {
@@ -95,12 +112,13 @@ public abstract class AbstractMulticaster implements ISharedObject {
boolean wasEmpty = pauses.isEmpty();
pauses.add(localContainerID);
- pauseRequests = new HashSet(Arrays.asList(context.getGroupMemberIDs()));
- pauseRequests.remove(localContainerID);
+ pauseRequests = new HashSet(groupMembers);
try {
- context.sendMessage(null, new Pause());
- synchronized (pauses) {
- pauses.wait(1000);
+ if (!pauseRequests.isEmpty()) {
+ context.sendMessage(null, new Pause());
+ synchronized (pauses) {
+ pauses.wait(1000);
+ }
}
if (!pauseRequests.isEmpty())
@@ -135,7 +153,18 @@ public abstract class AbstractMulticaster implements ISharedObject {
}
}
- protected abstract void receiveMessage(Object message);
+ public synchronized SharedObjectDescription createDescription() {
+ HashMap params = new HashMap(1);
+ params.put("version", version);
+ return new SharedObjectDescription(sharedObjectID, getClass(), params);
+ }
+
+ protected void receiveMessage(Version version, Object message) {
+ synchronized (listeners) {
+ for (Iterator i = listeners.iterator(); i.hasNext();)
+ ((IMessageListener) i.next()).messageReceived(version, message);
+ }
+ }
protected synchronized boolean waitToSend() {
while (state != READY || !pauses.isEmpty()) {
@@ -180,7 +209,7 @@ public abstract class AbstractMulticaster implements ISharedObject {
case READY:
return "RDY";
case DISPOSED:
- return "DSP";
+ return "DIS";
default:
return "UNK";
}
@@ -205,7 +234,7 @@ public abstract class AbstractMulticaster implements ISharedObject {
Map params = config.getProperties();
if (params != null) {
Object param = params.get("version");
- if (param instanceof Version)
+ if (param != null)
version = (Version) param;
}
@@ -251,6 +280,15 @@ public abstract class AbstractMulticaster implements ISharedObject {
}
else {
synchronized (this) {
+ groupMembers.addAll(Arrays.asList(context
+ .getGroupMemberIDs()));
+ groupMembers.remove(localContainerID);
+ try {
+ context.sendMessage(null, new Activated());
+ } catch (IOException e) {
+ DataSharePlugin.log(e);
+ }
+
state = READY;
notifyAll();
}
@@ -270,6 +308,16 @@ public abstract class AbstractMulticaster implements ISharedObject {
protected void handleDeparted(ISharedObjectContainerDepartedEvent event) {
if (event.getDepartedContainerID().equals(localContainerID))
context.getSharedObjectManager().removeSharedObject(sharedObjectID);
+ else {
+ synchronized (this) {
+ groupMembers.remove(event.getDepartedContainerID());
+ }
+ }
+ }
+
+ protected synchronized void handleActivated(ID remoteContainerID,
+ Activated activated) {
+ groupMembers.add(remoteContainerID);
}
protected synchronized void handlePause(ID remoteContainerID, Pause pause) {
@@ -280,8 +328,7 @@ public abstract class AbstractMulticaster implements ISharedObject {
}
protected synchronized void handlePaused(ID remoteContainerID, Paused paused) {
- if (pauses.contains(localContainerID)
- && pauseRequests != null
+ if (pauses.contains(localContainerID) && pauseRequests != null
&& pauseRequests.remove(remoteContainerID)
&& pauseRequests.isEmpty())
synchronized (pauses) {
@@ -299,7 +346,7 @@ public abstract class AbstractMulticaster implements ISharedObject {
version = message.getVersion();
}
- receiveMessage(message.getData());
+ receiveMessage(message.getVersion(), message.getData());
}
/*
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrapMemento.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Activated.java
index f1eb5dce3..5d7fe0d6e 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrapMemento.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/Activated.java
@@ -8,7 +8,7 @@
* Contributors:
* Peter Nehrer - initial API and implementation
*******************************************************************************/
-package org.eclipse.ecf.internal.datashare;
+package org.eclipse.ecf.datashare.multicast;
import java.io.Serializable;
@@ -16,7 +16,8 @@ import java.io.Serializable;
* @author pnehrer
*
*/
-public interface IBootstrapMemento extends Serializable {
+public class Activated implements Serializable {
+
+ private static final long serialVersionUID = 3256726186536351801L;
- IBootstrap createBootstrap();
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java
index c9883c5f0..9e4d1ce1e 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/ConsistentMulticaster.java
@@ -11,8 +11,6 @@
package org.eclipse.ecf.datashare.multicast;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -70,16 +68,16 @@ public class ConsistentMulticaster extends AbstractMulticaster implements
state = SEND;
nextVersion = new Version(localContainerID,
version.getSequence() + 1);
- ArrayList others = new ArrayList(Arrays.asList(context
- .getGroupMemberIDs()));
- others.remove(localContainerID);
+ HashSet others = new HashSet(groupMembers);
requests = new HashSet(others);
granted = true;
try {
- context.sendMessage(null, new Request(nextVersion));
- wait(sendTimeout);
- if (state != SEND)
- return false;
+ if (!requests.isEmpty()) {
+ context.sendMessage(null, new Request(nextVersion));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+ }
if (!granted || !requests.isEmpty()) {
context.sendMessage(null, new Abort(nextVersion));
@@ -87,10 +85,14 @@ public class ConsistentMulticaster extends AbstractMulticaster implements
}
requests.addAll(others);
- context.sendMessage(null, new Message(nextVersion, message));
- wait(sendTimeout);
- if (state != SEND)
- return false;
+ if (!requests.isEmpty()) {
+ context
+ .sendMessage(null,
+ new Message(nextVersion, message));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+ }
if (!requests.isEmpty()) {
context.sendMessage(null, new Abort(nextVersion));
@@ -125,9 +127,6 @@ public class ConsistentMulticaster extends AbstractMulticaster implements
}
}
- protected void receiveMessage(Object message) {
- }
-
/*
* (non-Javadoc)
*
@@ -340,7 +339,7 @@ public class ConsistentMulticaster extends AbstractMulticaster implements
notify();
}
- receiveMessage(commits.remove(version));
+ receiveMessage(version, commits.remove(version));
}
} finally {
if (DataSharePlugin.isTracing(TRACE_TAG))
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/IMessageListener.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/IMessageListener.java
new file mode 100644
index 000000000..d8c9aaaf8
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/IMessageListener.java
@@ -0,0 +1,19 @@
+/*******************************************************************************
+ * Copyright (c) 2005 Peter Nehrer and Composent, Inc.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Peter Nehrer - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.ecf.datashare.multicast;
+
+/**
+ * @author pnehrer
+ */
+public interface IMessageListener {
+
+ void messageReceived(Version version, Object message);
+}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java
index d992c14e7..7c7953f6b 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/OrderedMulticaster.java
@@ -11,8 +11,6 @@
package org.eclipse.ecf.datashare.multicast;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -67,16 +65,16 @@ public class OrderedMulticaster extends AbstractMulticaster implements
state = SEND;
nextVersion = new Version(localContainerID,
version.getSequence() + 1);
- ArrayList others = new ArrayList(Arrays.asList(context
- .getGroupMemberIDs()));
- others.remove(localContainerID);
+ HashSet others = new HashSet(groupMembers);
requests = new HashSet(others);
granted = true;
try {
- context.sendMessage(null, new Request(nextVersion));
- wait(sendTimeout);
- if (state != SEND)
- return false;
+ if (!requests.isEmpty()) {
+ context.sendMessage(null, new Request(nextVersion));
+ wait(sendTimeout);
+ if (state != SEND)
+ return false;
+ }
if (!granted || !requests.isEmpty()) {
context.sendMessage(null, new Abort(nextVersion));
@@ -111,9 +109,6 @@ public class OrderedMulticaster extends AbstractMulticaster implements
}
}
- protected void receiveMessage(Object message) {
- }
-
/*
* (non-Javadoc)
*
@@ -255,7 +250,7 @@ public class OrderedMulticaster extends AbstractMulticaster implements
notify();
}
- receiveMessage(message.getData());
+ receiveMessage(message.getVersion(), message.getData());
}
} finally {
if (DataSharePlugin.isTracing(TRACE_TAG))
@@ -299,31 +294,4 @@ public class OrderedMulticaster extends AbstractMulticaster implements
traceExit(method);
}
}
-
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.core.ISharedObject#handleEvents(org.eclipse.ecf.core.util.Event[])
- */
- public void handleEvents(Event[] events) {
- for (int i = 0; i < events.length; ++i)
- handleEvent(events[i]);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
- */
- public void dispose(ID containerID) {
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
- */
- public Object getAdapter(Class clazz) {
- return null;
- }
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java
index 8d4095b67..3421d221a 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/datashare/multicast/SimpleMulticaster.java
@@ -41,7 +41,4 @@ public class SimpleMulticaster extends AbstractMulticaster {
traceExit(method);
}
}
-
- protected void receiveMessage(Object message) {
- }
} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
index 07063c7d6..0d51fa0bb 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/Agent.java
@@ -16,6 +16,7 @@ import java.util.Map;
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
@@ -27,6 +28,10 @@ import org.eclipse.ecf.datashare.IPublicationCallback;
import org.eclipse.ecf.datashare.ISharedData;
import org.eclipse.ecf.datashare.IUpdateProvider;
import org.eclipse.ecf.datashare.UpdateProviderRegistry;
+import org.eclipse.ecf.datashare.multicast.AbstractMulticaster;
+import org.eclipse.ecf.datashare.multicast.Activated;
+import org.eclipse.ecf.datashare.multicast.IMessageListener;
+import org.eclipse.ecf.datashare.multicast.Version;
/**
* <p>
@@ -39,7 +44,7 @@ import org.eclipse.ecf.datashare.UpdateProviderRegistry;
*
* @author pnehrer
*/
-public class Agent implements ISharedData, ISharedObject {
+public class Agent implements ISharedData, ISharedObject, IMessageListener {
private Object sharedData;
@@ -47,6 +52,10 @@ public class Agent implements ISharedData, ISharedObject {
private IBootstrap bootstrap;
+ private ID newContainerID;
+
+ private AbstractMulticaster sender;
+
private IUpdateProvider updateProvider;
private IPublicationCallback pubCallback;
@@ -66,9 +75,11 @@ public class Agent implements ISharedData, ISharedObject {
* @param pubCallback
*/
public Agent(Object sharedData, IBootstrap bootstrap,
- IUpdateProvider updateProvider, IPublicationCallback pubCallback) {
+ AbstractMulticaster sender, IUpdateProvider updateProvider,
+ IPublicationCallback pubCallback) {
this.sharedData = sharedData;
this.bootstrap = bootstrap;
+ this.sender = sender;
this.updateProvider = updateProvider;
this.pubCallback = pubCallback;
}
@@ -82,21 +93,10 @@ public class Agent implements ISharedData, ISharedObject {
}
public synchronized void commit() throws ECFException {
- // lock on transaction (wait till there's none)
- // send prepare
- // collect replies
- // send commit/abort
Object update = updateProvider.createUpdate(this);
- if (update != null) {
-// Version newVersion = version.getNext(config.getSharedObjectID());
-// Commit msg = new Commit(newVersion, update);
-// version = newVersion;
-// try {
-// config.getContext().sendMessage(null, msg);
-// } catch (IOException e) {
-// throw new ECFException(e);
-// }
- }
+ boolean sent = sender.sendMessage(update);
+ if (!sent)
+ throw new ECFException("Commit failed.");
}
public synchronized void dispose() {
@@ -112,18 +112,44 @@ public class Agent implements ISharedData, ISharedObject {
throws SharedObjectInitException {
this.config = config;
Map params = config.getProperties();
+ ISharedObjectConfig bootstrapConfig = null;
+ ISharedObjectConfig senderConfig = null;
if (params != null) {
Object param = params.get("sharedData");
if (param != null)
sharedData = param;
- param = params.get("version");
-// if (param != null)
-// version = (Version) param;
-
param = params.get("bootstrap");
- if (param != null)
- bootstrap = ((IBootstrapMemento) param).createBootstrap();
+ if (param != null) {
+ SharedObjectDescription sd = (SharedObjectDescription) param;
+ try {
+ Class bootstrapClass = Class.forName(sd.getClassname());
+ bootstrap = (IBootstrap) bootstrapClass.newInstance();
+ bootstrapConfig = new ComponentConfig(sd);
+ } catch (ClassNotFoundException e) {
+ throw new SharedObjectInitException(e);
+ } catch (InstantiationException e) {
+ throw new SharedObjectInitException(e);
+ } catch (IllegalAccessException e) {
+ throw new SharedObjectInitException(e);
+ }
+ }
+
+ param = params.get("sender");
+ if (param != null) {
+ SharedObjectDescription sd = (SharedObjectDescription) param;
+ try {
+ Class senderClass = Class.forName(sd.getClassname());
+ sender = (AbstractMulticaster) senderClass.newInstance();
+ senderConfig = new ComponentConfig(sd);
+ } catch (ClassNotFoundException e) {
+ throw new SharedObjectInitException(e);
+ } catch (InstantiationException e) {
+ throw new SharedObjectInitException(e);
+ } catch (IllegalAccessException e) {
+ throw new SharedObjectInitException(e);
+ }
+ }
param = params.get("updateProvider");
if (param != null)
@@ -131,11 +157,13 @@ public class Agent implements ISharedData, ISharedObject {
(String) param, null); // TODO what about params?
}
-// if (version == null)
-// version = new Version(config.getSharedObjectID());
-
bootstrap.setAgent(this);
- bootstrap.init(config);
+ bootstrap.init(bootstrapConfig == null ? new ComponentConfig(null)
+ : bootstrapConfig);
+
+ sender.addMessageListener(this);
+ sender.init(senderConfig == null ? new ComponentConfig(null)
+ : senderConfig);
}
/*
@@ -145,12 +173,16 @@ public class Agent implements ISharedData, ISharedObject {
*/
public void handleEvent(Event event) {
bootstrap.handleEvent(event);
+ sender.handleEvent(event);
if (event instanceof ISharedObjectActivatedEvent) {
ISharedObjectActivatedEvent e = (ISharedObjectActivatedEvent) event;
if (e.getActivatedID().equals(config.getSharedObjectID()))
handleActivated();
} else if (event instanceof ISharedObjectMessageEvent) {
ISharedObjectMessageEvent e = (ISharedObjectMessageEvent) event;
+ if (e.getData() instanceof Activated)
+ handleActivated(e.getRemoteContainerID(), (Activated) e
+ .getData());
}
}
@@ -158,10 +190,10 @@ public class Agent implements ISharedData, ISharedObject {
if (config.getHomeContainerID().equals(
config.getContext().getLocalContainerID()))
try {
- Map params = new HashMap(3);
+ Map params = new HashMap(4);
params.put("sharedData", sharedData);
-// params.put("version", version);
- params.put("bootstrap", bootstrap.createMemento());
+ params.put("bootstrap", bootstrap.createDescription());
+ params.put("sender", sender.createDescription());
params.put("updateProvider", updateProvider.getFactory()
.getID());
config.getContext().sendCreate(
@@ -179,22 +211,57 @@ public class Agent implements ISharedData, ISharedObject {
}
}
- public void doBootstrap(ID containerID) {
- Map params = new HashMap(3);
+ public synchronized void doBootstrap(ID containerID) {
+ while (newContainerID != null)
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ handleError(e);
+ }
+
+ try {
+ sender.pause();
+ } catch (IllegalStateException e) {
+ handleError(e);
+ return;
+ } catch (ECFException e) {
+ handleError(e);
+ return;
+ }
+
+ Map params = new HashMap(4);
params.put("sharedData", sharedData);
-// params.put("version", version);
- params.put("bootstrap", bootstrap.createMemento());
+ params.put("bootstrap", bootstrap.createDescription());
+ params.put("sender", sender.createDescription());
params.put("updateProvider", updateProvider.getFactory().getID());
try {
config.getContext().sendCreate(
containerID,
new SharedObjectDescription(config.getSharedObjectID(),
getClass(), params));
+ wait(1000);
+ newContainerID = null;
} catch (IOException e) {
handleError(e);
+ } catch (InterruptedException e) {
+ handleError(e);
+ } finally {
+ try {
+ sender.resume();
+ } catch (IllegalStateException e) {
+ handleError(e);
+ } catch (ECFException e) {
+ handleError(e);
+ }
}
}
+ private synchronized void handleActivated(ID remoteContainerID,
+ Activated activated) {
+ if (remoteContainerID.equals(newContainerID))
+ notify();
+ }
+
private void handleError(Throwable t) {
t.printStackTrace();
}
@@ -209,15 +276,22 @@ public class Agent implements ISharedData, ISharedObject {
handleEvent(events[i]);
}
+ public synchronized void messageReceived(Version version, Object message) {
+ try {
+ updateProvider.applyUpdate(this, message);
+ } catch (ECFException e) {
+ handleError(e);
+ }
+ }
+
/*
* (non-Javadoc)
*
* @see org.eclipse.ecf.core.ISharedObject#dispose(org.eclipse.ecf.core.identity.ID)
*/
- public synchronized void dispose(ID containerID) {
+ public void dispose(ID containerID) {
+ sender.dispose(containerID);
bootstrap.dispose(containerID);
- bootstrap = null;
- config = null;
}
/*
@@ -228,4 +302,29 @@ public class Agent implements ISharedData, ISharedObject {
public Object getAdapter(Class clazz) {
return null;
}
+
+ private class ComponentConfig implements ISharedObjectConfig {
+
+ private final SharedObjectDescription sd;
+
+ public ComponentConfig(SharedObjectDescription sd) {
+ this.sd = sd;
+ }
+
+ public ID getSharedObjectID() {
+ return config.getSharedObjectID();
+ }
+
+ public ID getHomeContainerID() {
+ return config.getHomeContainerID();
+ }
+
+ public ISharedObjectContext getContext() {
+ return config.getContext();
+ }
+
+ public Map getProperties() {
+ return sd == null ? null : sd.getProperties();
+ }
+ }
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java
index a8eda900a..d43f4259c 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/DataShareService.java
@@ -18,6 +18,8 @@ import org.eclipse.ecf.datashare.IPublicationCallback;
import org.eclipse.ecf.datashare.ISharedData;
import org.eclipse.ecf.datashare.ISubscriptionCallback;
import org.eclipse.ecf.datashare.IUpdateProvider;
+import org.eclipse.ecf.datashare.multicast.AbstractMulticaster;
+import org.eclipse.ecf.datashare.multicast.ConsistentMulticaster;
/**
* @author pnehrer
@@ -50,7 +52,8 @@ public class DataShareService implements IDataShareService {
throw new ECFException("Already published!");
IBootstrap bootstrap = getBootstrap();
- agent = new Agent(dataGraph, bootstrap, provider, callback);
+ AbstractMulticaster sender = getSender();
+ agent = new Agent(dataGraph, bootstrap, sender, provider, callback);
container.getSharedObjectManager().addSharedObject(id, agent, null,
null);
}
@@ -88,4 +91,8 @@ public class DataShareService implements IDataShareService {
private IBootstrap getBootstrap() {
return new ServerBootstrap(); // TODO strategize
}
+
+ private AbstractMulticaster getSender() {
+ return new ConsistentMulticaster(); // TODO strategize
+ }
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/EagerElectionBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/EagerElectionBootstrap.java
index f3181ac2c..29addd385 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/EagerElectionBootstrap.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/EagerElectionBootstrap.java
@@ -12,12 +12,15 @@ package org.eclipse.ecf.internal.datashare;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
@@ -62,6 +65,12 @@ public class EagerElectionBootstrap implements IBootstrap {
public void init(ISharedObjectConfig config)
throws SharedObjectInitException {
this.config = config;
+ Map params = config.getProperties();
+ if (params != null) {
+ Object param = params.get("coordinatorID");
+ if (param != null)
+ coordinatorID = (ID) param;
+ }
}
/*
@@ -181,13 +190,11 @@ public class EagerElectionBootstrap implements IBootstrap {
config = null;
}
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.internal.datashare.IBootstrap#createMemento()
- */
- public IBootstrapMemento createMemento() {
- return new BootstrapMemento(coordinatorID);
+ public SharedObjectDescription createDescription() {
+ HashMap params = new HashMap(1);
+ params.put("coordinatorID", coordinatorID);
+ return new SharedObjectDescription(config.getSharedObjectID(),
+ getClass(), params);
}
private class Pinger extends TimerTask {
@@ -207,21 +214,4 @@ public class EagerElectionBootstrap implements IBootstrap {
startElection();
}
}
-
- public static class BootstrapMemento implements IBootstrapMemento {
-
- private static final long serialVersionUID = 3257562910522814772L;
-
- private final ID coordinatorID;
-
- private BootstrapMemento(ID coordinatorID) {
- this.coordinatorID = coordinatorID;
- }
-
- public IBootstrap createBootstrap() {
- EagerElectionBootstrap b = new EagerElectionBootstrap();
- b.coordinatorID = this.coordinatorID;
- return b;
- }
- }
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrap.java
index 8e5fa1b42..fdaeeb531 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrap.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/IBootstrap.java
@@ -11,6 +11,7 @@
package org.eclipse.ecf.internal.datashare;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.Event;
@@ -28,5 +29,5 @@ public interface IBootstrap {
void dispose(ID containerID);
- IBootstrapMemento createMemento();
+ SharedObjectDescription createDescription();
} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
index 3f79fe631..601ec7242 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/LazyElectionBootstrap.java
@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Random;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
import org.eclipse.ecf.core.events.ISharedObjectContainerJoinedEvent;
@@ -167,21 +168,8 @@ public class LazyElectionBootstrap implements IBootstrap {
config = null;
}
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.internal.datashare.IBootstrap#createMemento()
- */
- public IBootstrapMemento createMemento() {
- return new BootstrapMemento();
- }
-
- public static class BootstrapMemento implements IBootstrapMemento {
-
- private static final long serialVersionUID = 3256438127341418808L;
-
- public IBootstrap createBootstrap() {
- return new LazyElectionBootstrap();
- }
+ public SharedObjectDescription createDescription() {
+ return new SharedObjectDescription(config.getSharedObjectID(),
+ getClass());
}
}
diff --git a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServerBootstrap.java b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServerBootstrap.java
index 0d4c7fcaf..e8453997b 100644
--- a/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServerBootstrap.java
+++ b/framework/bundles/org.eclipse.ecf.datashare/src/org/eclipse/ecf/internal/datashare/ServerBootstrap.java
@@ -11,6 +11,7 @@
package org.eclipse.ecf.internal.datashare;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.events.ISharedObjectContainerJoinedEvent;
import org.eclipse.ecf.core.identity.ID;
@@ -75,21 +76,8 @@ public class ServerBootstrap implements IBootstrap {
config = null;
}
- /*
- * (non-Javadoc)
- *
- * @see org.eclipse.ecf.internal.datashare.IBootstrap#createMemento()
- */
- public IBootstrapMemento createMemento() {
- return new BootstrapMemento();
- }
-
- public static class BootstrapMemento implements IBootstrapMemento {
-
- private static final long serialVersionUID = 3691040980384299317L;
-
- public IBootstrap createBootstrap() {
- return new ServerBootstrap();
- }
+ public SharedObjectDescription createDescription() {
+ return new SharedObjectDescription(config.getSharedObjectID(),
+ getClass());
}
}

Back to the top