diff options
| author | slewis | 2005-07-04 02:09:56 +0000 |
|---|---|---|
| committer | slewis | 2005-07-04 02:09:56 +0000 |
| commit | 2afc9dcc278736c3715c0ee80603052ab4206533 (patch) | |
| tree | 5f30aa6d190c904149e41820663df02c73d87aef | |
| parent | 485d4ffabe0ca9c5bca504a19b383180918ba291 (diff) | |
| download | org.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.tar.gz org.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.tar.xz org.eclipse.ecf-2afc9dcc278736c3715c0ee80603052ab4206533.zip | |
Added code to support transactional distribution of shared objects
9 files changed, 201 insertions, 17 deletions
diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java index bf312c723..1eb2ab446 100644 --- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java +++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/Client.java @@ -253,7 +253,6 @@ public class Client { (username == null) ? USERNAME : username, proj); makeAndAddSharedObject(client, project, user, getSharedFileDirectoryForProject(project)); } - public synchronized ClientEntry isConnected(IResource project, String type) { if (type == null) type = GENERIC_CONTAINER_CLIENT_NAME; ClientEntry entry = getClientEntry(project,type); diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java index 3950317f6..4557c7208 100644 --- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java +++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/TransactionSharedObject.java @@ -31,7 +31,7 @@ public class TransactionSharedObject extends GenericSharedObject implements ISha // Dummy inner class to provide lock static final class Lock {} - // Timeout value associated with this object's replication within a RepSpace + // Timeout value associated with this object's replication protected int timeout; // Replication state this object is currently in. protected byte state; @@ -279,7 +279,7 @@ public class TransactionSharedObject extends GenericSharedObject implements ISha Exception e = (Exception) failedParticipants.get(remoteID); trace("failed participant "+remoteID+" causing abort"); // Abort! - throw new SharedObjectAddAbortException("Abort received from RepSpace "+remoteID, e); + throw new SharedObjectAddAbortException("Abort received "+remoteID, e); // If no problems, and the number of participants to here from is 0, then we're done } else if (state == ISharedObjectContainerTransaction.VOTING && participantIDs.size() == 0) { // Success! diff --git a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java index d80cf10e5..ac07895c1 100644 --- a/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java +++ b/examples/bundles/org.eclipse.ecf.example.collab/src/org/eclipse/ecf/example/collab/share/io/EclipseFileTransfer.java @@ -179,8 +179,6 @@ public class EclipseFileTransfer extends FileTransferSharedObject implements // In this case, we're going to go ahead and continue anyway return true; // Abort! - // throw new AbortException("Abort received from RepSpace - // "+remoteID, e); // If no problems, and the number of participants to here from is 0, // then we're done } else if (state == ISharedObjectContainerTransaction.VOTING diff --git a/framework/bundles/org.eclipse.ecf.provider/.options b/framework/bundles/org.eclipse.ecf.provider/.options index 5b80749fb..1550ba4ca 100644 --- a/framework/bundles/org.eclipse.ecf.provider/.options +++ b/framework/bundles/org.eclipse.ecf.provider/.options @@ -8,4 +8,5 @@ org.eclipse.ecf.provider/debug/sharedobjectcontext = true org.eclipse.ecf.provider/debug/sharedobjectmanager = true org.eclipse.ecf.provider/debug/gmm = true org.eclipse.ecf.provider/debug/containerfactory = true -org.eclipse.ecf.provider/debug/basesharedobject = true
\ No newline at end of file +org.eclipse.ecf.provider/debug/basesharedobject = true +org.eclipse.ecf.provider/debug/transactioneventprocessor = true
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java index 7f9092c71..c0d52b4ab 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java @@ -11,6 +11,7 @@ package org.eclipse.ecf.provider.generic.sobject; +import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -20,6 +21,7 @@ import org.eclipse.ecf.core.IIdentifiable; import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.ISharedObjectConfig; import org.eclipse.ecf.core.ISharedObjectContext; +import org.eclipse.ecf.core.ISharedObjectManager; import org.eclipse.ecf.core.SharedObjectInitException; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.Event; @@ -47,10 +49,10 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { trace.dumpStack(t,msg); } } - protected void addProcessor(IEventProcessor proc) { + protected void addEventProcessor(IEventProcessor proc) { eventProcessors.add(proc); } - protected boolean removeProcessor(IEventProcessor proc) { + protected boolean removeEventProcessor(IEventProcessor proc) { return eventProcessors.remove(proc); } protected void fireEventProcessors(Event event) { @@ -141,5 +143,40 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { public ID getID() { return getConfig().getSharedObjectID(); } + public void destroySelf() { + if (isPrimary()) { + try { + // Send destroy message to all known remotes + destroyRemote(null); + } catch (IOException e) { + traceStack("Exception sending destroy message to remotes", e); + } + } + destroySelfLocal(); + } + + public void destroySelfLocal() { + try { + ISharedObjectConfig soconfig = getConfig(); + if (soconfig != null) { + ID myID = soconfig.getSharedObjectID(); + ISharedObjectContext context = getContext(); + if (context != null) { + ISharedObjectManager manager = context.getSharedObjectManager(); + if (manager != null) { + manager.removeSharedObject(myID); + } + } + } + } catch (Exception e) { + traceStack("Exception in destroySelfLocal()",e); + } + } + public void destroyRemote(ID remoteID) throws IOException { + ISharedObjectContext context = getContext(); + if (context != null) { + context.sendDispose(remoteID); + } + } } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java index bccd52ac8..2eb04d6e1 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ReplicateSharedObjectTransaction.java @@ -20,25 +20,23 @@ import org.eclipse.ecf.core.SharedObjectAddAbortException; public class ReplicateSharedObjectTransaction implements ISharedObjectContainerTransaction { - BaseSharedObject sharedObject = null; + TransactionEventProcessor eventProcessor = null; public ReplicateSharedObjectTransaction(BaseSharedObject so) { - sharedObject = so; + eventProcessor = new TransactionEventProcessor(so); } /* (non-Javadoc) * @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#waitToCommit() */ public void waitToCommit() throws SharedObjectAddAbortException { - // TODO Auto-generated method stub - + eventProcessor.waitToCommit(); } /* (non-Javadoc) * @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#getTransactionState() */ public byte getTransactionState() { - // TODO Auto-generated method stub - return 0; + return eventProcessor.getTransactionState(); } } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java new file mode 100644 index 000000000..fb78050d5 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java @@ -0,0 +1,142 @@ +/** + * + */ +package org.eclipse.ecf.provider.generic.sobject; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.eclipse.ecf.core.ISharedObjectContainerTransaction; +import org.eclipse.ecf.core.SharedObjectAddAbortException; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.Event; +import org.eclipse.ecf.core.util.IEventProcessor; +import org.eclipse.ecf.provider.Trace; + +/** + * @author slewis + * + */ +public class TransactionEventProcessor implements IEventProcessor { + + public static final Trace trace = Trace.create("transactioneventprocessor"); + public static final int DEFAULT_TIMEOUT = 30000; + BaseSharedObject sharedObject = null; + byte transactionState = ISharedObjectContainerTransaction.ACTIVE; + Object lock = new Object(); + List participants = new Vector(); + Map failed = new HashMap(); + int timeout = DEFAULT_TIMEOUT; + + public TransactionEventProcessor(BaseSharedObject bse, int timeout) { + sharedObject = bse; + sharedObject.addEventProcessor(this); + this.timeout = timeout; + } + public TransactionEventProcessor(BaseSharedObject bse) { + this(bse,DEFAULT_TIMEOUT); + } + protected void trace(String msg) { + if (Trace.ON && trace != null) { + trace.msg(sharedObject.getID()+":"+msg); + } + } + + protected void traceDump(String msg, Throwable t) { + if (Trace.ON && trace != null) { + trace.dumpStack(t, sharedObject.getID()+":"+msg); + } + } + + protected void addParticipants(ID [] ids) { + if (ids != null) { + for(int i=0; i < ids.length; i++) { + trace("adding participant:"+ids[i]); + if (!sharedObject.getHomeID().equals(ids[i])) participants.add(ids[i]); + } + } + } + protected void removeParticipant(ID id) { + if (id != null) { + trace("removing participant:"+id); + participants.remove(id); + } + } + protected void addFailed(ID remote, Throwable failure) + { + if (remote != null && failure != null) { + trace("adding failed:"+remote+":exception:"+failure); + failed.put(remote, failure); + } + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.util.IEventProcessor#acceptEvent(org.eclipse.ecf.core.util.Event) + */ + public boolean acceptEvent(Event event) { + return true; + } + + /* (non-Javadoc) + * @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event) + */ + public Event processEvent(Event e) { + // TODO Auto-generated method stub + return null; + } + + protected byte getTransactionState() { + synchronized (lock) { + return transactionState; + } + } + protected void waitToCommit() throws SharedObjectAddAbortException { + synchronized (lock) { + long end = System.currentTimeMillis() + timeout; + try { + while (!votingCompleted()) { + long wait = end - System.currentTimeMillis(); + trace("waitForFinish waiting "+wait+"ms on "+sharedObject.getID()); + if (wait <= 0L) throw new SharedObjectAddAbortException("Timeout waiting for create responses",(Throwable) null,timeout); + // Wait right here + lock.wait(wait); + } + } catch (InterruptedException e) { + throw new SharedObjectAddAbortException("Wait interrupted",(Throwable) null,timeout); + } catch (SharedObjectAddAbortException e1) { + // Aborted for some reason. Clean up. + doAbort(e1); + } + // Success. Send commit to remotes and clean up before returning. + doCommit(); + } + } + protected void doAbort(Throwable t) { + // XXX TODO + } + protected void doCommit() { + // XXX TODO + } + protected boolean votingCompleted() throws SharedObjectAddAbortException + { + // The test here is is we've received any indication of failed participants in + // the transaction. If so, we throw. + trace("voting completed test"); + if (failed.size() > 0) { + trace("voting completed. Failures:"+failed); + // Abort! + throw new SharedObjectAddAbortException("Abort received",failed,timeout); + // If no problems, and the number of participants to here from is 0, then we're done + } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) { + // Success! + trace("votingCompleted:true"); + return true; + } + // Else continue waiting + trace("voting not completed"); + return false; + } + +} diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java index aecb9b213..df12fb5f4 100644 --- a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java +++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/SharedObjectAddAbortException.java @@ -9,12 +9,15 @@ package org.eclipse.ecf.core; +import java.util.Map; + public class SharedObjectAddAbortException extends SharedObjectAddException { private static final long serialVersionUID = 4120851079287223088L; protected long timeout = -1L; - + protected Map causes; + public SharedObjectAddAbortException() { super(); } @@ -29,6 +32,10 @@ public class SharedObjectAddAbortException extends SharedObjectAddException { super(msg, cause); this.timeout = timeout; } + public SharedObjectAddAbortException(String msg, Map causes, int timeout) { + super(msg); + this.causes = causes; + } public SharedObjectAddAbortException(Throwable cause) { super(cause); } @@ -36,5 +43,7 @@ public class SharedObjectAddAbortException extends SharedObjectAddException { public long getTimeout() { return timeout; } - + public Map getCauses() { + return causes; + } }
\ No newline at end of file diff --git a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java index 8fbb8c0a9..e4e118171 100644 --- a/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java +++ b/framework/bundles/org.eclipse.ecf/src/org/eclipse/ecf/core/util/IEventProcessor.java @@ -16,5 +16,5 @@ public interface IEventProcessor { * Process given Event * @param e the Event to process */ - public Event processEvent(Event e); + public Event processEvent(Event event); }
\ No newline at end of file |
