Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-07-07 06:01:35 +0000
committerslewis2005-07-07 06:01:35 +0000
commitebd38aa723eb3b70b11a66a6b0a50ad3f5222b16 (patch)
treec483962da26787524e78058635044d8378039d92
parent2d930aeeae912ca8c1c798ce0ae57269b47c1218 (diff)
downloadorg.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.tar.gz
org.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.tar.xz
org.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.zip
Introduced interface ISharedObjectInternal to allow event processor classes acces to ISharedObject implementation internals and messaging. Renamed some classes to support transactional replication of shared objects
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java196
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java6
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java75
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java18
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java (renamed from framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java)9
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java (renamed from framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java)58
6 files changed, 210 insertions, 152 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
index 886362f2d..3e32ab180 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java
@@ -110,10 +110,10 @@ public abstract class SOContainer implements ISharedObjectContainer {
ISharedObject obj = load(description);
// Create wrapper object and move from loading to active
// list.
- SOWrapper wrap = makeRemoteSharedObjectWrapper(
- fromID, description, obj);
-
- wrap.init();
+ SOWrapper wrap = makeRemoteSharedObjectWrapper(fromID,
+ description, obj);
+
+ wrap.init();
// Check to make sure thread has not been
// interrupted...if it has, throw
if (Thread.currentThread().isInterrupted()
@@ -231,21 +231,33 @@ public abstract class SOContainer implements ISharedObjectContainer {
return getID();
}
}
+
static Trace debug = Trace.create("container");
+
public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class
.getName()
+ ".sharedobjectargs";
+
public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class
.getName()
+ ".sharedobjectargs";
+
protected ISharedObjectContainerConfig config = null;
+
protected SOContainerGMM groupManager = null;
+
protected boolean isClosing = false;
+
private Vector listeners = null;
+
protected ThreadGroup loadingThreadGroup = null;
+
protected MessageReceiver receiver;
+
private long sequenceNumber = 0L;
+
protected SOManager sharedObjectManager = null;
+
protected ISharedObjectPolicy policy = null;
protected ThreadGroup sharedObjectThreadGroup = null;
@@ -280,34 +292,35 @@ public abstract class SOContainer implements ISharedObjectContainer {
this.policy = policy;
}
}
+
protected boolean addNewRemoteMember(ID memberID, Object data) {
debug("addNewRemoteMember:" + memberID);
return groupManager.addMember(new Member(memberID, data));
}
- protected ISharedObject addSharedObject0(SharedObjectDescription sd,
- ISharedObject s) throws Exception {
- addSharedObjectWrapper(makeSharedObjectWrapper(sd, s));
- return s;
+ protected ISharedObjectContainerTransaction addSharedObject0(
+ SharedObjectDescription sd, ISharedObject s) throws Exception {
+ return addSharedObjectWrapper(makeSharedObjectWrapper(sd, s));
}
protected void addSharedObjectAndWait(SharedObjectDescription sd,
- ISharedObject s, ISharedObjectContainerTransaction t)
- throws Exception {
+ ISharedObject s) throws Exception {
if (sd.getID() == null || s == null) {
throw new SharedObjectAddException(
"Object id is null, or instance is null");
}
- addSharedObject0(sd, s);
+ ISharedObjectContainerTransaction t = addSharedObject0(sd, s);
// Wait right here until committed
if (t != null)
t.waitToCommit();
}
- protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception {
+ protected ISharedObjectContainerTransaction addSharedObjectWrapper(
+ SOWrapper wrapper) throws Exception {
if (wrapper == null)
- return;
+ return null;
ID id = wrapper.getObjID();
+ ISharedObjectContainerTransaction transaction = null;
synchronized (getGroupMembershipLock()) {
Object obj = groupManager.getFromAny(id);
if (obj != null) {
@@ -316,9 +329,13 @@ public abstract class SOContainer implements ISharedObjectContainer {
}
// Call initialize. If this throws it halts everything
wrapper.init();
+ // Call getAdapter(ISharedObjectContainerTransaction)
+ transaction = (ISharedObjectContainerTransaction) wrapper.sharedObject
+ .getAdapter(ISharedObjectContainerTransaction.class);
// Put in table
groupManager.addSharedObjectToActive(wrapper);
}
+ return transaction;
}
protected boolean addToLoading(LoadingSharedObject lso) {
@@ -347,17 +364,18 @@ public abstract class SOContainer implements ISharedObjectContainer {
* to be created
*
* @return Object null if the create message is to be ignored, non-null if
- * the creation should continue
+ * the creation should continue
*
* @throws Exception
* may throw any Exception to communicate back (via
* sendCreateResponse) to the sender that the creation has
* failed
*/
- protected Object checkRemoteCreate(ID fromID, ID toID, SharedObjectDescription desc) throws Exception {
+ protected Object checkRemoteCreate(ID fromID, ID toID,
+ SharedObjectDescription desc) throws Exception {
debug("checkRemoteCreate(" + fromID + "," + toID + "," + desc + ")");
if (policy != null) {
- policy.checkAddSharedObject(fromID,toID,getID(),desc);
+ policy.checkAddSharedObject(fromID, toID, getID(), desc);
}
return desc;
}
@@ -392,11 +410,9 @@ public abstract class SOContainer implements ISharedObjectContainer {
sharedObjectManager = null;
}
/*
- if (sharedObjectThreadGroup != null) {
- sharedObjectThreadGroup.interrupt();
- sharedObjectThreadGroup = null;
- }
- */
+ * if (sharedObjectThreadGroup != null) {
+ * sharedObjectThreadGroup.interrupt(); sharedObjectThreadGroup = null; }
+ */
if (loadingThreadGroup != null) {
loadingThreadGroup.interrupt();
loadingThreadGroup = null;
@@ -571,14 +587,14 @@ public abstract class SOContainer implements ISharedObjectContainer {
try {
obj = ois.readObject();
} catch (ClassNotFoundException e) {
- e.printStackTrace(System.err);
+ e.printStackTrace(System.err);
dumpStack("class not found for message", e);
return null;
}
if (obj instanceof ContainerMessage) {
return (ContainerMessage) obj;
} else {
- System.out.println("message is not a containermessage "+obj);
+ System.out.println("message is not a containermessage " + obj);
debug("message received is not containermessage:" + obj);
return null;
}
@@ -744,18 +760,24 @@ public abstract class SOContainer implements ISharedObjectContainer {
ID toID = mess.getToContainerID();
ContainerMessage.SharedObjectMessage resp = (ContainerMessage.SharedObjectMessage) mess
.getData();
-
+
synchronized (getGroupMembershipLock()) {
if (toID == null || toID.equals(getID())) {
SOWrapper sow = getSharedObjectWrapper(resp
- .getFromSharedObjectID());
+ .getFromSharedObjectID());
if (sow != null) {
- try {
- sow.deliverSharedObjectMessage(fromID, (Serializable) deserializeSharedObjectMessage((byte []) resp.getData()));
- } catch (ClassNotFoundException e) {
- dumpStack("Classnotfoundexception in handleSharedObjectMessage",e);
- e.printStackTrace(System.err);
- }
+ try {
+ sow
+ .deliverSharedObjectMessage(
+ fromID,
+ (Serializable) deserializeSharedObjectMessage((byte[]) resp
+ .getData()));
+ } catch (ClassNotFoundException e) {
+ dumpStack(
+ "Classnotfoundexception in handleSharedObjectMessage",
+ e);
+ e.printStackTrace(System.err);
+ }
}
}
forward(fromID, toID, mess);
@@ -906,8 +928,8 @@ public abstract class SOContainer implements ISharedObjectContainer {
debug("Ignoring ContainerMessage from null sender...ignoring");
return null;
}
- //OK..let it continue on it's journey
- return contmess;
+ // OK..let it continue on it's journey
+ return contmess;
} else {
debug("Ignoring invalid ContainerMessage:" + mess);
return null;
@@ -952,20 +974,23 @@ public abstract class SOContainer implements ISharedObjectContainer {
}
}
- protected abstract ID getIDForConnection(IAsynchConnection connection);
-
+ protected abstract ID getIDForConnection(IAsynchConnection connection);
+
protected void processDisconnect(DisconnectConnectionEvent e) {
debug("processDisconnect:" + e);
try {
- // Get connection responsible for disconnect event
- IAsynchConnection conn = (IAsynchConnection) e.getConnection();
- if (!conn.isConnected()) return;
- ID fromID = null;
- synchronized (getGroupMembershipLock()) {
- fromID = getIDForConnection(conn);
- memberLeave(fromID,conn);
- }
- if (fromID != null) fireContainerEvent(new SharedObjectContainerDepartedEvent(getID(),fromID));
+ // Get connection responsible for disconnect event
+ IAsynchConnection conn = (IAsynchConnection) e.getConnection();
+ if (!conn.isConnected())
+ return;
+ ID fromID = null;
+ synchronized (getGroupMembershipLock()) {
+ fromID = getIDForConnection(conn);
+ memberLeave(fromID, conn);
+ }
+ if (fromID != null)
+ fireContainerEvent(new SharedObjectContainerDepartedEvent(
+ getID(), fromID));
} catch (Exception except) {
logException("Exception in processDisconnect ", except);
}
@@ -974,7 +999,8 @@ public abstract class SOContainer implements ISharedObjectContainer {
protected Serializable processSynch(SynchConnectionEvent e)
throws IOException {
debug("processSynch:" + e);
- ContainerMessage mess = deserializeContainerMessage((byte[]) e.getData());
+ ContainerMessage mess = deserializeContainerMessage((byte[]) e
+ .getData());
Serializable data = mess.getData();
// Must be non null
if (data != null && data instanceof ContainerMessage.LeaveGroupMessage) {
@@ -1085,50 +1111,58 @@ public abstract class SOContainer implements ISharedObjectContainer {
}
}
- protected byte [] serializeSharedObjectMessage(ID sharedObjectID, Object message) throws IOException {
- if (!(message instanceof Serializable)) throw new NotSerializableException("sharedobjectmessage "+message+" not serializable");
- ByteArrayOutputStream bouts = new ByteArrayOutputStream();
- IdentifiableObjectOutputStream ioos = new IdentifiableObjectOutputStream(sharedObjectID.getName(),bouts);
- ioos.writeObject(message);
- return bouts.toByteArray();
- }
- protected Object deserializeSharedObjectMessage(byte [] bytes) throws IOException, ClassNotFoundException {
- ByteArrayInputStream bins = new ByteArrayInputStream(bytes);
- IdentifiableObjectInputStream iins = new IdentifiableObjectInputStream(new IClassLoaderMapper() {
- public ClassLoader mapNameToClassLoader(String name) {
- ISharedObjectManager manager = getSharedObjectManager();
- ID [] ids = manager.getSharedObjectIDs();
- ID found = null;
- for(int i=0; i < ids.length; i++) {
- ID id = ids[i];
- if (name.equals(id.getName())) {
- found = id;
- break;
- }
- }
- if (found == null) return null;
- ISharedObject obj = manager.getSharedObject(found);
- if (obj == null) return null;
- return obj.getClass().getClassLoader();
- }
-
- },bins);
- Object obj = iins.readObject();
- return obj;
- }
+ protected byte[] serializeSharedObjectMessage(ID sharedObjectID,
+ Object message) throws IOException {
+ if (!(message instanceof Serializable))
+ throw new NotSerializableException("sharedobjectmessage " + message
+ + " not serializable");
+ ByteArrayOutputStream bouts = new ByteArrayOutputStream();
+ IdentifiableObjectOutputStream ioos = new IdentifiableObjectOutputStream(
+ sharedObjectID.getName(), bouts);
+ ioos.writeObject(message);
+ return bouts.toByteArray();
+ }
+
+ protected Object deserializeSharedObjectMessage(byte[] bytes)
+ throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bins = new ByteArrayInputStream(bytes);
+ IdentifiableObjectInputStream iins = new IdentifiableObjectInputStream(
+ new IClassLoaderMapper() {
+ public ClassLoader mapNameToClassLoader(String name) {
+ ISharedObjectManager manager = getSharedObjectManager();
+ ID[] ids = manager.getSharedObjectIDs();
+ ID found = null;
+ for (int i = 0; i < ids.length; i++) {
+ ID id = ids[i];
+ if (name.equals(id.getName())) {
+ found = id;
+ break;
+ }
+ }
+ if (found == null)
+ return null;
+ ISharedObject obj = manager.getSharedObject(found);
+ if (obj == null)
+ return null;
+ return obj.getClass().getClassLoader();
+ }
+
+ }, bins);
+ Object obj = iins.readObject();
+ return obj;
+ }
+
protected void sendMessage(ID toContainerID, ID sharedObjectID,
Object message) throws IOException {
if (message == null)
return;
- byte [] sendData = serializeSharedObjectMessage(sharedObjectID,message);
- sendSharedObjectMessage(toContainerID, sharedObjectID,
- sendData);
+ byte[] sendData = serializeSharedObjectMessage(sharedObjectID, message);
+ sendSharedObjectMessage(toContainerID, sharedObjectID, sendData);
}
protected void sendSharedObjectMessage(ID toContainerID,
ID fromSharedObject, Serializable data) throws IOException {
-
-
+
sendMessage(ContainerMessage.makeSharedObjectMessage(getID(),
toContainerID, getNextSequenceNumber(), fromSharedObject, data));
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java
index 4ca5280e3..997cbe52a 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java
@@ -23,9 +23,9 @@ import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Vector;
+
import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConnector;
-import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
import org.eclipse.ecf.core.ISharedObjectManager;
import org.eclipse.ecf.core.SharedObjectAddException;
import org.eclipse.ecf.core.SharedObjectConnectException;
@@ -197,9 +197,7 @@ public class SOManager implements ISharedObjectManager {
sharedObject.getClass().getClassLoader(), sharedObjectID,
container.getID(), sharedObject.getClass().getName(),
properties, 0);
- ISharedObjectContainerTransaction transaction = (ISharedObjectContainerTransaction) so
- .getAdapter(ISharedObjectContainerTransaction.class);
- container.addSharedObjectAndWait(sd, so, transaction);
+ container.addSharedObjectAndWait(sd, so);
} catch (Exception e) {
dumpStack("Exception in addSharedObject", e);
SharedObjectAddException newExcept = new SharedObjectAddException(
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
index e69e99c73..092de353a 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
@@ -34,17 +34,20 @@ import org.eclipse.ecf.provider.Trace;
* @author slewis
*
*/
-public class BaseSharedObject implements ISharedObject, IIdentifiable {
+public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedObjectInternal {
- protected static final String TRANSACTIONAL_SUFFIX = ".transactional";
private static long identifier = 0L;
+ public static final String TRANSACTION_PROPERTY_NAME = ISharedObjectContainerTransaction.class.getName();
+
Trace trace = Trace.create("basesharedobject");
ISharedObjectConfig config = null;
List eventProcessors = new Vector();
- Boolean transactional = null;
- protected static long getIdentifier() {
+ Integer transactionTimeout = new Integer(-1);
+ ISharedObjectContainerTransaction transaction = null;
+
+ protected static long getNextIdentifier() {
return identifier++;
}
private void trace(String msg) {
@@ -57,12 +60,15 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
trace.dumpStack(t,getID()+":"+msg);
}
}
- protected void addEventProcessor(IEventProcessor proc) {
+ public void addEventProcessor(IEventProcessor proc) {
eventProcessors.add(proc);
}
- protected boolean removeEventProcessor(IEventProcessor proc) {
+ public boolean removeEventProcessor(IEventProcessor proc) {
return eventProcessors.remove(proc);
}
+ public void clearEventProcessors() {
+ eventProcessors.clear();
+ }
protected void fireEventProcessors(Event event) {
if (event == null) return;
Event evt = event;
@@ -90,27 +96,31 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
throws SharedObjectInitException {
this.config = initData;
trace("init("+initData+")");
+ initTransaction();
+ }
+ protected void initTransaction() {
Map props = config.getProperties();
- Object o = props.get(this.getClass().getName()+TRANSACTIONAL_SUFFIX);
- if (o instanceof Boolean) {
- Boolean b = (Boolean) o;
- if (b != null && b.booleanValue()) {
+ // If transaction property is set, get Integer value and use for transaction timeout
+ Object o = props.get(TRANSACTION_PROPERTY_NAME);
+ if (o instanceof Integer) {
+ Integer trans = (Integer) o;
+ if (trans != null && trans.intValue() != -1) {
// transactional...
- new TwoPhaseCommit(this);
+ transactionTimeout = trans;
+ transaction = new SharedObjectReplication(this,transactionTimeout.intValue());
}
}
-
}
protected ISharedObjectConfig getConfig() {
return config;
}
- protected ISharedObjectContext getContext() {
+ public ISharedObjectContext getContext() {
ISharedObjectConfig c = getConfig();
if (c == null) {
return null;
} else return config.getContext();
}
- protected ID getHomeID() {
+ public ID getHomeID() {
ISharedObjectConfig conf = getConfig();
if (conf == null) return null;
else return conf.getHomeContainerID();
@@ -127,7 +137,7 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
return null;
} else return context.getGroupID();
}
- protected boolean isPrimary() {
+ public boolean isPrimary() {
ID local = getLocalID();
ID home = getHomeID();
if (local == null || home == null) {
@@ -171,8 +181,9 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
*/
public Object getAdapter(Class clazz) {
if (clazz.equals(ISharedObjectContainerTransaction.class)) {
- transactional = new Boolean(true);
- return new TwoPhaseCommit(this);
+ if (transactionTimeout == null || transactionTimeout.intValue() == -1) {
+ return null;
+ } else return transaction;
}
return null;
}
@@ -222,35 +233,9 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
}
}
- protected void replicateToRemote(ID remote) {
- trace("replicateToRemote(" + remote + ")");
- try {
- // Get current group membership
- ISharedObjectContext context = getContext();
- if (context == null) return;
- ID[] group = context.getGroupMemberIDs();
- if (group == null || group.length < 1) {
- // we're done
- return;
- }
- SharedObjectDescription createInfo = getReplicaDescription(remote);
- if (createInfo != null) {
- context.sendCreate(remote, createInfo);
- } else {
- return;
- }
- } catch (IOException e) {
- traceStack("Exception in replicate("+remote+")", e);
- return;
- }
- }
- protected SharedObjectDescription getReplicaDescription(ID receiver) {
- Map props = getConfig().getProperties();
- props.put(this.getClass().getName()+TRANSACTIONAL_SUFFIX,transactional);
+ public SharedObjectDescription getReplicaDescription(ID receiver) {
return new SharedObjectDescription(getID(), getClass().getName(),
- props, getIdentifier());
+ getConfig().getProperties(), getNextIdentifier());
}
-
-
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java
new file mode 100644
index 000000000..7a7ace017
--- /dev/null
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java
@@ -0,0 +1,18 @@
+package org.eclipse.ecf.provider.generic.sobject;
+
+import org.eclipse.ecf.core.IIdentifiable;
+import org.eclipse.ecf.core.ISharedObjectContext;
+import org.eclipse.ecf.core.SharedObjectDescription;
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.IEventProcessor;
+
+public interface ISharedObjectInternal extends IIdentifiable {
+ public void addEventProcessor(IEventProcessor ep);
+ public boolean removeEventProcessor(IEventProcessor ep);
+
+ public ISharedObjectContext getContext();
+ public boolean isPrimary();
+ public ID getHomeID();
+ public SharedObjectDescription getReplicaDescription(ID remote);
+ public void destroySelf();
+}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java
index 3885ac5a3..c5e56753a 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java
@@ -17,13 +17,14 @@ import org.eclipse.ecf.core.SharedObjectAddAbortException;
* @author slewis
*
*/
-public class TwoPhaseCommit implements
+public class SharedObjectReplication implements
ISharedObjectContainerTransaction {
- TransactionEventProcessor eventProcessor = null;
+ TPCommitEventProcessor eventProcessor = null;
- public TwoPhaseCommit(BaseSharedObject so) {
- eventProcessor = new TransactionEventProcessor(so);
+ public SharedObjectReplication(ISharedObjectInternal so,int transactionTimeout) {
+ eventProcessor = new TPCommitEventProcessor(so,transactionTimeout);
+ so.addEventProcessor(eventProcessor);
}
/* (non-Javadoc)
* @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#waitToCommit()
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java
index fbb24e440..b124901e4 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java
@@ -11,6 +11,7 @@
package org.eclipse.ecf.provider.generic.sobject;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -19,6 +20,7 @@ import java.util.Vector;
import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.SharedObjectAddAbortException;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
import org.eclipse.ecf.core.events.ISharedObjectCommitEvent;
import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
@@ -35,12 +37,12 @@ import org.eclipse.ecf.provider.Trace;
* @author slewis
*
*/
-public class TransactionEventProcessor implements IEventProcessor {
+public class TPCommitEventProcessor implements IEventProcessor {
public static final Trace trace = Trace.create("transactioneventprocessor");
public static final int DEFAULT_TIMEOUT = 30000;
- BaseSharedObject sharedObject = null;
+ ISharedObjectInternal sharedObject = null;
byte transactionState = ISharedObjectContainerTransaction.ACTIVE;
Object lock = new Object();
@@ -48,14 +50,14 @@ public class TransactionEventProcessor implements IEventProcessor {
Map failed = new HashMap();
int timeout = DEFAULT_TIMEOUT;
int minFailedToAbort = 0;
-
- public TransactionEventProcessor(BaseSharedObject bse, int timeout) {
- sharedObject = bse;
- sharedObject.addEventProcessor(this);
+ long identifier = 0;
+
+ public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) {
+ this.sharedObject = bse;
this.timeout = timeout;
}
- public TransactionEventProcessor(BaseSharedObject bse) {
+ public TPCommitEventProcessor(ISharedObjectInternal bse) {
this(bse, DEFAULT_TIMEOUT);
}
@@ -87,7 +89,7 @@ public class TransactionEventProcessor implements IEventProcessor {
return getSharedObject().isPrimary();
}
- protected BaseSharedObject getSharedObject() {
+ protected ISharedObjectInternal getSharedObject() {
return sharedObject;
}
@@ -184,9 +186,30 @@ public class TransactionEventProcessor implements IEventProcessor {
}
}
+ protected void replicateTo(ID remote) {
+ try {
+ // Get current group membership
+ ISharedObjectContext context = getSharedObject().getContext();
+ if (context == null) return;
+ ID[] group = context.getGroupMemberIDs();
+ if (group == null || group.length < 1) {
+ // we're done
+ return;
+ }
+ SharedObjectDescription createInfo = getSharedObject().getReplicaDescription(remote);
+ if (createInfo != null) {
+ context.sendCreate(remote, createInfo);
+ } else {
+ return;
+ }
+ } catch (IOException e) {
+ traceStack("Exception in replicate("+remote+")", e);
+ return;
+ }
+ }
protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) {
- getSharedObject().replicateToRemote(null);
+ replicateTo(null);
addParticipants(getContext().getGroupMemberIDs());
setTransactionState(ISharedObjectContainerTransaction.VOTING);
}
@@ -195,7 +218,7 @@ public class TransactionEventProcessor implements IEventProcessor {
try {
// Try to respond with create success message back to host
getContext().sendCreateResponse(getHomeID(), null,
- BaseSharedObject.getIdentifier());
+ BaseSharedObject.getNextIdentifier());
// If above succeeds, we're now in prepared state
setTransactionState(ISharedObjectContainerTransaction.PREPARED);
} catch (Exception except) {
@@ -213,12 +236,12 @@ public class TransactionEventProcessor implements IEventProcessor {
// replicate message
synchronized (lock) {
ID newMember = event.getJoinedContainerID();
- getSharedObject().replicateToRemote(newMember);
+ replicateTo(newMember);
if (getTransactionState() == ISharedObjectContainerTransaction.VOTING)
addParticipants(new ID[] { newMember });
}
} else {
- // we don't care
+ // we don't care as we are note transaction monitor
}
}
@@ -241,7 +264,7 @@ public class TransactionEventProcessor implements IEventProcessor {
lock.notifyAll();
}
} else {
- // We don't care
+ // we don't care as we are note transaction monitor
}
}
@@ -256,7 +279,7 @@ public class TransactionEventProcessor implements IEventProcessor {
lock.notifyAll();
}
} else {
- // we don't care
+ // we don't care as we are note transaction monitor
}
}
@@ -318,7 +341,7 @@ public class TransactionEventProcessor implements IEventProcessor {
trace("waitForFinish waiting " + wait + "ms on "
+ getSharedObject().getID());
if (wait <= 0L)
- throw new SharedObjectAddAbortException("Timeout",
+ throw new SharedObjectAddAbortException("Timeout adding "+getSharedObject().getID()+" to "+getHomeID(),
(Throwable) null, getTimeout());
// Wait right here
lock.wait(wait);
@@ -335,12 +358,11 @@ public class TransactionEventProcessor implements IEventProcessor {
protected void doTMAbort(Throwable except)
throws SharedObjectAddAbortException {
trace("ABORTED:" + except);
+ // Set our own state variable to ABORTED
+ setTransactionState(ISharedObjectContainerTransaction.ABORTED);
// Send destroy message here so all remotes get destroyed, and we remove
// ourselves from local space as well.
- getSharedObject().removeEventProcessor(this);
getSharedObject().destroySelf();
- // Set our own state variable to ABORTED
- setTransactionState(ISharedObjectContainerTransaction.ABORTED);
// throw so caller gets exception and can deal with it
if (except instanceof SharedObjectAddAbortException)
throw (SharedObjectAddAbortException) except;

Back to the top