diff options
| author | slewis | 2005-07-07 06:01:35 +0000 |
|---|---|---|
| committer | slewis | 2005-07-07 06:01:35 +0000 |
| commit | ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16 (patch) | |
| tree | c483962da26787524e78058635044d8378039d92 | |
| parent | 2d930aeeae912ca8c1c798ce0ae57269b47c1218 (diff) | |
| download | org.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.tar.gz org.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.tar.xz org.eclipse.ecf-ebd38aa723eb3b70b11a66a6b0a50ad3f5222b16.zip | |
Introduced interface ISharedObjectInternal to allow event processor classes acces to ISharedObject implementation internals and messaging. Renamed some classes to support transactional replication of shared objects
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java | 196 | ||||
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java | 6 | ||||
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java | 75 | ||||
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java | 18 | ||||
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java (renamed from framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java) | 9 | ||||
| -rw-r--r-- | framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java (renamed from framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java) | 58 |
6 files changed, 210 insertions, 152 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java index 886362f2d..3e32ab180 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOContainer.java @@ -110,10 +110,10 @@ public abstract class SOContainer implements ISharedObjectContainer { ISharedObject obj = load(description); // Create wrapper object and move from loading to active // list. - SOWrapper wrap = makeRemoteSharedObjectWrapper( - fromID, description, obj); - - wrap.init(); + SOWrapper wrap = makeRemoteSharedObjectWrapper(fromID, + description, obj); + + wrap.init(); // Check to make sure thread has not been // interrupted...if it has, throw if (Thread.currentThread().isInterrupted() @@ -231,21 +231,33 @@ public abstract class SOContainer implements ISharedObjectContainer { return getID(); } } + static Trace debug = Trace.create("container"); + public static final String DEFAULT_OBJECT_ARG_KEY = SOContainer.class .getName() + ".sharedobjectargs"; + public static final String DEFAULT_OBJECT_ARGTYPES_KEY = SOContainer.class .getName() + ".sharedobjectargs"; + protected ISharedObjectContainerConfig config = null; + protected SOContainerGMM groupManager = null; + protected boolean isClosing = false; + private Vector listeners = null; + protected ThreadGroup loadingThreadGroup = null; + protected MessageReceiver receiver; + private long sequenceNumber = 0L; + protected SOManager sharedObjectManager = null; + protected ISharedObjectPolicy policy = null; protected ThreadGroup sharedObjectThreadGroup = null; @@ -280,34 +292,35 @@ public abstract class SOContainer implements ISharedObjectContainer { this.policy = policy; } } + protected boolean addNewRemoteMember(ID memberID, Object data) { debug("addNewRemoteMember:" + memberID); return groupManager.addMember(new Member(memberID, data)); } - protected ISharedObject addSharedObject0(SharedObjectDescription sd, - ISharedObject s) throws Exception { - addSharedObjectWrapper(makeSharedObjectWrapper(sd, s)); - return s; + protected ISharedObjectContainerTransaction addSharedObject0( + SharedObjectDescription sd, ISharedObject s) throws Exception { + return addSharedObjectWrapper(makeSharedObjectWrapper(sd, s)); } protected void addSharedObjectAndWait(SharedObjectDescription sd, - ISharedObject s, ISharedObjectContainerTransaction t) - throws Exception { + ISharedObject s) throws Exception { if (sd.getID() == null || s == null) { throw new SharedObjectAddException( "Object id is null, or instance is null"); } - addSharedObject0(sd, s); + ISharedObjectContainerTransaction t = addSharedObject0(sd, s); // Wait right here until committed if (t != null) t.waitToCommit(); } - protected void addSharedObjectWrapper(SOWrapper wrapper) throws Exception { + protected ISharedObjectContainerTransaction addSharedObjectWrapper( + SOWrapper wrapper) throws Exception { if (wrapper == null) - return; + return null; ID id = wrapper.getObjID(); + ISharedObjectContainerTransaction transaction = null; synchronized (getGroupMembershipLock()) { Object obj = groupManager.getFromAny(id); if (obj != null) { @@ -316,9 +329,13 @@ public abstract class SOContainer implements ISharedObjectContainer { } // Call initialize. If this throws it halts everything wrapper.init(); + // Call getAdapter(ISharedObjectContainerTransaction) + transaction = (ISharedObjectContainerTransaction) wrapper.sharedObject + .getAdapter(ISharedObjectContainerTransaction.class); // Put in table groupManager.addSharedObjectToActive(wrapper); } + return transaction; } protected boolean addToLoading(LoadingSharedObject lso) { @@ -347,17 +364,18 @@ public abstract class SOContainer implements ISharedObjectContainer { * to be created * * @return Object null if the create message is to be ignored, non-null if - * the creation should continue + * the creation should continue * * @throws Exception * may throw any Exception to communicate back (via * sendCreateResponse) to the sender that the creation has * failed */ - protected Object checkRemoteCreate(ID fromID, ID toID, SharedObjectDescription desc) throws Exception { + protected Object checkRemoteCreate(ID fromID, ID toID, + SharedObjectDescription desc) throws Exception { debug("checkRemoteCreate(" + fromID + "," + toID + "," + desc + ")"); if (policy != null) { - policy.checkAddSharedObject(fromID,toID,getID(),desc); + policy.checkAddSharedObject(fromID, toID, getID(), desc); } return desc; } @@ -392,11 +410,9 @@ public abstract class SOContainer implements ISharedObjectContainer { sharedObjectManager = null; } /* - if (sharedObjectThreadGroup != null) { - sharedObjectThreadGroup.interrupt(); - sharedObjectThreadGroup = null; - } - */ + * if (sharedObjectThreadGroup != null) { + * sharedObjectThreadGroup.interrupt(); sharedObjectThreadGroup = null; } + */ if (loadingThreadGroup != null) { loadingThreadGroup.interrupt(); loadingThreadGroup = null; @@ -571,14 +587,14 @@ public abstract class SOContainer implements ISharedObjectContainer { try { obj = ois.readObject(); } catch (ClassNotFoundException e) { - e.printStackTrace(System.err); + e.printStackTrace(System.err); dumpStack("class not found for message", e); return null; } if (obj instanceof ContainerMessage) { return (ContainerMessage) obj; } else { - System.out.println("message is not a containermessage "+obj); + System.out.println("message is not a containermessage " + obj); debug("message received is not containermessage:" + obj); return null; } @@ -744,18 +760,24 @@ public abstract class SOContainer implements ISharedObjectContainer { ID toID = mess.getToContainerID(); ContainerMessage.SharedObjectMessage resp = (ContainerMessage.SharedObjectMessage) mess .getData(); - + synchronized (getGroupMembershipLock()) { if (toID == null || toID.equals(getID())) { SOWrapper sow = getSharedObjectWrapper(resp - .getFromSharedObjectID()); + .getFromSharedObjectID()); if (sow != null) { - try { - sow.deliverSharedObjectMessage(fromID, (Serializable) deserializeSharedObjectMessage((byte []) resp.getData())); - } catch (ClassNotFoundException e) { - dumpStack("Classnotfoundexception in handleSharedObjectMessage",e); - e.printStackTrace(System.err); - } + try { + sow + .deliverSharedObjectMessage( + fromID, + (Serializable) deserializeSharedObjectMessage((byte[]) resp + .getData())); + } catch (ClassNotFoundException e) { + dumpStack( + "Classnotfoundexception in handleSharedObjectMessage", + e); + e.printStackTrace(System.err); + } } } forward(fromID, toID, mess); @@ -906,8 +928,8 @@ public abstract class SOContainer implements ISharedObjectContainer { debug("Ignoring ContainerMessage from null sender...ignoring"); return null; } - //OK..let it continue on it's journey - return contmess; + // OK..let it continue on it's journey + return contmess; } else { debug("Ignoring invalid ContainerMessage:" + mess); return null; @@ -952,20 +974,23 @@ public abstract class SOContainer implements ISharedObjectContainer { } } - protected abstract ID getIDForConnection(IAsynchConnection connection); - + protected abstract ID getIDForConnection(IAsynchConnection connection); + protected void processDisconnect(DisconnectConnectionEvent e) { debug("processDisconnect:" + e); try { - // Get connection responsible for disconnect event - IAsynchConnection conn = (IAsynchConnection) e.getConnection(); - if (!conn.isConnected()) return; - ID fromID = null; - synchronized (getGroupMembershipLock()) { - fromID = getIDForConnection(conn); - memberLeave(fromID,conn); - } - if (fromID != null) fireContainerEvent(new SharedObjectContainerDepartedEvent(getID(),fromID)); + // Get connection responsible for disconnect event + IAsynchConnection conn = (IAsynchConnection) e.getConnection(); + if (!conn.isConnected()) + return; + ID fromID = null; + synchronized (getGroupMembershipLock()) { + fromID = getIDForConnection(conn); + memberLeave(fromID, conn); + } + if (fromID != null) + fireContainerEvent(new SharedObjectContainerDepartedEvent( + getID(), fromID)); } catch (Exception except) { logException("Exception in processDisconnect ", except); } @@ -974,7 +999,8 @@ public abstract class SOContainer implements ISharedObjectContainer { protected Serializable processSynch(SynchConnectionEvent e) throws IOException { debug("processSynch:" + e); - ContainerMessage mess = deserializeContainerMessage((byte[]) e.getData()); + ContainerMessage mess = deserializeContainerMessage((byte[]) e + .getData()); Serializable data = mess.getData(); // Must be non null if (data != null && data instanceof ContainerMessage.LeaveGroupMessage) { @@ -1085,50 +1111,58 @@ public abstract class SOContainer implements ISharedObjectContainer { } } - protected byte [] serializeSharedObjectMessage(ID sharedObjectID, Object message) throws IOException { - if (!(message instanceof Serializable)) throw new NotSerializableException("sharedobjectmessage "+message+" not serializable"); - ByteArrayOutputStream bouts = new ByteArrayOutputStream(); - IdentifiableObjectOutputStream ioos = new IdentifiableObjectOutputStream(sharedObjectID.getName(),bouts); - ioos.writeObject(message); - return bouts.toByteArray(); - } - protected Object deserializeSharedObjectMessage(byte [] bytes) throws IOException, ClassNotFoundException { - ByteArrayInputStream bins = new ByteArrayInputStream(bytes); - IdentifiableObjectInputStream iins = new IdentifiableObjectInputStream(new IClassLoaderMapper() { - public ClassLoader mapNameToClassLoader(String name) { - ISharedObjectManager manager = getSharedObjectManager(); - ID [] ids = manager.getSharedObjectIDs(); - ID found = null; - for(int i=0; i < ids.length; i++) { - ID id = ids[i]; - if (name.equals(id.getName())) { - found = id; - break; - } - } - if (found == null) return null; - ISharedObject obj = manager.getSharedObject(found); - if (obj == null) return null; - return obj.getClass().getClassLoader(); - } - - },bins); - Object obj = iins.readObject(); - return obj; - } + protected byte[] serializeSharedObjectMessage(ID sharedObjectID, + Object message) throws IOException { + if (!(message instanceof Serializable)) + throw new NotSerializableException("sharedobjectmessage " + message + + " not serializable"); + ByteArrayOutputStream bouts = new ByteArrayOutputStream(); + IdentifiableObjectOutputStream ioos = new IdentifiableObjectOutputStream( + sharedObjectID.getName(), bouts); + ioos.writeObject(message); + return bouts.toByteArray(); + } + + protected Object deserializeSharedObjectMessage(byte[] bytes) + throws IOException, ClassNotFoundException { + ByteArrayInputStream bins = new ByteArrayInputStream(bytes); + IdentifiableObjectInputStream iins = new IdentifiableObjectInputStream( + new IClassLoaderMapper() { + public ClassLoader mapNameToClassLoader(String name) { + ISharedObjectManager manager = getSharedObjectManager(); + ID[] ids = manager.getSharedObjectIDs(); + ID found = null; + for (int i = 0; i < ids.length; i++) { + ID id = ids[i]; + if (name.equals(id.getName())) { + found = id; + break; + } + } + if (found == null) + return null; + ISharedObject obj = manager.getSharedObject(found); + if (obj == null) + return null; + return obj.getClass().getClassLoader(); + } + + }, bins); + Object obj = iins.readObject(); + return obj; + } + protected void sendMessage(ID toContainerID, ID sharedObjectID, Object message) throws IOException { if (message == null) return; - byte [] sendData = serializeSharedObjectMessage(sharedObjectID,message); - sendSharedObjectMessage(toContainerID, sharedObjectID, - sendData); + byte[] sendData = serializeSharedObjectMessage(sharedObjectID, message); + sendSharedObjectMessage(toContainerID, sharedObjectID, sendData); } protected void sendSharedObjectMessage(ID toContainerID, ID fromSharedObject, Serializable data) throws IOException { - - + sendMessage(ContainerMessage.makeSharedObjectMessage(getID(), toContainerID, getNextSequenceNumber(), fromSharedObject, data)); } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java index 4ca5280e3..997cbe52a 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/SOManager.java @@ -23,9 +23,9 @@ import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.Vector; + import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.ISharedObjectConnector; -import org.eclipse.ecf.core.ISharedObjectContainerTransaction; import org.eclipse.ecf.core.ISharedObjectManager; import org.eclipse.ecf.core.SharedObjectAddException; import org.eclipse.ecf.core.SharedObjectConnectException; @@ -197,9 +197,7 @@ public class SOManager implements ISharedObjectManager { sharedObject.getClass().getClassLoader(), sharedObjectID, container.getID(), sharedObject.getClass().getName(), properties, 0); - ISharedObjectContainerTransaction transaction = (ISharedObjectContainerTransaction) so - .getAdapter(ISharedObjectContainerTransaction.class); - container.addSharedObjectAndWait(sd, so, transaction); + container.addSharedObjectAndWait(sd, so); } catch (Exception e) { dumpStack("Exception in addSharedObject", e); SharedObjectAddException newExcept = new SharedObjectAddException( diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java index e69e99c73..092de353a 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java @@ -34,17 +34,20 @@ import org.eclipse.ecf.provider.Trace; * @author slewis * */ -public class BaseSharedObject implements ISharedObject, IIdentifiable { +public class BaseSharedObject implements ISharedObject, IIdentifiable, ISharedObjectInternal { - protected static final String TRANSACTIONAL_SUFFIX = ".transactional"; private static long identifier = 0L; + public static final String TRANSACTION_PROPERTY_NAME = ISharedObjectContainerTransaction.class.getName(); + Trace trace = Trace.create("basesharedobject"); ISharedObjectConfig config = null; List eventProcessors = new Vector(); - Boolean transactional = null; - protected static long getIdentifier() { + Integer transactionTimeout = new Integer(-1); + ISharedObjectContainerTransaction transaction = null; + + protected static long getNextIdentifier() { return identifier++; } private void trace(String msg) { @@ -57,12 +60,15 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { trace.dumpStack(t,getID()+":"+msg); } } - protected void addEventProcessor(IEventProcessor proc) { + public void addEventProcessor(IEventProcessor proc) { eventProcessors.add(proc); } - protected boolean removeEventProcessor(IEventProcessor proc) { + public boolean removeEventProcessor(IEventProcessor proc) { return eventProcessors.remove(proc); } + public void clearEventProcessors() { + eventProcessors.clear(); + } protected void fireEventProcessors(Event event) { if (event == null) return; Event evt = event; @@ -90,27 +96,31 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { throws SharedObjectInitException { this.config = initData; trace("init("+initData+")"); + initTransaction(); + } + protected void initTransaction() { Map props = config.getProperties(); - Object o = props.get(this.getClass().getName()+TRANSACTIONAL_SUFFIX); - if (o instanceof Boolean) { - Boolean b = (Boolean) o; - if (b != null && b.booleanValue()) { + // If transaction property is set, get Integer value and use for transaction timeout + Object o = props.get(TRANSACTION_PROPERTY_NAME); + if (o instanceof Integer) { + Integer trans = (Integer) o; + if (trans != null && trans.intValue() != -1) { // transactional... - new TwoPhaseCommit(this); + transactionTimeout = trans; + transaction = new SharedObjectReplication(this,transactionTimeout.intValue()); } } - } protected ISharedObjectConfig getConfig() { return config; } - protected ISharedObjectContext getContext() { + public ISharedObjectContext getContext() { ISharedObjectConfig c = getConfig(); if (c == null) { return null; } else return config.getContext(); } - protected ID getHomeID() { + public ID getHomeID() { ISharedObjectConfig conf = getConfig(); if (conf == null) return null; else return conf.getHomeContainerID(); @@ -127,7 +137,7 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { return null; } else return context.getGroupID(); } - protected boolean isPrimary() { + public boolean isPrimary() { ID local = getLocalID(); ID home = getHomeID(); if (local == null || home == null) { @@ -171,8 +181,9 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { */ public Object getAdapter(Class clazz) { if (clazz.equals(ISharedObjectContainerTransaction.class)) { - transactional = new Boolean(true); - return new TwoPhaseCommit(this); + if (transactionTimeout == null || transactionTimeout.intValue() == -1) { + return null; + } else return transaction; } return null; } @@ -222,35 +233,9 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { } } - protected void replicateToRemote(ID remote) { - trace("replicateToRemote(" + remote + ")"); - try { - // Get current group membership - ISharedObjectContext context = getContext(); - if (context == null) return; - ID[] group = context.getGroupMemberIDs(); - if (group == null || group.length < 1) { - // we're done - return; - } - SharedObjectDescription createInfo = getReplicaDescription(remote); - if (createInfo != null) { - context.sendCreate(remote, createInfo); - } else { - return; - } - } catch (IOException e) { - traceStack("Exception in replicate("+remote+")", e); - return; - } - } - protected SharedObjectDescription getReplicaDescription(ID receiver) { - Map props = getConfig().getProperties(); - props.put(this.getClass().getName()+TRANSACTIONAL_SUFFIX,transactional); + public SharedObjectDescription getReplicaDescription(ID receiver) { return new SharedObjectDescription(getID(), getClass().getName(), - props, getIdentifier()); + getConfig().getProperties(), getNextIdentifier()); } - - } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java new file mode 100644 index 000000000..7a7ace017 --- /dev/null +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/ISharedObjectInternal.java @@ -0,0 +1,18 @@ +package org.eclipse.ecf.provider.generic.sobject; + +import org.eclipse.ecf.core.IIdentifiable; +import org.eclipse.ecf.core.ISharedObjectContext; +import org.eclipse.ecf.core.SharedObjectDescription; +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.IEventProcessor; + +public interface ISharedObjectInternal extends IIdentifiable { + public void addEventProcessor(IEventProcessor ep); + public boolean removeEventProcessor(IEventProcessor ep); + + public ISharedObjectContext getContext(); + public boolean isPrimary(); + public ID getHomeID(); + public SharedObjectDescription getReplicaDescription(ID remote); + public void destroySelf(); +} diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java index 3885ac5a3..c5e56753a 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TwoPhaseCommit.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/SharedObjectReplication.java @@ -17,13 +17,14 @@ import org.eclipse.ecf.core.SharedObjectAddAbortException; * @author slewis * */ -public class TwoPhaseCommit implements +public class SharedObjectReplication implements ISharedObjectContainerTransaction { - TransactionEventProcessor eventProcessor = null; + TPCommitEventProcessor eventProcessor = null; - public TwoPhaseCommit(BaseSharedObject so) { - eventProcessor = new TransactionEventProcessor(so); + public SharedObjectReplication(ISharedObjectInternal so,int transactionTimeout) { + eventProcessor = new TPCommitEventProcessor(so,transactionTimeout); + so.addEventProcessor(eventProcessor); } /* (non-Javadoc) * @see org.eclipse.ecf.core.ISharedObjectContainerTransaction#waitToCommit() diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java index fbb24e440..b124901e4 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TPCommitEventProcessor.java @@ -11,6 +11,7 @@ package org.eclipse.ecf.provider.generic.sobject; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -19,6 +20,7 @@ import java.util.Vector; import org.eclipse.ecf.core.ISharedObjectContainerTransaction; import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.SharedObjectAddAbortException; +import org.eclipse.ecf.core.SharedObjectDescription; import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent; import org.eclipse.ecf.core.events.ISharedObjectCommitEvent; import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent; @@ -35,12 +37,12 @@ import org.eclipse.ecf.provider.Trace; * @author slewis * */ -public class TransactionEventProcessor implements IEventProcessor { +public class TPCommitEventProcessor implements IEventProcessor { public static final Trace trace = Trace.create("transactioneventprocessor"); public static final int DEFAULT_TIMEOUT = 30000; - BaseSharedObject sharedObject = null; + ISharedObjectInternal sharedObject = null; byte transactionState = ISharedObjectContainerTransaction.ACTIVE; Object lock = new Object(); @@ -48,14 +50,14 @@ public class TransactionEventProcessor implements IEventProcessor { Map failed = new HashMap(); int timeout = DEFAULT_TIMEOUT; int minFailedToAbort = 0; - - public TransactionEventProcessor(BaseSharedObject bse, int timeout) { - sharedObject = bse; - sharedObject.addEventProcessor(this); + long identifier = 0; + + public TPCommitEventProcessor(ISharedObjectInternal bse, int timeout) { + this.sharedObject = bse; this.timeout = timeout; } - public TransactionEventProcessor(BaseSharedObject bse) { + public TPCommitEventProcessor(ISharedObjectInternal bse) { this(bse, DEFAULT_TIMEOUT); } @@ -87,7 +89,7 @@ public class TransactionEventProcessor implements IEventProcessor { return getSharedObject().isPrimary(); } - protected BaseSharedObject getSharedObject() { + protected ISharedObjectInternal getSharedObject() { return sharedObject; } @@ -184,9 +186,30 @@ public class TransactionEventProcessor implements IEventProcessor { } } + protected void replicateTo(ID remote) { + try { + // Get current group membership + ISharedObjectContext context = getSharedObject().getContext(); + if (context == null) return; + ID[] group = context.getGroupMemberIDs(); + if (group == null || group.length < 1) { + // we're done + return; + } + SharedObjectDescription createInfo = getSharedObject().getReplicaDescription(remote); + if (createInfo != null) { + context.sendCreate(remote, createInfo); + } else { + return; + } + } catch (IOException e) { + traceStack("Exception in replicate("+remote+")", e); + return; + } + } protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) { - getSharedObject().replicateToRemote(null); + replicateTo(null); addParticipants(getContext().getGroupMemberIDs()); setTransactionState(ISharedObjectContainerTransaction.VOTING); } @@ -195,7 +218,7 @@ public class TransactionEventProcessor implements IEventProcessor { try { // Try to respond with create success message back to host getContext().sendCreateResponse(getHomeID(), null, - BaseSharedObject.getIdentifier()); + BaseSharedObject.getNextIdentifier()); // If above succeeds, we're now in prepared state setTransactionState(ISharedObjectContainerTransaction.PREPARED); } catch (Exception except) { @@ -213,12 +236,12 @@ public class TransactionEventProcessor implements IEventProcessor { // replicate message synchronized (lock) { ID newMember = event.getJoinedContainerID(); - getSharedObject().replicateToRemote(newMember); + replicateTo(newMember); if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) addParticipants(new ID[] { newMember }); } } else { - // we don't care + // we don't care as we are note transaction monitor } } @@ -241,7 +264,7 @@ public class TransactionEventProcessor implements IEventProcessor { lock.notifyAll(); } } else { - // We don't care + // we don't care as we are note transaction monitor } } @@ -256,7 +279,7 @@ public class TransactionEventProcessor implements IEventProcessor { lock.notifyAll(); } } else { - // we don't care + // we don't care as we are note transaction monitor } } @@ -318,7 +341,7 @@ public class TransactionEventProcessor implements IEventProcessor { trace("waitForFinish waiting " + wait + "ms on " + getSharedObject().getID()); if (wait <= 0L) - throw new SharedObjectAddAbortException("Timeout", + throw new SharedObjectAddAbortException("Timeout adding "+getSharedObject().getID()+" to "+getHomeID(), (Throwable) null, getTimeout()); // Wait right here lock.wait(wait); @@ -335,12 +358,11 @@ public class TransactionEventProcessor implements IEventProcessor { protected void doTMAbort(Throwable except) throws SharedObjectAddAbortException { trace("ABORTED:" + except); + // Set our own state variable to ABORTED + setTransactionState(ISharedObjectContainerTransaction.ABORTED); // Send destroy message here so all remotes get destroyed, and we remove // ourselves from local space as well. - getSharedObject().removeEventProcessor(this); getSharedObject().destroySelf(); - // Set our own state variable to ABORTED - setTransactionState(ISharedObjectContainerTransaction.ABORTED); // throw so caller gets exception and can deal with it if (except instanceof SharedObjectAddAbortException) throw (SharedObjectAddAbortException) except; |
