Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2005-07-04 23:19:55 +0000
committerslewis2005-07-04 23:19:55 +0000
commit795c0edf438616c2b640c678e715d7464202e293 (patch)
treef3d2877d150b0e50b1333c2aef61dc3affb79143
parent2afc9dcc278736c3715c0ee80603052ab4206533 (diff)
downloadorg.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.tar.gz
org.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.tar.xz
org.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.zip
Completed TransactionEventProcessor
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java35
-rw-r--r--framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java432
2 files changed, 382 insertions, 85 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
index c0d52b4ab..bde3a6f2c 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java
@@ -22,6 +22,7 @@ import org.eclipse.ecf.core.ISharedObject;
import org.eclipse.ecf.core.ISharedObjectConfig;
import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.ISharedObjectManager;
+import org.eclipse.ecf.core.SharedObjectDescription;
import org.eclipse.ecf.core.SharedObjectInitException;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.Event;
@@ -34,11 +35,16 @@ import org.eclipse.ecf.provider.Trace;
*/
public class BaseSharedObject implements ISharedObject, IIdentifiable {
+ private static long identifier = 0L;
+
Trace trace = Trace.create("basesharedobject");
ISharedObjectConfig config = null;
List eventProcessors = new Vector();
+ protected static long getIdentifier() {
+ return identifier++;
+ }
protected void trace(String msg) {
if (Trace.ON && trace != null) {
trace.msg(msg);
@@ -179,4 +185,33 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable {
}
}
+ protected void replicate(ID remote) {
+ trace("replicate(" + remote + ")");
+ try {
+ // Get current group membership
+ ISharedObjectContext context = getContext();
+ if (context == null) return;
+ ID[] group = context.getGroupMemberIDs();
+ if (group == null || group.length < 1) {
+ // we're done
+ return;
+ }
+ SharedObjectDescription createInfo = getReplicaDescription(remote);
+ if (createInfo != null) {
+ context.sendCreate(remote, createInfo);
+ } else {
+ return;
+ }
+ } catch (IOException e) {
+ traceStack("Exception in replicate("+remote+")", e);
+ return;
+ }
+ }
+ protected SharedObjectDescription getReplicaDescription(ID receiver) {
+ return new SharedObjectDescription(getID(), getClass().getName(),
+ getConfig().getProperties(), getIdentifier());
+ }
+
+
+
}
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
index fb78050d5..7ea58a7c9 100644
--- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
+++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java
@@ -1,6 +1,14 @@
-/**
- *
- */
+/****************************************************************************
+ * Copyright (c) 2004 Composent, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Composent, Inc. - initial API and implementation
+ *****************************************************************************/
+
package org.eclipse.ecf.provider.generic.sobject;
import java.util.HashMap;
@@ -9,7 +17,13 @@ import java.util.Map;
import java.util.Vector;
import org.eclipse.ecf.core.ISharedObjectContainerTransaction;
+import org.eclipse.ecf.core.ISharedObjectContext;
import org.eclipse.ecf.core.SharedObjectAddAbortException;
+import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectContainerJoinedEvent;
+import org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent;
+import org.eclipse.ecf.core.events.RemoteSharedObjectEvent;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.Event;
import org.eclipse.ecf.core.util.IEventProcessor;
@@ -17,126 +31,374 @@ import org.eclipse.ecf.provider.Trace;
/**
* @author slewis
- *
+ *
*/
public class TransactionEventProcessor implements IEventProcessor {
public static final Trace trace = Trace.create("transactioneventprocessor");
+
public static final int DEFAULT_TIMEOUT = 30000;
+
BaseSharedObject sharedObject = null;
+
byte transactionState = ISharedObjectContainerTransaction.ACTIVE;
+
Object lock = new Object();
+
List participants = new Vector();
+
Map failed = new HashMap();
+
int timeout = DEFAULT_TIMEOUT;
-
+
+ int minFailedToAbort = 0;
+
+ class CommitEvent extends RemoteSharedObjectEvent {
+ private static final long serialVersionUID = 6208586597348619413L;
+
+ public CommitEvent(ID senderObj, ID remoteCont) {
+ super(senderObj, remoteCont, null);
+ }
+ }
+
public TransactionEventProcessor(BaseSharedObject bse, int timeout) {
sharedObject = bse;
sharedObject.addEventProcessor(this);
this.timeout = timeout;
}
+
public TransactionEventProcessor(BaseSharedObject bse) {
- this(bse,DEFAULT_TIMEOUT);
- }
- protected void trace(String msg) {
- if (Trace.ON && trace != null) {
- trace.msg(sharedObject.getID()+":"+msg);
- }
- }
-
- protected void traceDump(String msg, Throwable t) {
- if (Trace.ON && trace != null) {
- trace.dumpStack(t, sharedObject.getID()+":"+msg);
- }
- }
-
- protected void addParticipants(ID [] ids) {
- if (ids != null) {
- for(int i=0; i < ids.length; i++) {
- trace("adding participant:"+ids[i]);
- if (!sharedObject.getHomeID().equals(ids[i])) participants.add(ids[i]);
- }
- }
+ this(bse, DEFAULT_TIMEOUT);
+ }
+
+ protected void trace(String msg) {
+ if (Trace.ON && trace != null) {
+ trace.msg(getSharedObject().getID() + ":"
+ + (getSharedObject().isPrimary() ? "primary" : "replica")
+ + msg);
+ }
}
+
+ protected void traceStack(String msg, Throwable t) {
+ if (Trace.ON && trace != null) {
+ trace.dumpStack(t, sharedObject.getID() + ":"
+ + (getSharedObject().isPrimary() ? "primary" : "replica")
+ + msg);
+ }
+ }
+
+ protected int getTimeout() {
+ return timeout;
+ }
+
+ protected int getMinFailedToAbort() {
+ return minFailedToAbort;
+ }
+
+ protected boolean isPrimary() {
+ return getSharedObject().isPrimary();
+ }
+
+ protected BaseSharedObject getSharedObject() {
+ return sharedObject;
+ }
+
+ protected ID getHomeID() {
+ return getSharedObject().getHomeID();
+ }
+
+ protected void addParticipants(ID[] ids) {
+ if (ids != null) {
+ for (int i = 0; i < ids.length; i++) {
+ trace("addParticipant(" + ids[i] + ")");
+ if (!getHomeID().equals(ids[i]))
+ participants.add(ids[i]);
+ }
+ }
+ }
+
protected void removeParticipant(ID id) {
if (id != null) {
- trace("removing participant:"+id);
+ trace("removeParticipant(" + id + ")");
participants.remove(id);
}
}
- protected void addFailed(ID remote, Throwable failure)
- {
- if (remote != null && failure != null) {
- trace("adding failed:"+remote+":exception:"+failure);
- failed.put(remote, failure);
- }
- }
- /* (non-Javadoc)
+ protected void addFailed(ID remote, Throwable failure) {
+ if (remote != null && failure != null) {
+ trace("addFailed(" + remote + "," + failure + ")");
+ failed.put(remote, failure);
+ }
+ }
+
+ protected ISharedObjectContext getContext() {
+ return getSharedObject().getContext();
+ }
+
+ /*
+ * Implementation of IEventProcessor. These methods are entry point methods
+ * for BaseSharedObject method dispatch to call
+ */
+ /*
+ * (non-Javadoc)
+ *
* @see org.eclipse.ecf.core.util.IEventProcessor#acceptEvent(org.eclipse.ecf.core.util.Event)
*/
public boolean acceptEvent(Event event) {
return true;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event)
*/
- public Event processEvent(Event e) {
- // TODO Auto-generated method stub
- return null;
+ public Event processEvent(Event event) {
+ if (event instanceof ISharedObjectActivatedEvent) {
+ trace("handleActivated(" + event + ")");
+ handleActivated((ISharedObjectActivatedEvent) event);
+ return event;
+ } else if (event instanceof ISharedObjectContainerJoinedEvent) {
+ trace("handleJoined(" + event + ")");
+ handleJoined((ISharedObjectContainerJoinedEvent) event);
+ return event;
+ } else if (event instanceof ISharedObjectCreateResponseEvent) {
+ trace("handleCreateResponse(" + event + ")");
+ handleCreateResponse((ISharedObjectCreateResponseEvent) event);
+ return event;
+ } else if (event instanceof ISharedObjectContainerDepartedEvent) {
+ trace("handleDeparted(" + event + ")");
+ handleDeparted((ISharedObjectContainerDepartedEvent) event);
+ return event;
+ } else if (event instanceof CommitEvent) {
+ trace("localCommitted(" + event + ")");
+ localCommitted();
+ return event;
+ }
+ return event;
+ }
+
+ protected void handleActivated(ISharedObjectActivatedEvent event) {
+ // No other state changes while this is going on
+ synchronized (lock) {
+ if (isPrimary()) {
+ // Primary
+ handlePrimaryActivated(event);
+ } else {
+ handleReplicaActivated(event);
+ }
+ // Notify any threads waiting on state change
+ lock.notifyAll();
+ }
+
+ }
+
+ protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) {
+ getSharedObject().replicate(null);
+ addParticipants(getContext().getGroupMemberIDs());
+ setTransactionState(ISharedObjectContainerTransaction.VOTING);
+ }
+
+ protected void handleReplicaActivated(ISharedObjectActivatedEvent event) {
+ try {
+ // Try to respond with create success message back to host
+ getContext().sendCreateResponse(getHomeID(), null,
+ BaseSharedObject.getIdentifier());
+ // If above succeeds, we're now in prepared state
+ setTransactionState(ISharedObjectContainerTransaction.PREPARED);
+ } catch (Exception except) {
+ // If throws exception, we're doomed
+ traceStack("handleReplicaActivated(" + event + ")", except);
+ setTransactionState(ISharedObjectContainerTransaction.ABORTED);
+ }
+ }
+
+ protected void handleJoined(ISharedObjectContainerJoinedEvent event) {
+ if (isPrimary()) {
+ // If we are currently in VOTING state, then add the new member to
+ // list of participants
+ // and send replicate message. If not in voting state, just send
+ // replicate message
+ synchronized (lock) {
+ ID newMember = event.getJoinedContainerID();
+ getSharedObject().replicate(newMember);
+ if (getTransactionState() == ISharedObjectContainerTransaction.VOTING)
+ addParticipants(new ID[] { newMember });
+ else
+ getSharedObject().replicate(newMember);
+ }
+ } else {
+ // we don't care
+ }
+ }
+
+ protected void handleCreateResponse(ISharedObjectCreateResponseEvent event) {
+ if (isPrimary()) {
+ synchronized (lock) {
+ trace("handleCreateResponse(" + event + ")");
+ Throwable except = event.getException();
+ ID remoteID = event.getRemoteContainerID();
+ long ident = event.getSequence();
+ if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
+ if (except == null) {
+ removeParticipant(remoteID);
+ } else {
+ addFailed(remoteID, except);
+ }
+ } else {
+ handleVotingCompletedCreateResponse(remoteID, except, ident);
+ }
+ lock.notifyAll();
+ }
+ } else {
+ // We don't care
+ }
+ }
+
+ protected void handleDeparted(ISharedObjectContainerDepartedEvent event) {
+ if (isPrimary()) {
+ ID remoteID = event.getDepartedContainerID();
+ synchronized (lock) {
+ if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) {
+ addFailed(remoteID, new Exception("Container " + remoteID
+ + " left"));
+ }
+ lock.notifyAll();
+ }
+ } else {
+ // we don't care
+ }
+ }
+
+ protected void handleVotingCompletedCreateResponse(ID fromID, Throwable e,
+ long identifier) {
+ // If remote creation was successful, simply send commit message back.
+ if (e == null) {
+ try {
+ getSharedObject().getContext().sendMessage(
+ fromID,
+ new CommitEvent(getSharedObject().getID(),
+ getSharedObject().getLocalID()));
+ } catch (Exception e2) {
+ traceStack("Exception in sendCommit to " + fromID, e2);
+ }
+ } else {
+ // Too late to vote no
+ handlePostCommitFailure(fromID, e, identifier);
+ }
}
-
+
+ protected void handlePostCommitFailure(ID fromID, Throwable e,
+ long identifier) {
+ // Do nothing but report
+ trace("handlePostCommitFailure(" + fromID + "," + e + "," + identifier
+ + ")");
+ }
+
+ protected void sendCommit() throws SharedObjectAddAbortException {
+ try {
+ getContext().sendMessage(
+ null,
+ new CommitEvent(getSharedObject().getID(),
+ getSharedObject().getLocalID()));
+ } catch (Exception e2) {
+ doTMAbort(new SharedObjectAddAbortException(
+ "Exception sending commit message", e2, getTimeout()));
+ }
+ }
+
protected byte getTransactionState() {
synchronized (lock) {
return transactionState;
}
}
+
+ protected void setTransactionState(byte state) {
+ synchronized (lock) {
+ transactionState = state;
+ }
+ }
+
protected void waitToCommit() throws SharedObjectAddAbortException {
- synchronized (lock) {
- long end = System.currentTimeMillis() + timeout;
- try {
- while (!votingCompleted()) {
- long wait = end - System.currentTimeMillis();
- trace("waitForFinish waiting "+wait+"ms on "+sharedObject.getID());
- if (wait <= 0L) throw new SharedObjectAddAbortException("Timeout waiting for create responses",(Throwable) null,timeout);
- // Wait right here
- lock.wait(wait);
- }
- } catch (InterruptedException e) {
- throw new SharedObjectAddAbortException("Wait interrupted",(Throwable) null,timeout);
- } catch (SharedObjectAddAbortException e1) {
- // Aborted for some reason. Clean up.
- doAbort(e1);
- }
- // Success. Send commit to remotes and clean up before returning.
- doCommit();
- }
- }
- protected void doAbort(Throwable t) {
- // XXX TODO
- }
- protected void doCommit() {
- // XXX TODO
- }
- protected boolean votingCompleted() throws SharedObjectAddAbortException
- {
- // The test here is is we've received any indication of failed participants in
- // the transaction. If so, we throw.
- trace("voting completed test");
- if (failed.size() > 0) {
- trace("voting completed. Failures:"+failed);
- // Abort!
- throw new SharedObjectAddAbortException("Abort received",failed,timeout);
- // If no problems, and the number of participants to here from is 0, then we're done
- } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) {
- // Success!
- trace("votingCompleted:true");
- return true;
- }
- // Else continue waiting
- trace("voting not completed");
- return false;
- }
+ if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED)
+ return;
+ synchronized (lock) {
+ long end = System.currentTimeMillis() + getTimeout();
+ try {
+ while (!isVotingCompleted()) {
+ long wait = end - System.currentTimeMillis();
+ trace("waitForFinish waiting " + wait + "ms on "
+ + getSharedObject().getID());
+ if (wait <= 0L)
+ throw new SharedObjectAddAbortException("Timeout",
+ (Throwable) null, getTimeout());
+ // Wait right here
+ lock.wait(wait);
+ }
+ } catch (Exception except) {
+ // Aborted for some reason. Clean up and throw
+ doTMAbort(except);
+ }
+ // Success. Send commit to remotes and clean up before returning.
+ doTMCommit();
+ }
+ }
+
+ protected void doTMAbort(Throwable except)
+ throws SharedObjectAddAbortException {
+ trace("ABORTED:" + except);
+ // Send destroy message here so all remotes get destroyed, and we remove
+ // ourselves from local space as well.
+ getSharedObject().removeEventProcessor(this);
+ getSharedObject().destroySelf();
+ // Set our own state variable to ABORTED
+ setTransactionState(ISharedObjectContainerTransaction.ABORTED);
+ // throw so caller gets exception and can deal with it
+ if (except instanceof SharedObjectAddAbortException)
+ throw (SharedObjectAddAbortException) except;
+ else
+ throw new SharedObjectAddAbortException("Aborted", except,
+ getTimeout());
+ }
+
+ protected void doTMCommit() throws SharedObjectAddAbortException {
+ // Only forward commit message if the participantIDs array is not yet
+ // null,
+ // and the current membership is > 0 (we're connected to something)
+ if (getSharedObject().getContext().getGroupMemberIDs().length > 0) {
+ sendCommit();
+ }
+ // Call local committed message
+ localCommitted();
+ }
+
+ protected void localCommitted() {
+ // Set state variable to committed.
+ setTransactionState(ISharedObjectContainerTransaction.COMMITTED);
+ trace("COMMITTED!!!");
+ }
+
+ protected boolean isVotingCompleted() throws SharedObjectAddAbortException {
+ // The test here is is we've received any indication of failed
+ // participants in
+ // the transaction. If so, we throw.
+ if (failed.size() > getMinFailedToAbort()) {
+ // Abort!
+ trace("isVotingCompleted() aborting: failed > "
+ + getMinFailedToAbort() + ":failed=" + failed);
+ throw new SharedObjectAddAbortException("Abort received", failed,
+ getTimeout());
+ // If no problems, and the number of participants to here from is 0,
+ // then we're done
+ } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING
+ && participants.size() == 0) {
+ // Success!
+ trace("isVotingCompleted() returning true");
+ return true;
+ }
+ // Else continue waiting
+ trace("isVotingCompleted() returning false");
+ return false;
+ }
}

Back to the top