Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-08-05 21:06:16 +0000
committerslewis2005-08-05 21:06:16 +0000
commit49ee832caacdae21c7e12a3c846cfd3f6da669ea (patch)
tree20fcdf0256d20f3141fbbb1350fca5bd1ffd954a
parentb341f3f23e4cb0a54e1d161bb99e9469a1c8e016 (diff)
downloadorg.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.tar.gz
org.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.tar.xz
org.eclipse.ecf-49ee832caacdae21c7e12a3c846cfd3f6da669ea.zip
Update to transactional replication mechanism to support transactional replication to a subset of group participants.
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java49
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java2
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java6
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java51
4 files changed, 86 insertions, 22 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
index 092de353a..863d546e7 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
@@ -20,6 +20,7 @@ import java.util.Vector;
import org.eclipse.ecf.core.IIdentifiable;
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConfig;
+import org.eclipse.ecf.core.ISharedObjectContainer;
import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.ISharedObjectManager;
@@ -37,16 +38,30 @@ import org.eclipse.ecf.provider.Trace;
public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedObjectInternal {
private static long identifier = 0L;
+
public static final String TRANSACTION_PROPERTY_NAME = ISharedObjectContainerTransaction.class.getName();
+ public static final int TRANSACTION_TIMEOUT_DEFAULT = -1;
Trace trace = Trace.create("basesharedobject");
ISharedObjectConfig config = null;
List eventProcessors = new Vector();
- Integer transactionTimeout = new Integer(-1);
+ int transactionTimeout = TRANSACTION_TIMEOUT_DEFAULT;
ISharedObjectContainerTransaction transaction = null;
+ ID [] excludedContainerIDs;
+ public BaseSharedObject(int transactionTimeout, ID [] excludedContainers) {
+ super();
+ this.transactionTimeout = transactionTimeout;
+ this.excludedContainerIDs = excludedContainers;
+ }
+ public BaseSharedObject(int transactionTimeout) {
+ this(transactionTimeout,null);
+ }
+ public BaseSharedObject() {
+ this(TRANSACTION_TIMEOUT_DEFAULT,null);
+ }
protected static long getNextIdentifier() {
return identifier++;
}
@@ -104,12 +119,14 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb
Object o = props.get(TRANSACTION_PROPERTY_NAME);
if (o instanceof Integer) {
Integer trans = (Integer) o;
- if (trans != null && trans.intValue() != -1) {
+ if (trans != null) {
// transactional...
- transactionTimeout = trans;
- transaction = new SharedObjectReplication(this,transactionTimeout.intValue());
+ transactionTimeout = trans.intValue();
}
}
+ if (transactionTimeout != TRANSACTION_TIMEOUT_DEFAULT) {
+ transaction = new SharedObjectReplication(this,transactionTimeout,excludedContainerIDs);
+ }
}
protected ISharedObjectConfig getConfig() {
return config;
@@ -180,10 +197,13 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb
* @see org.eclipse.ecf.core.ISharedObject#getAdapter(java.lang.Class)
*/
public Object getAdapter(Class clazz) {
+ if (clazz.equals(ISharedObjectContainer.class)) {
+ return this;
+ }
if (clazz.equals(ISharedObjectContainerTransaction.class)) {
- if (transactionTimeout == null || transactionTimeout.intValue() == -1) {
- return null;
- } else return transaction;
+ if (transactionTimeout != TRANSACTION_TIMEOUT_DEFAULT) {
+ return transaction;
+ }
}
return null;
}
@@ -234,8 +254,21 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedOb
}
public SharedObjectDescription getReplicaDescription(ID receiver) {
- return new SharedObjectDescription(getID(), getClass().getName(),
+ return new SharedObjectDescription(getID(), getClass().getName(),
getConfig().getProperties(), getNextIdentifier());
}
+ public SharedObjectDescription[] getReplicaDescriptions(ID [] receivers) {
+ SharedObjectDescription [] descriptions = null;
+ if (receivers == null || receivers.length == 1) {
+ descriptions = new SharedObjectDescription[1];
+ descriptions[0] = getReplicaDescription((receivers==null)?null:receivers[0]);
+ } else {
+ descriptions = new SharedObjectDescription[receivers.length];
+ for(int i=0; i < receivers.length; i++) {
+ descriptions[i] = getReplicaDescription(receivers[i]);
+ }
+ }
+ return descriptions;
+ }
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java
index 7a7ace017..fa933c8ba 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java
@@ -13,6 +13,6 @@ public interface ISharedObjectInternal extends IIdentifiable {
public ISharedObjectContext getContext();
public boolean isPrimary();
public ID getHomeID();
- public SharedObjectDescription getReplicaDescription(ID remote);
+ public SharedObjectDescription[] getReplicaDescriptions(ID [] remotes);
public void destroySelf();
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java
index c5e56753a..aa76a005f 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java
@@ -12,6 +12,7 @@ package org.eclipse.ecf.provider.generic.sobject;
import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
import org.eclipse.ecf.core.SharedObjectAddAbortException;
+import org.eclipse.ecf.core.identity.ID;
/**
* @author slewis
@@ -21,7 +22,10 @@ public class SharedObjectReplication implements
ISharedObjectContainerTransaction {
TPCommitEventProcessor eventProcessor = null;
-
+ public SharedObjectReplication(ISharedObjectInternal so, int transactionTimeout, ID [] exceptions) {
+ eventProcessor = new TPCommitEventProcessor(so,transactionTimeout,exceptions);
+ so.addEventProcessor(eventProcessor);
+ }
public SharedObjectReplication(ISharedObjectInternal so,int transactionTimeout) {
eventProcessor = new TPCommitEventProcessor(so,transactionTimeout);
so.addEventProcessor(eventProcessor);
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java
index b124901e4..b69cc660e 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java
@@ -12,6 +12,7 @@
package org.eclipse.ecf.provider.generic.sobject;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,10 +52,15 @@ public class TPCommitEventProcessor implements IEventProcessor {
int timeout = DEFAULT_TIMEOUT;
int minFailedToAbort = 0;
long identifier = 0;
+ ID [] exceptions;
- public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) {
- this.sharedObject = bse;
+ public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout, ID [] except) {
+ this.sharedObject= bse;
this.timeout = timeout;
+ this.exceptions = except;
+ }
+ public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) {
+ this(bse,timeout,null);
}
public TPCommitEventProcessor(ISharedObjectInternal bse) {
@@ -97,6 +103,14 @@ public class TPCommitEventProcessor implements IEventProcessor {
return getSharedObject().getHomeID();
}
+ protected ID [] filter(ID [] ids) {
+ if (exceptions == null) return ids;
+ List aList = Arrays.asList(ids);
+ for(int i=0; i < exceptions.length; i++) {
+ aList.remove(exceptions[i]);
+ }
+ return (ID []) aList.toArray(new ID[] {});
+ }
protected void addParticipants(ID[] ids) {
if (ids != null) {
for (int i = 0; i < ids.length; i++) {
@@ -186,7 +200,7 @@ public class TPCommitEventProcessor implements IEventProcessor {
}
}
- protected void replicateTo(ID remote) {
+ protected void replicateTo(ID [] remotes) {
try {
// Get current group membership
ISharedObjectContext context = getSharedObject().getContext();
@@ -196,21 +210,34 @@ public class TPCommitEventProcessor implements IEventProcessor {
// we're done
return;
}
- SharedObjectDescription createInfo = getSharedObject().getReplicaDescription(remote);
- if (createInfo != null) {
- context.sendCreate(remote, createInfo);
- } else {
- return;
+ SharedObjectDescription[] createInfos = getSharedObject().getReplicaDescriptions(remotes);
+ if (createInfos != null) {
+ if (createInfos.length == 1) {
+ context.sendCreate((remotes==null)?null:remotes[0],createInfos[0]);
+ } else {
+ for(int i=0; i < remotes.length; i++) {
+ context.sendCreate(remotes[i],createInfos[i]);
+ }
+ }
}
} catch (IOException e) {
- traceStack("Exception in replicate("+remote+")", e);
+ traceStack("Exception in replicateTo("+Arrays.asList(remotes)+")", e);
return;
}
}
protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) {
- replicateTo(null);
- addParticipants(getContext().getGroupMemberIDs());
+ // if we don't have any exceptions replicate to all remotes
+ ID [] groupMembers = getContext().getGroupMemberIDs();
+ ID [] replicaContainers = groupMembers;
+ if (exceptions == null) {
+ replicateTo(null);
+ } else {
+ // We do have some exceptions, so filter these out
+ replicaContainers = filter(groupMembers);
+ replicateTo(replicaContainers);
+ }
+ addParticipants(replicaContainers);
setTransactionState(ISharedObjectContainerTransaction.VOTING);
}
@@ -236,7 +263,7 @@ public class TPCommitEventProcessor implements IEventProcessor {
// replicate message
synchronized (lock) {
ID newMember = event.getJoinedContainerID();
- replicateTo(newMember);
+ replicateTo(new ID[] { newMember });
if (getTransactionState() == ISharedObjectContainerTransaction.VOTING)
addParticipants(new ID[] { newMember });
}

Back to the top