diff options
| author | slewis | 2005-08-05 21:06:16 +0000 |
|---|---|---|
| committer | slewis | 2005-08-05 21:06:16 +0000 |
| commit | 49ee832caacdae21c7e12a3c846cfd3f6da669ea (patch) | |
| tree | 20fcdf0256d20f3141fbbb1350fca5bd1ffd954a | |
| parent | b341f3f23e4cb0a54e1d161bb99e9469a1c8e016 (diff) | |
| download | org.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.tar.gz org.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.tar.xz org.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.zip | |
Update to transactional replication mechanism to support transactional replication to a subset of group participants.
4 files changed, 86 insertions, 22 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java index 092de353a..863d546e7 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java @@ -20,6 +20,7 @@ import java.util.Vector; import org.eclipse.ecf.core.IIdentifiable; import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.ISharedObjectConfig; +import org.eclipse.ecf.core.ISharedObjectContainer; import org.eclipse.ecf.core.ISharedObjectContainerTransaction; import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.ISharedObjectManager; @@ -37,16 +38,30 @@ import org.eclipse.ecf.provider.Trace; public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedObjectInternal { private static long identifier = 0L; + public static final String TRANSACTION_PROPERTY_NAME = ISharedObjectContainerTransaction.class.getName(); + public static final int TRANSACTION_TIMEOUT_DEFAULT = -1; Trace trace = Trace.create("basesharedobject"); ISharedObjectConfig config = null; List eventProcessors = new Vector(); - Integer transactionTimeout = new Integer(-1); + int transactionTimeout = TRANSACTION_TIMEOUT_DEFAULT; ISharedObjectContainerTransaction transaction = null; + ID [] excludedContainerIDs; + public BaseSharedObject(int transactionTimeout, ID [] excludedContainers) { + super(); + this.transactionTimeout = transactionTimeout; + this.excludedContainerIDs = excludedContainers; + } + public BaseSharedObject(int transactionTimeout) { + this(transactionTimeout,null); + } + public BaseSharedObject() { + this(TRANSACTION_TIMEOUT_DEFAULT,null); + } protected static long getNextIdentifier() { return identifier++; } @@ -104,12 +119,14 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb Object o = props.get(TRANSACTION_PROPERTY_NAME); if (o instanceof Integer) { Integer trans = (Integer) o; - if (trans != null && trans.intValue() != -1) { + if (trans != null) { // transactional... - transactionTimeout = trans; - transaction = new SharedObjectReplication(this,transactionTimeout.intValue()); + transactionTimeout = trans.intValue(); } } + if (transactionTimeout != TRANSACTION_TIMEOUT_DEFAULT) { + transaction = new SharedObjectReplication(this,transactionTimeout,excludedContainerIDs); + } } protected ISharedObjectConfig getConfig() { return config; @@ -180,10 +197,13 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb * @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class) */ public Object getAdapter(Class clazz) { + if (clazz.equals(ISharedObjectContainer.class)) { + return this; + } if (clazz.equals(ISharedObjectContainerTransaction.class)) { - if (transactionTimeout == null || transactionTimeout.intValue() == -1) { - return null; - } else return transaction; + if (transactionTimeout != TRANSACTION_TIMEOUT_DEFAULT) { + return transaction; + } } return null; } @@ -234,8 +254,21 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb } public SharedObjectDescription getReplicaDescription(ID receiver) { - return new SharedObjectDescription(getID(), getClass().getName(), + return new SharedObjectDescription(getID(), getClass().getName(), getConfig().getProperties(), getNextIdentifier()); } + public SharedObjectDescription[] getReplicaDescriptions(ID [] receivers) { + SharedObjectDescription [] descriptions = null; + if (receivers == null || receivers.length == 1) { + descriptions = new SharedObjectDescription[1]; + descriptions[0] = getReplicaDescription((receivers==null)?null:receivers[0]); + } else { + descriptions = new SharedObjectDescription[receivers.length]; + for(int i=0; i < receivers.length; i++) { + descriptions[i] = getReplicaDescription(receivers[i]); + } + } + return descriptions; + } } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java index 7a7ace017..fa933c8ba 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java @@ -13,6 +13,6 @@ public interface ISharedObjectInternal extends IIdentifiable { public ISharedObjectContext getContext(); public boolean isPrimary(); public ID getHomeID(); - public SharedObjectDescription getReplicaDescription(ID remote); + public SharedObjectDescription[] getReplicaDescriptions(ID [] remotes); public void destroySelf(); } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java index c5e56753a..aa76a005f 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java @@ -12,6 +12,7 @@ package org.eclipse.ecf.provider.generic.sobject; import org.eclipse.ecf.core.ISharedObjectContainerTransaction; import org.eclipse.ecf.core.SharedObjectAddAbortException; +import org.eclipse.ecf.core.identity.ID; /** * @author slewis @@ -21,7 +22,10 @@ public class SharedObjectReplication implements ISharedObjectContainerTransaction { TPCommitEventProcessor eventProcessor = null; - + public SharedObjectReplication(ISharedObjectInternal so, int transactionTimeout, ID [] exceptions) { + eventProcessor = new TPCommitEventProcessor(so,transactionTimeout,exceptions); + so.addEventProcessor(eventProcessor); + } public SharedObjectReplication(ISharedObjectInternal so,int transactionTimeout) { eventProcessor = new TPCommitEventProcessor(so,transactionTimeout); so.addEventProcessor(eventProcessor); diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java index b124901e4..b69cc660e 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java @@ -12,6 +12,7 @@ package org.eclipse.ecf.provider.generic.sobject; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,10 +52,15 @@ public class TPCommitEventProcessor implements IEventProcessor { int timeout = DEFAULT_TIMEOUT; int minFailedToAbort = 0; long identifier = 0; + ID [] exceptions; - public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) { - this.sharedObject = bse; + public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout, ID [] except) { + this.sharedObject= bse; this.timeout = timeout; + this.exceptions = except; + } + public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) { + this(bse,timeout,null); } public TPCommitEventProcessor(ISharedObjectInternal bse) { @@ -97,6 +103,14 @@ public class TPCommitEventProcessor implements IEventProcessor { return getSharedObject().getHomeID(); } + protected ID [] filter(ID [] ids) { + if (exceptions == null) return ids; + List aList = Arrays.asList(ids); + for(int i=0; i < exceptions.length; i++) { + aList.remove(exceptions[i]); + } + return (ID []) aList.toArray(new ID[] {}); + } protected void addParticipants(ID[] ids) { if (ids != null) { for (int i = 0; i < ids.length; i++) { @@ -186,7 +200,7 @@ public class TPCommitEventProcessor implements IEventProcessor { } } - protected void replicateTo(ID remote) { + protected void replicateTo(ID [] remotes) { try { // Get current group membership ISharedObjectContext context = getSharedObject().getContext(); @@ -196,21 +210,34 @@ public class TPCommitEventProcessor implements IEventProcessor { // we're done return; } - SharedObjectDescription createInfo = getSharedObject().getReplicaDescription(remote); - if (createInfo != null) { - context.sendCreate(remote, createInfo); - } else { - return; + SharedObjectDescription[] createInfos = getSharedObject().getReplicaDescriptions(remotes); + if (createInfos != null) { + if (createInfos.length == 1) { + context.sendCreate((remotes==null)?null:remotes[0],createInfos[0]); + } else { + for(int i=0; i < remotes.length; i++) { + context.sendCreate(remotes[i],createInfos[i]); + } + } } } catch (IOException e) { - traceStack("Exception in replicate("+remote+")", e); + traceStack("Exception in replicateTo("+Arrays.asList(remotes)+")", e); return; } } protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) { - replicateTo(null); - addParticipants(getContext().getGroupMemberIDs()); + // if we don't have any exceptions replicate to all remotes + ID [] groupMembers = getContext().getGroupMemberIDs(); + ID [] replicaContainers = groupMembers; + if (exceptions == null) { + replicateTo(null); + } else { + // We do have some exceptions, so filter these out + replicaContainers = filter(groupMembers); + replicateTo(replicaContainers); + } + addParticipants(replicaContainers); setTransactionState(ISharedObjectContainerTransaction.VOTING); } @@ -236,7 +263,7 @@ public class TPCommitEventProcessor implements IEventProcessor { // replicate message synchronized (lock) { ID newMember = event.getJoinedContainerID(); - replicateTo(newMember); + replicateTo(new ID[] { newMember }); if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) addParticipants(new ID[] { newMember }); } |
