Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-07-04 02:09:56 +0000
committerslewis2005-07-04 02:09:56 +0000
commit2afc9dcc278736c3715c0ee80603052ab4206533 (patch)
tree5f30aa6d190c904149e41820663df02c73d87aef
parent485d4ffabe0ca9c5bca504a19b383180918ba291 (diff)
downloadorg.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.tar.gz
org.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.tar.xz
org.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.zip
Added code to support transactional distribution of shared objects
-rw-r--r--examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java1
-rw-r--r--examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java4
-rw-r--r--examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java2
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/.options3
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java41
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java10
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java142
-rw-r--r--framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java13
-rw-r--r--framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java2
9 files changed, 201 insertions, 17 deletions
diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java
index bf312c723..1eb2ab446 100644
--- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java
+++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java
@@ -253,7 +253,6 @@ public class Client {
(username == null) ? USERNAME : username, proj);
makeAndAddSharedObject(client, project, user, getSharedFileDirectoryForProject(project));
}
-
public synchronized ClientEntry isConnected(IResource project, String type) {
if (type == null) type = GENERIC_CONTAINER_CLIENT_NAME;
ClientEntry entry = getClientEntry(project,type);
diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java
index 3950317f6..4557c7208 100644
--- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java
+++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java
@@ -31,7 +31,7 @@ public class TransactionSharedObject extends GenericSharedObject implements ISha
// Dummy inner class to provide lock
static final class Lock {}
- // Timeout value associated with this object's replication within a RepSpace
+ // Timeout value associated with this object's replication
protected int timeout;
// Replication state this object is currently in.
protected byte state;
@@ -279,7 +279,7 @@ public class TransactionSharedObject extends GenericSharedObject implements ISha
Exception e = (Exception) failedParticipants.get(remoteID);
trace("failed participant "+remoteID+" causing abort");
// Abort!
- throw new SharedObjectAddAbortException("Abort received from RepSpace "+remoteID, e);
+ throw new SharedObjectAddAbortException("Abort received "+remoteID, e);
// If no problems, and the number of participants to here from is 0, then we're done
} else if (state == ISharedObjectContainerTransaction.VOTING && participantIDs.size() == 0) {
// Success!
diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java
index d80cf10e5..ac07895c1 100644
--- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java
+++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java
@@ -179,8 +179,6 @@ public class EclipseFileTransfer extends FileTransferSharedObject implements
// In this case, we're going to go ahead and continue anyway
return true;
// Abort!
- // throw new AbortException("Abort received from RepSpace
- // "+remoteID, e);
// If no problems, and the number of participants to here from is 0,
// then we're done
} else if (state == ISharedObjectContainerTransaction.VOTING
diff --git a/framework/bundles/org.eclipse.ecf.provider/.options b/framework/bundles/org.eclipse.ecf.provider/.options
index 5b80749fb..1550ba4ca 100644
--- a/framework/bundles/org.eclipse.ecf.provider/.options
+++ b/framework/bundles/org.eclipse.ecf.provider/.options
@@ -8,4 +8,5 @@ org.eclipse.ecf.provider/debug/sharedobjectcontext = true
org.eclipse.ecf.provider/debug/sharedobjectmanager = true
org.eclipse.ecf.provider/debug/gmm = true
org.eclipse.ecf.provider/debug/containerfactory = true
-org.eclipse.ecf.provider/debug/basesharedobject = true \ No newline at end of file
+org.eclipse.ecf.provider/debug/basesharedobject = true
+org.eclipse.ecf.provider/debug/transactioneventprocessor = true \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
index 7f9092c71..c0d52b4ab 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
@@ -11,6 +11,7 @@
package org.eclipse.ecf.provider.generic.sobject;
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -20,6 +21,7 @@ import org.eclipse.ecf.core.IIdentifiable;
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConfig;
import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.ISharedObjectManager;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.Event;
@@ -47,10 +49,10 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
trace.dumpStack(t,msg);
}
}
- protected void addProcessor(IEventProcessor proc) {
+ protected void addEventProcessor(IEventProcessor proc) {
eventProcessors.add(proc);
}
- protected boolean removeProcessor(IEventProcessor proc) {
+ protected boolean removeEventProcessor(IEventProcessor proc) {
return eventProcessors.remove(proc);
}
protected void fireEventProcessors(Event event) {
@@ -141,5 +143,40 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
public ID getID() {
return getConfig().getSharedObjectID();
}
+ public void destroySelf() {
+ if (isPrimary()) {
+ try {
+ // Send destroy message to all known remotes
+ destroyRemote(null);
+ } catch (IOException e) {
+ traceStack("Exception sending destroy message to remotes", e);
+ }
+ }
+ destroySelfLocal();
+ }
+
+ public void destroySelfLocal() {
+ try {
+ ISharedObjectConfig soconfig = getConfig();
+ if (soconfig != null) {
+ ID myID = soconfig.getSharedObjectID();
+ ISharedObjectContext context = getContext();
+ if (context != null) {
+ ISharedObjectManager manager = context.getSharedObjectManager();
+ if (manager != null) {
+ manager.removeSharedObject(myID);
+ }
+ }
+ }
+ } catch (Exception e) {
+ traceStack("Exception in destroySelfLocal()",e);
+ }
+ }
+ public void destroyRemote(ID remoteID) throws IOException {
+ ISharedObjectContext context = getContext();
+ if (context != null) {
+ context.sendDispose(remoteID);
+ }
+ }
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java
index bccd52ac8..2eb04d6e1 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java
@@ -20,25 +20,23 @@ import org.eclipse.ecf.core.SharedObjectAddAbortException;
public class ReplicateSharedObjectTransaction implements
ISharedObjectContainerTransaction {
- BaseSharedObject sharedObject = null;
+ TransactionEventProcessor eventProcessor = null;
public ReplicateSharedObjectTransaction(BaseSharedObject so) {
- sharedObject = so;
+ eventProcessor = new TransactionEventProcessor(so);
}
/* (non-Javadoc)
* @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#waitToCommit()
*/
public void waitToCommit() throws SharedObjectAddAbortException {
- // TODO Auto-generated method stub
-
+ eventProcessor.waitToCommit();
}
/* (non-Javadoc)
* @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#getTransactionState()
*/
public byte getTransactionState() {
- // TODO Auto-generated method stub
- return 0;
+ return eventProcessor.getTransactionState();
}
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
new file mode 100644
index 000000000..fb78050d5
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
@@ -0,0 +1,142 @@
+/**
+ *
+ */
+package org.eclipse.ecf.provider.generic.sobject;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
+import org.eclipse.ecf.core.SharedObjectAddAbortException;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.Event;
+import org.eclipse.ecf.core.util.IEventProcessor;
+import org.eclipse.ecf.provider.Trace;
+
+/**
+ * @author slewis
+ *
+ */
+public class TransactionEventProcessor implements IEventProcessor {
+
+ public static final Trace trace = Trace.create("transactioneventprocessor");
+ public static final int DEFAULT_TIMEOUT = 30000;
+ BaseSharedObject sharedObject = null;
+ byte transactionState = ISharedObjectContainerTransaction.ACTIVE;
+ Object lock = new Object();
+ List participants = new Vector();
+ Map failed = new HashMap();
+ int timeout = DEFAULT_TIMEOUT;
+
+ public TransactionEventProcessor(BaseSharedObject bse, int timeout) {
+ sharedObject = bse;
+ sharedObject.addEventProcessor(this);
+ this.timeout = timeout;
+ }
+ public TransactionEventProcessor(BaseSharedObject bse) {
+ this(bse,DEFAULT_TIMEOUT);
+ }
+ protected void trace(String msg) {
+ if (Trace.ON && trace != null) {
+ trace.msg(sharedObject.getID()+":"+msg);
+ }
+ }
+
+ protected void traceDump(String msg, Throwable t) {
+ if (Trace.ON && trace != null) {
+ trace.dumpStack(t, sharedObject.getID()+":"+msg);
+ }
+ }
+
+ protected void addParticipants(ID [] ids) {
+ if (ids != null) {
+ for(int i=0; i < ids.length; i++) {
+ trace("adding participant:"+ids[i]);
+ if (!sharedObject.getHomeID().equals(ids[i])) participants.add(ids[i]);
+ }
+ }
+ }
+ protected void removeParticipant(ID id) {
+ if (id != null) {
+ trace("removing participant:"+id);
+ participants.remove(id);
+ }
+ }
+ protected void addFailed(ID remote, Throwable failure)
+ {
+ if (remote != null && failure != null) {
+ trace("adding failed:"+remote+":exception:"+failure);
+ failed.put(remote, failure);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.ecf.core.util.IEventProcessor#acceptEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public boolean acceptEvent(Event event) {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event)
+ */
+ public Event processEvent(Event e) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ protected byte getTransactionState() {
+ synchronized (lock) {
+ return transactionState;
+ }
+ }
+ protected void waitToCommit() throws SharedObjectAddAbortException {
+ synchronized (lock) {
+ long end = System.currentTimeMillis() + timeout;
+ try {
+ while (!votingCompleted()) {
+ long wait = end - System.currentTimeMillis();
+ trace("waitForFinish waiting "+wait+"ms on "+sharedObject.getID());
+ if (wait <= 0L) throw new SharedObjectAddAbortException("Timeout waiting for create responses",(Throwable) null,timeout);
+ // Wait right here
+ lock.wait(wait);
+ }
+ } catch (InterruptedException e) {
+ throw new SharedObjectAddAbortException("Wait interrupted",(Throwable) null,timeout);
+ } catch (SharedObjectAddAbortException e1) {
+ // Aborted for some reason. Clean up.
+ doAbort(e1);
+ }
+ // Success. Send commit to remotes and clean up before returning.
+ doCommit();
+ }
+ }
+ protected void doAbort(Throwable t) {
+ // XXX TODO
+ }
+ protected void doCommit() {
+ // XXX TODO
+ }
+ protected boolean votingCompleted() throws SharedObjectAddAbortException
+ {
+ // The test here is is we've received any indication of failed participants in
+ // the transaction. If so, we throw.
+ trace("voting completed test");
+ if (failed.size() > 0) {
+ trace("voting completed. Failures:"+failed);
+ // Abort!
+ throw new SharedObjectAddAbortException("Abort received",failed,timeout);
+ // If no problems, and the number of participants to here from is 0, then we're done
+ } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) {
+ // Success!
+ trace("votingCompleted:true");
+ return true;
+ }
+ // Else continue waiting
+ trace("voting not completed");
+ return false;
+ }
+
+}
diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java
index aecb9b213..df12fb5f4 100644
--- a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java
+++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java
@@ -9,12 +9,15 @@
package org.eclipse.ecf.core;
+import java.util.Map;
+
public class SharedObjectAddAbortException extends SharedObjectAddException {
private static final long serialVersionUID = 4120851079287223088L;
protected long timeout = -1L;
-
+ protected Map causes;
+
public SharedObjectAddAbortException() {
super();
}
@@ -29,6 +32,10 @@ public class SharedObjectAddAbortException extends SharedObjectAddException {
super(msg, cause);
this.timeout = timeout;
}
+ public SharedObjectAddAbortException(String msg, Map causes, int timeout) {
+ super(msg);
+ this.causes = causes;
+ }
public SharedObjectAddAbortException(Throwable cause) {
super(cause);
}
@@ -36,5 +43,7 @@ public class SharedObjectAddAbortException extends SharedObjectAddException {
public long getTimeout() {
return timeout;
}
-
+ public Map getCauses() {
+ return causes;
+ }
} \ No newline at end of file
diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java
index 8fbb8c0a9..e4e118171 100644
--- a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java
+++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java
@@ -16,5 +16,5 @@ public interface IEventProcessor {
* Process given Event
* @param e the Event to process
*/
- public Event processEvent(Event e);
+ public Event processEvent(Event event);
} \ No newline at end of file

Back to the top