diff options
| author | slewis | 2005-07-04 23:19:55 +0000 |
|---|---|---|
| committer | slewis | 2005-07-04 23:19:55 +0000 |
| commit | 795c0edf438616c2b640c678e715d7464202e293 (patch) | |
| tree | f3d2877d150b0e50b1333c2aef61dc3affb79143 | |
| parent | 2afc9dcc278736c3715c0ee80603052ab4206533 (diff) | |
| download | org.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.tar.gz org.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.tar.xz org.eclipse.ecf-795c0edf438616c2b640c678e715d7464202e293.zip | |
Completed TransactionEventProcessor
2 files changed, 382 insertions, 85 deletions
diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java index c0d52b4ab..bde3a6f2c 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/BaseSharedObject.java @@ -22,6 +22,7 @@ import org.eclipse.ecf.core.ISharedObject; import org.eclipse.ecf.core.ISharedObjectConfig; import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.ISharedObjectManager; +import org.eclipse.ecf.core.SharedObjectDescription; import org.eclipse.ecf.core.SharedObjectInitException; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.Event; @@ -34,11 +35,16 @@ import org.eclipse.ecf.provider.Trace; */ public class BaseSharedObject implements ISharedObject, IIdentifiable { + private static long identifier = 0L; + Trace trace = Trace.create("basesharedobject"); ISharedObjectConfig config = null; List eventProcessors = new Vector(); + protected static long getIdentifier() { + return identifier++; + } protected void trace(String msg) { if (Trace.ON && trace != null) { trace.msg(msg); @@ -179,4 +185,33 @@ public class BaseSharedObject implements ISharedObject, IIdentifiable { } } + protected void replicate(ID remote) { + trace("replicate(" + remote + ")"); + try { + // Get current group membership + ISharedObjectContext context = getContext(); + if (context == null) return; + ID[] group = context.getGroupMemberIDs(); + if (group == null || group.length < 1) { + // we're done + return; + } + SharedObjectDescription createInfo = getReplicaDescription(remote); + if (createInfo != null) { + context.sendCreate(remote, createInfo); + } else { + return; + } + } catch (IOException e) { + traceStack("Exception in replicate("+remote+")", e); + return; + } + } + protected SharedObjectDescription getReplicaDescription(ID receiver) { + return new SharedObjectDescription(getID(), getClass().getName(), + getConfig().getProperties(), getIdentifier()); + } + + + } diff --git a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java index fb78050d5..7ea58a7c9 100644 --- a/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java +++ b/framework/bundles/org.eclipse.ecf.provider/src/org/eclipse/ecf/provider/generic/sobject/TransactionEventProcessor.java @@ -1,6 +1,14 @@ -/** - * - */ +/**************************************************************************** + * Copyright (c) 2004 Composent, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Composent, Inc. - initial API and implementation + *****************************************************************************/ + package org.eclipse.ecf.provider.generic.sobject; import java.util.HashMap; @@ -9,7 +17,13 @@ import java.util.Map; import java.util.Vector; import org.eclipse.ecf.core.ISharedObjectContainerTransaction; +import org.eclipse.ecf.core.ISharedObjectContext; import org.eclipse.ecf.core.SharedObjectAddAbortException; +import org.eclipse.ecf.core.events.ISharedObjectActivatedEvent; +import org.eclipse.ecf.core.events.ISharedObjectContainerDepartedEvent; +import org.eclipse.ecf.core.events.ISharedObjectContainerJoinedEvent; +import org.eclipse.ecf.core.events.ISharedObjectCreateResponseEvent; +import org.eclipse.ecf.core.events.RemoteSharedObjectEvent; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.Event; import org.eclipse.ecf.core.util.IEventProcessor; @@ -17,126 +31,374 @@ import org.eclipse.ecf.provider.Trace; /** * @author slewis - * + * */ public class TransactionEventProcessor implements IEventProcessor { public static final Trace trace = Trace.create("transactioneventprocessor"); + public static final int DEFAULT_TIMEOUT = 30000; + BaseSharedObject sharedObject = null; + byte transactionState = ISharedObjectContainerTransaction.ACTIVE; + Object lock = new Object(); + List participants = new Vector(); + Map failed = new HashMap(); + int timeout = DEFAULT_TIMEOUT; - + + int minFailedToAbort = 0; + + class CommitEvent extends RemoteSharedObjectEvent { + private static final long serialVersionUID = 6208586597348619413L; + + public CommitEvent(ID senderObj, ID remoteCont) { + super(senderObj, remoteCont, null); + } + } + public TransactionEventProcessor(BaseSharedObject bse, int timeout) { sharedObject = bse; sharedObject.addEventProcessor(this); this.timeout = timeout; } + public TransactionEventProcessor(BaseSharedObject bse) { - this(bse,DEFAULT_TIMEOUT); - } - protected void trace(String msg) { - if (Trace.ON && trace != null) { - trace.msg(sharedObject.getID()+":"+msg); - } - } - - protected void traceDump(String msg, Throwable t) { - if (Trace.ON && trace != null) { - trace.dumpStack(t, sharedObject.getID()+":"+msg); - } - } - - protected void addParticipants(ID [] ids) { - if (ids != null) { - for(int i=0; i < ids.length; i++) { - trace("adding participant:"+ids[i]); - if (!sharedObject.getHomeID().equals(ids[i])) participants.add(ids[i]); - } - } + this(bse, DEFAULT_TIMEOUT); + } + + protected void trace(String msg) { + if (Trace.ON && trace != null) { + trace.msg(getSharedObject().getID() + ":" + + (getSharedObject().isPrimary() ? "primary" : "replica") + + msg); + } } + + protected void traceStack(String msg, Throwable t) { + if (Trace.ON && trace != null) { + trace.dumpStack(t, sharedObject.getID() + ":" + + (getSharedObject().isPrimary() ? "primary" : "replica") + + msg); + } + } + + protected int getTimeout() { + return timeout; + } + + protected int getMinFailedToAbort() { + return minFailedToAbort; + } + + protected boolean isPrimary() { + return getSharedObject().isPrimary(); + } + + protected BaseSharedObject getSharedObject() { + return sharedObject; + } + + protected ID getHomeID() { + return getSharedObject().getHomeID(); + } + + protected void addParticipants(ID[] ids) { + if (ids != null) { + for (int i = 0; i < ids.length; i++) { + trace("addParticipant(" + ids[i] + ")"); + if (!getHomeID().equals(ids[i])) + participants.add(ids[i]); + } + } + } + protected void removeParticipant(ID id) { if (id != null) { - trace("removing participant:"+id); + trace("removeParticipant(" + id + ")"); participants.remove(id); } } - protected void addFailed(ID remote, Throwable failure) - { - if (remote != null && failure != null) { - trace("adding failed:"+remote+":exception:"+failure); - failed.put(remote, failure); - } - } - /* (non-Javadoc) + protected void addFailed(ID remote, Throwable failure) { + if (remote != null && failure != null) { + trace("addFailed(" + remote + "," + failure + ")"); + failed.put(remote, failure); + } + } + + protected ISharedObjectContext getContext() { + return getSharedObject().getContext(); + } + + /* + * Implementation of IEventProcessor. These methods are entry point methods + * for BaseSharedObject method dispatch to call + */ + /* + * (non-Javadoc) + * * @see org.eclipse.ecf.core.util.IEventProcessor#acceptEvent(org.eclipse.ecf.core.util.Event) */ public boolean acceptEvent(Event event) { return true; } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see org.eclipse.ecf.core.util.IEventProcessor#processEvent(org.eclipse.ecf.core.util.Event) */ - public Event processEvent(Event e) { - // TODO Auto-generated method stub - return null; + public Event processEvent(Event event) { + if (event instanceof ISharedObjectActivatedEvent) { + trace("handleActivated(" + event + ")"); + handleActivated((ISharedObjectActivatedEvent) event); + return event; + } else if (event instanceof ISharedObjectContainerJoinedEvent) { + trace("handleJoined(" + event + ")"); + handleJoined((ISharedObjectContainerJoinedEvent) event); + return event; + } else if (event instanceof ISharedObjectCreateResponseEvent) { + trace("handleCreateResponse(" + event + ")"); + handleCreateResponse((ISharedObjectCreateResponseEvent) event); + return event; + } else if (event instanceof ISharedObjectContainerDepartedEvent) { + trace("handleDeparted(" + event + ")"); + handleDeparted((ISharedObjectContainerDepartedEvent) event); + return event; + } else if (event instanceof CommitEvent) { + trace("localCommitted(" + event + ")"); + localCommitted(); + return event; + } + return event; + } + + protected void handleActivated(ISharedObjectActivatedEvent event) { + // No other state changes while this is going on + synchronized (lock) { + if (isPrimary()) { + // Primary + handlePrimaryActivated(event); + } else { + handleReplicaActivated(event); + } + // Notify any threads waiting on state change + lock.notifyAll(); + } + + } + + protected void handlePrimaryActivated(ISharedObjectActivatedEvent event) { + getSharedObject().replicate(null); + addParticipants(getContext().getGroupMemberIDs()); + setTransactionState(ISharedObjectContainerTransaction.VOTING); + } + + protected void handleReplicaActivated(ISharedObjectActivatedEvent event) { + try { + // Try to respond with create success message back to host + getContext().sendCreateResponse(getHomeID(), null, + BaseSharedObject.getIdentifier()); + // If above succeeds, we're now in prepared state + setTransactionState(ISharedObjectContainerTransaction.PREPARED); + } catch (Exception except) { + // If throws exception, we're doomed + traceStack("handleReplicaActivated(" + event + ")", except); + setTransactionState(ISharedObjectContainerTransaction.ABORTED); + } + } + + protected void handleJoined(ISharedObjectContainerJoinedEvent event) { + if (isPrimary()) { + // If we are currently in VOTING state, then add the new member to + // list of participants + // and send replicate message. If not in voting state, just send + // replicate message + synchronized (lock) { + ID newMember = event.getJoinedContainerID(); + getSharedObject().replicate(newMember); + if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) + addParticipants(new ID[] { newMember }); + else + getSharedObject().replicate(newMember); + } + } else { + // we don't care + } + } + + protected void handleCreateResponse(ISharedObjectCreateResponseEvent event) { + if (isPrimary()) { + synchronized (lock) { + trace("handleCreateResponse(" + event + ")"); + Throwable except = event.getException(); + ID remoteID = event.getRemoteContainerID(); + long ident = event.getSequence(); + if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) { + if (except == null) { + removeParticipant(remoteID); + } else { + addFailed(remoteID, except); + } + } else { + handleVotingCompletedCreateResponse(remoteID, except, ident); + } + lock.notifyAll(); + } + } else { + // We don't care + } + } + + protected void handleDeparted(ISharedObjectContainerDepartedEvent event) { + if (isPrimary()) { + ID remoteID = event.getDepartedContainerID(); + synchronized (lock) { + if (getTransactionState() == ISharedObjectContainerTransaction.VOTING) { + addFailed(remoteID, new Exception("Container " + remoteID + + " left")); + } + lock.notifyAll(); + } + } else { + // we don't care + } + } + + protected void handleVotingCompletedCreateResponse(ID fromID, Throwable e, + long identifier) { + // If remote creation was successful, simply send commit message back. + if (e == null) { + try { + getSharedObject().getContext().sendMessage( + fromID, + new CommitEvent(getSharedObject().getID(), + getSharedObject().getLocalID())); + } catch (Exception e2) { + traceStack("Exception in sendCommit to " + fromID, e2); + } + } else { + // Too late to vote no + handlePostCommitFailure(fromID, e, identifier); + } } - + + protected void handlePostCommitFailure(ID fromID, Throwable e, + long identifier) { + // Do nothing but report + trace("handlePostCommitFailure(" + fromID + "," + e + "," + identifier + + ")"); + } + + protected void sendCommit() throws SharedObjectAddAbortException { + try { + getContext().sendMessage( + null, + new CommitEvent(getSharedObject().getID(), + getSharedObject().getLocalID())); + } catch (Exception e2) { + doTMAbort(new SharedObjectAddAbortException( + "Exception sending commit message", e2, getTimeout())); + } + } + protected byte getTransactionState() { synchronized (lock) { return transactionState; } } + + protected void setTransactionState(byte state) { + synchronized (lock) { + transactionState = state; + } + } + protected void waitToCommit() throws SharedObjectAddAbortException { - synchronized (lock) { - long end = System.currentTimeMillis() + timeout; - try { - while (!votingCompleted()) { - long wait = end - System.currentTimeMillis(); - trace("waitForFinish waiting "+wait+"ms on "+sharedObject.getID()); - if (wait <= 0L) throw new SharedObjectAddAbortException("Timeout waiting for create responses",(Throwable) null,timeout); - // Wait right here - lock.wait(wait); - } - } catch (InterruptedException e) { - throw new SharedObjectAddAbortException("Wait interrupted",(Throwable) null,timeout); - } catch (SharedObjectAddAbortException e1) { - // Aborted for some reason. Clean up. - doAbort(e1); - } - // Success. Send commit to remotes and clean up before returning. - doCommit(); - } - } - protected void doAbort(Throwable t) { - // XXX TODO - } - protected void doCommit() { - // XXX TODO - } - protected boolean votingCompleted() throws SharedObjectAddAbortException - { - // The test here is is we've received any indication of failed participants in - // the transaction. If so, we throw. - trace("voting completed test"); - if (failed.size() > 0) { - trace("voting completed. Failures:"+failed); - // Abort! - throw new SharedObjectAddAbortException("Abort received",failed,timeout); - // If no problems, and the number of participants to here from is 0, then we're done - } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING && participants.size() == 0) { - // Success! - trace("votingCompleted:true"); - return true; - } - // Else continue waiting - trace("voting not completed"); - return false; - } + if (getTransactionState() == ISharedObjectContainerTransaction.COMMITTED) + return; + synchronized (lock) { + long end = System.currentTimeMillis() + getTimeout(); + try { + while (!isVotingCompleted()) { + long wait = end - System.currentTimeMillis(); + trace("waitForFinish waiting " + wait + "ms on " + + getSharedObject().getID()); + if (wait <= 0L) + throw new SharedObjectAddAbortException("Timeout", + (Throwable) null, getTimeout()); + // Wait right here + lock.wait(wait); + } + } catch (Exception except) { + // Aborted for some reason. Clean up and throw + doTMAbort(except); + } + // Success. Send commit to remotes and clean up before returning. + doTMCommit(); + } + } + + protected void doTMAbort(Throwable except) + throws SharedObjectAddAbortException { + trace("ABORTED:" + except); + // Send destroy message here so all remotes get destroyed, and we remove + // ourselves from local space as well. + getSharedObject().removeEventProcessor(this); + getSharedObject().destroySelf(); + // Set our own state variable to ABORTED + setTransactionState(ISharedObjectContainerTransaction.ABORTED); + // throw so caller gets exception and can deal with it + if (except instanceof SharedObjectAddAbortException) + throw (SharedObjectAddAbortException) except; + else + throw new SharedObjectAddAbortException("Aborted", except, + getTimeout()); + } + + protected void doTMCommit() throws SharedObjectAddAbortException { + // Only forward commit message if the participantIDs array is not yet + // null, + // and the current membership is > 0 (we're connected to something) + if (getSharedObject().getContext().getGroupMemberIDs().length > 0) { + sendCommit(); + } + // Call local committed message + localCommitted(); + } + + protected void localCommitted() { + // Set state variable to committed. + setTransactionState(ISharedObjectContainerTransaction.COMMITTED); + trace("COMMITTED!!!"); + } + + protected boolean isVotingCompleted() throws SharedObjectAddAbortException { + // The test here is is we've received any indication of failed + // participants in + // the transaction. If so, we throw. + if (failed.size() > getMinFailedToAbort()) { + // Abort! + trace("isVotingCompleted() aborting: failed > " + + getMinFailedToAbort() + ":failed=" + failed); + throw new SharedObjectAddAbortException("Abort received", failed, + getTimeout()); + // If no problems, and the number of participants to here from is 0, + // then we're done + } else if (getTransactionState() == ISharedObjectContainerTransaction.VOTING + && participants.size() == 0) { + // Success! + trace("isVotingCompleted() returning true"); + return true; + } + // Else continue waiting + trace("isVotingCompleted() returning false"); + return false; + } } |
