Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java482
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java35
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java (renamed from plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java)112
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java507
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java4
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java6
6 files changed, 604 insertions, 542 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java
index eb500077ec..1b5c19e902 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java
@@ -10,78 +10,11 @@
*/
package org.eclipse.emf.cdo.internal.server.clone;
-import org.eclipse.emf.cdo.common.branch.CDOBranch;
-import org.eclipse.emf.cdo.common.branch.CDOBranchPoint;
-import org.eclipse.emf.cdo.common.branch.CDOBranchVersion;
-import org.eclipse.emf.cdo.common.commit.CDOCommitData;
-import org.eclipse.emf.cdo.common.commit.CDOCommitInfo;
-import org.eclipse.emf.cdo.common.id.CDOID;
-import org.eclipse.emf.cdo.common.id.CDOIDAndVersion;
-import org.eclipse.emf.cdo.common.id.CDOIDUtil;
-import org.eclipse.emf.cdo.common.model.CDOPackageInfo;
-import org.eclipse.emf.cdo.common.model.CDOPackageUnit;
-import org.eclipse.emf.cdo.common.revision.CDORevisionKey;
-import org.eclipse.emf.cdo.internal.server.Repository;
-import org.eclipse.emf.cdo.internal.server.TransactionCommitContext;
-import org.eclipse.emf.cdo.server.StoreThreadLocal;
-import org.eclipse.emf.cdo.spi.common.CDOReplicationContext;
-import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch;
-import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager;
-import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit;
-import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
-import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta;
-import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
-import org.eclipse.emf.cdo.spi.server.InternalSession;
-import org.eclipse.emf.cdo.spi.server.InternalStore;
-import org.eclipse.emf.cdo.spi.server.InternalTransaction;
-
-import org.eclipse.net4j.util.collection.IndexedList;
-import org.eclipse.net4j.util.om.monitor.Monitor;
-import org.eclipse.net4j.util.om.monitor.OMMonitor;
-import org.eclipse.net4j.util.transaction.TransactionException;
-
-import org.eclipse.emf.spi.cdo.CDOSessionProtocol;
-import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult;
-import org.eclipse.emf.spi.cdo.InternalCDOSession;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
- * TODO:
- * <ul>
- * <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages).
- * <li>Make CDOIDs of new objects temporary when merging out of temp branch.
- * <li>Provide custom branching strategies.
- * <li>Consider non-auditing masters.
- * <li>Test out-of-order commits.
- * </ul>
- *
* @author Eike Stepper
*/
-public class CloneRepository extends Repository.Default implements CDOReplicationContext
+public class CloneRepository extends SynchronizableRepository
{
- private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.clone.lastReplicatedBranchID"; //$NON-NLS-1$
-
- private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.clone.lastReplicatedCommitTime"; //$NON-NLS-1$
-
- private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.clone.gracefullyShutDown"; //$NON-NLS-1$
-
- private CloneSynchronizer synchronizer;
-
- private InternalSession replicatorSession;
-
- private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID;
-
- private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE;
-
- private int lastTransactionID;
-
public CloneRepository()
{
setState(State.OFFLINE);
@@ -92,417 +25,4 @@ public class CloneRepository extends Repository.Default implements CDOReplicatio
{
return Type.CLONE;
}
-
- public CloneSynchronizer getSynchronizer()
- {
- return synchronizer;
- }
-
- public void setSynchronizer(CloneSynchronizer synchronizer)
- {
- checkInactive();
- this.synchronizer = synchronizer;
- }
-
- @Override
- public Object[] getElements()
- {
- List<Object> list = Arrays.asList(super.getElements());
- list.add(synchronizer);
- return list.toArray();
- }
-
- public int getLastReplicatedBranchID()
- {
- return lastReplicatedBranchID;
- }
-
- public long getLastReplicatedCommitTime()
- {
- return lastReplicatedCommitTime;
- }
-
- public boolean isSqueezeCommitInfos()
- {
- return synchronizer.isSqueezeCommitInfos();
- }
-
- public void handleBranch(CDOBranch branch)
- {
- if (branch.isLocal())
- {
- return;
- }
-
- int branchID = branch.getID();
- String name = branch.getName();
-
- CDOBranchPoint base = branch.getBase();
- InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch();
- long baseTimeStamp = base.getTimeStamp();
-
- InternalCDOBranchManager branchManager = getBranchManager();
- branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp);
- lastReplicatedBranchID = branchID;
- }
-
- public void handleCommitInfo(CDOCommitInfo commitInfo)
- {
- CDOBranch branch = commitInfo.getBranch();
- if (branch.isLocal())
- {
- return;
- }
-
- long timeStamp = commitInfo.getTimeStamp();
- CDOBranchPoint head = branch.getHead();
-
- InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head);
- boolean squeezed = isSqueezeCommitInfos() && lastReplicatedCommitTime != CDOBranchPoint.UNSPECIFIED_DATE;
- ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, commitInfo, squeezed);
- commitContext.preWrite();
- boolean success = false;
-
- try
- {
- commitContext.write(new Monitor());
- commitContext.commit(new Monitor());
-
- setLastCommitTimeStamp(timeStamp);
- lastReplicatedCommitTime = timeStamp;
- success = true;
- }
- finally
- {
- commitContext.postCommit(success);
- transaction.close();
- }
- }
-
- @Override
- public InternalCommitContext createCommitContext(InternalTransaction transaction)
- {
- CDOBranch branch = transaction.getBranch();
- if (branch.isLocal())
- {
- return createTransactionCommitContext(transaction);
- }
-
- if (getState() != State.ONLINE)
- {
- return createBranchingCommitContext(transaction, branch);
- }
-
- return createWriteThroughCommitContext(transaction);
- }
-
- protected InternalCommitContext createTransactionCommitContext(InternalTransaction transaction)
- {
- return super.createCommitContext(transaction);
- }
-
- protected InternalCommitContext createBranchingCommitContext(InternalTransaction transaction, CDOBranch branch)
- {
- long timeStamp = createCommitTimeStamp(null);
- CDOBranch offlineBranch = createOfflineBranch(branch, timeStamp - 1L);
- transaction.setBranchPoint(offlineBranch.getHead());
- return new BranchingCommitContext(transaction, timeStamp);
- }
-
- protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction)
- {
- return new WriteThroughCommitContext(transaction);
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(synchronizer, "synchronizer"); //$NON-NLS-1$
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
-
- InternalStore store = getStore();
- if (!store.isFirstTime())
- {
- Map<String, String> map = store.getPropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
- if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN))
- {
- throw new IllegalStateException("Clone store was not gracefully shut down");
- }
-
- Set<String> names = new HashSet<String>();
- names.add(PROP_LAST_REPLICATED_BRANCH_ID);
- names.add(PROP_LAST_REPLICATED_COMMIT_TIME);
-
- map = store.getPropertyValues(names);
- lastReplicatedBranchID = Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID));
- lastReplicatedCommitTime = Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME));
- }
- else
- {
- store.removePropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
- }
-
- replicatorSession = getSessionManager().openSession(null);
- replicatorSession.options().setPassiveUpdateEnabled(false);
-
- synchronizer.setClone(this);
- synchronizer.activate();
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- synchronizer.deactivate();
-
- Map<String, String> map = new HashMap<String, String>();
- map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID));
- map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime));
- map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString());
-
- InternalStore store = getStore();
- store.setPropertyValues(map);
-
- super.doDeactivate();
- }
-
- @Override
- protected void initRootResource()
- {
- setState(State.INITIAL);
- }
-
- protected CDOBranch createOfflineBranch(CDOBranch baseBranch, long baseTimeStamp)
- {
- try
- {
- StoreThreadLocal.setSession(replicatorSession);
- InternalCDOBranchManager branchManager = getBranchManager();
- return branchManager.createBranch(NEW_LOCAL_BRANCH,
- "Offline-" + baseTimeStamp, (InternalCDOBranch)baseBranch, baseTimeStamp); //$NON-NLS-1$
- }
- finally
- {
- StoreThreadLocal.release();
- }
- }
-
- /**
- * @author Eike Stepper
- */
- private static final class CommitContextData implements CDOCommitData
- {
- private InternalCommitContext commitContext;
-
- public CommitContextData(InternalCommitContext commitContext)
- {
- this.commitContext = commitContext;
- }
-
- public boolean isEmpty()
- {
- return false;
- }
-
- public List<CDOPackageUnit> getNewPackageUnits()
- {
- final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits();
- return new IndexedList<CDOPackageUnit>()
- {
- @Override
- public CDOPackageUnit get(int index)
- {
- return newPackageUnits[index];
- }
-
- @Override
- public int size()
- {
- return newPackageUnits.length;
- }
- };
- }
-
- public List<CDOIDAndVersion> getNewObjects()
- {
- final InternalCDORevision[] newObjects = commitContext.getNewObjects();
- return new IndexedList<CDOIDAndVersion>()
- {
- @Override
- public CDOIDAndVersion get(int index)
- {
- return newObjects[index];
- }
-
- @Override
- public int size()
- {
- return newObjects.length;
- }
- };
- }
-
- public List<CDORevisionKey> getChangedObjects()
- {
- final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas();
- return new IndexedList<CDORevisionKey>()
- {
- @Override
- public CDORevisionKey get(int index)
- {
- return changedObjects[index];
- }
-
- @Override
- public int size()
- {
- return changedObjects.length;
- }
- };
- }
-
- public List<CDOIDAndVersion> getDetachedObjects()
- {
- final CDOID[] detachedObjects = commitContext.getDetachedObjects();
- return new IndexedList<CDOIDAndVersion>()
- {
- @Override
- public CDOIDAndVersion get(int index)
- {
- return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION);
- }
-
- @Override
- public int size()
- {
- return detachedObjects.length;
- }
- };
- }
- }
-
- /**
- * @author Eike Stepper
- */
- private final class WriteThroughCommitContext extends TransactionCommitContext
- {
- public WriteThroughCommitContext(InternalTransaction transaction)
- {
- super(transaction);
- }
-
- @Override
- public void write(OMMonitor monitor)
- {
- // Do nothing
- }
-
- @Override
- public void commit(OMMonitor monitor)
- {
- InternalTransaction transaction = getTransaction();
-
- // Prepare commit to the master
- CDOBranch branch = transaction.getBranch();
- String userID = getUserID();
- String comment = getCommitComment();
- CDOCommitData commitData = new CommitContextData(this);
-
- // Delegate commit to the master
- CDOSessionProtocol sessionProtocol = ((InternalCDOSession)synchronizer.getMaster()).getSessionProtocol();
- CommitTransactionResult result = sessionProtocol.commitDelegation(branch, userID, comment, commitData, monitor);
-
- // Stop if commit to master failed
- String rollbackMessage = result.getRollbackMessage();
- if (rollbackMessage != null)
- {
- throw new TransactionException(rollbackMessage);
- }
-
- // Prepare data needed for commit result and commit notifications
- setTimeStamp(result.getTimeStamp());
- addMetaIDRanges(commitData.getNewPackageUnits());
- addIDMappings(result.getIDMappings());
- applyIDMappings(new Monitor());
-
- // Commit to the clone
- super.preWrite();
- super.write(new Monitor());
- super.commit(new Monitor());
-
- // Remember commit time in the clone
- setLastCommitTimeStamp(result.getTimeStamp());
- }
-
- @Override
- protected long createTimeStamp()
- {
- // Already set after commit to the master
- return getTimeStamp();
- }
-
- @Override
- protected void lockObjects() throws InterruptedException
- {
- // Do nothing
- }
-
- @Override
- protected void adjustMetaRanges()
- {
- // Do nothing
- }
-
- private void addMetaIDRanges(List<CDOPackageUnit> newPackageUnits)
- {
- for (CDOPackageUnit newPackageUnit : newPackageUnits)
- {
- for (CDOPackageInfo packageInfo : newPackageUnit.getPackageInfos())
- {
- addMetaIDRange(packageInfo.getMetaIDRange());
- }
- }
- }
-
- private void addIDMappings(Map<CDOID, CDOID> idMappings)
- {
- for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet())
- {
- CDOID oldID = idMapping.getKey();
- CDOID newID = idMapping.getValue();
- addIDMapping(oldID, newID);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- */
- private final class BranchingCommitContext extends TransactionCommitContext
- {
- private long timeStamp;
-
- public BranchingCommitContext(InternalTransaction transaction, long timeStamp)
- {
- super(transaction);
- this.timeStamp = timeStamp;
- }
-
- @Override
- protected void lockObjects() throws InterruptedException
- {
- // Do nothing
- }
-
- @Override
- protected long createTimeStamp()
- {
- return timeStamp;
- }
- }
}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java
new file mode 100644
index 0000000000..2c4a085416
--- /dev/null
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.emf.cdo.internal.server.clone;
+
+/**
+ * @author Eike Stepper
+ */
+public class FailoverParticipant extends SynchronizableRepository
+{
+ private Type type;
+
+ public FailoverParticipant()
+ {
+ setState(State.OFFLINE);
+ }
+
+ @Override
+ public Type getType()
+ {
+ return type;
+ }
+
+ public void setType(Type type)
+ {
+ this.type = type;
+ }
+}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java
index 66cb027c45..ebfeac0819 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java
@@ -40,11 +40,11 @@ import java.util.concurrent.PriorityBlockingQueue;
* @author Eike Stepper
* @since 3.0
*/
-public class CloneSynchronizer extends QueueRunner
+public class RepositorySynchronizer extends QueueRunner
{
public static final int DEFAULT_RETRY_INTERVAL = 3;
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REPOSITORY, CloneSynchronizer.class);
+ private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REPOSITORY, RepositorySynchronizer.class);
private static final Integer CONNECT_PRIORITY = 0;
@@ -54,59 +54,59 @@ public class CloneSynchronizer extends QueueRunner
private static final Integer COMMIT_PRIORITY = 3;
- private CloneRepository clone;
-
private int retryInterval = DEFAULT_RETRY_INTERVAL;
private Object connectLock = new Object();
- private CDOSessionConfigurationFactory masterFactory;
+ private SynchronizableRepository localRepository;
+
+ private InternalCDOSession remoteSession;
- private InternalCDOSession master;
+ private RemoteSessionListener remoteSessionListener = new RemoteSessionListener();
- private MasterListener masterListener = new MasterListener();
+ private CDOSessionConfigurationFactory remoteSessionConfigurationFactory;
private boolean squeezeCommitInfos;
- public CloneSynchronizer()
+ public RepositorySynchronizer()
{
}
- public CloneRepository getClone()
+ public int getRetryInterval()
{
- return clone;
+ return retryInterval;
}
- public void setClone(CloneRepository clone)
+ public void setRetryInterval(int retryInterval)
{
- checkInactive();
- this.clone = clone;
+ this.retryInterval = retryInterval;
}
- public int getRetryInterval()
+ public SynchronizableRepository getLocalRepository()
{
- return retryInterval;
+ return localRepository;
}
- public void setRetryInterval(int retryInterval)
+ public void setLocalRepository(SynchronizableRepository localRepository)
{
- this.retryInterval = retryInterval;
+ checkInactive();
+ this.localRepository = localRepository;
}
- public CDOSessionConfigurationFactory getMasterFactory()
+ public CDOSessionConfigurationFactory getRemoteSessionConfigurationFactory()
{
- return masterFactory;
+ return remoteSessionConfigurationFactory;
}
- public void setMasterFactory(CDOSessionConfigurationFactory masterFactory)
+ public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory remoteSessionConfigurationFactory)
{
- checkArg(masterFactory, "masterFactory"); //$NON-NLS-1$
- this.masterFactory = masterFactory;
+ checkArg(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$
+ this.remoteSessionConfigurationFactory = remoteSessionConfigurationFactory;
}
- public CDOSession getMaster()
+ public CDOSession getRemoteSession()
{
- return master;
+ return remoteSession;
}
public boolean isSqueezeCommitInfos()
@@ -122,7 +122,7 @@ public class CloneSynchronizer extends QueueRunner
@Override
protected String getThreadName()
{
- return "CloneSynchronizer"; //$NON-NLS-1$
+ return "RepositorySynchronizer"; //$NON-NLS-1$
}
@Override
@@ -135,8 +135,8 @@ public class CloneSynchronizer extends QueueRunner
protected void doBeforeActivate() throws Exception
{
super.doBeforeActivate();
- checkState(masterFactory, "masterFactory"); //$NON-NLS-1$
- checkState(clone, "clone"); //$NON-NLS-1$
+ checkState(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$
+ checkState(localRepository, "localRepository"); //$NON-NLS-1$
}
@Override
@@ -149,12 +149,12 @@ public class CloneSynchronizer extends QueueRunner
@Override
protected void doDeactivate() throws Exception
{
- if (master != null)
+ if (remoteSession != null)
{
- master.removeListener(masterListener);
- master.getBranchManager().removeListener(masterListener);
- master.close();
- master = null;
+ remoteSession.removeListener(remoteSessionListener);
+ remoteSession.getBranchManager().removeListener(remoteSessionListener);
+ remoteSession.close();
+ remoteSession = null;
}
super.doDeactivate();
@@ -163,18 +163,18 @@ public class CloneSynchronizer extends QueueRunner
private void disconnect()
{
OM.LOG.info("Disconnected from master.");
- if (clone.getRootResourceID() == null)
+ if (localRepository.getRootResourceID() == null)
{
- clone.setState(CloneRepository.State.INITIAL);
+ localRepository.setState(CloneRepository.State.INITIAL);
}
else
{
- clone.setState(CloneRepository.State.OFFLINE);
+ localRepository.setState(CloneRepository.State.OFFLINE);
}
- master.getBranchManager().removeListener(masterListener);
- master.removeListener(masterListener);
- master = null;
+ remoteSession.getBranchManager().removeListener(remoteSessionListener);
+ remoteSession.removeListener(remoteSessionListener);
+ remoteSession = null;
reconnect();
}
@@ -192,7 +192,7 @@ public class CloneSynchronizer extends QueueRunner
{
synchronized (connectLock)
{
- if (clone.getState().isConnected())
+ if (localRepository.getState().isConnected())
{
return;
}
@@ -215,7 +215,7 @@ public class CloneSynchronizer extends QueueRunner
/**
* @author Eike Stepper
*/
- private final class MasterListener implements IListener
+ private final class RemoteSessionListener implements IListener
{
public void notifyEvent(IEvent event)
{
@@ -240,7 +240,7 @@ public class CloneSynchronizer extends QueueRunner
else if (event instanceof ILifecycleEvent)
{
ILifecycleEvent e = (ILifecycleEvent)event;
- if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == master)
+ if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == remoteSession)
{
disconnect();
}
@@ -282,10 +282,10 @@ public class CloneSynchronizer extends QueueRunner
try
{
- CDOSessionConfiguration masterConfiguration = masterFactory.createSessionConfiguration();
+ CDOSessionConfiguration masterConfiguration = remoteSessionConfigurationFactory.createSessionConfiguration();
masterConfiguration.setPassiveUpdateMode(PassiveUpdateMode.ADDITIONS);
- master = (InternalCDOSession)masterConfiguration.openSession();
+ remoteSession = (InternalCDOSession)masterConfiguration.openSession();
ensureNOOPRevisionCache();
setRootResourceID();
@@ -315,8 +315,8 @@ public class CloneSynchronizer extends QueueRunner
}
OM.LOG.info("Connected to master.");
- master.addListener(masterListener);
- master.getBranchManager().addListener(masterListener);
+ remoteSession.addListener(remoteSessionListener);
+ remoteSession.getBranchManager().addListener(remoteSessionListener);
scheduleReplicate();
}
@@ -330,18 +330,18 @@ public class CloneSynchronizer extends QueueRunner
private void setRootResourceID()
{
- if (clone.getState() == CloneRepository.State.INITIAL)
+ if (localRepository.getState() == CloneRepository.State.INITIAL)
{
- CDOID rootResourceID = master.getRepositoryInfo().getRootResourceID();
- clone.setRootResourceID(rootResourceID);
- clone.setState(CloneRepository.State.OFFLINE);
+ CDOID rootResourceID = remoteSession.getRepositoryInfo().getRootResourceID();
+ localRepository.setRootResourceID(rootResourceID);
+ localRepository.setState(CloneRepository.State.OFFLINE);
}
}
private void ensureNOOPRevisionCache()
{
// Ensure that incoming revisions are not cached!
- InternalCDORevisionCache cache = master.getRevisionManager().getCache();
+ InternalCDORevisionCache cache = remoteSession.getRevisionManager().getCache();
if (!(cache instanceof NOOPRevisionCache))
{
throw new IllegalStateException("Master session does not use a NOOPRevisionCache: "
@@ -367,12 +367,12 @@ public class CloneSynchronizer extends QueueRunner
TRACER.trace("Synchronizing with master..."); //$NON-NLS-1$
}
- clone.setState(CloneRepository.State.SYNCING);
+ localRepository.setState(CloneRepository.State.SYNCING);
- CDOSessionProtocol sessionProtocol = master.getSessionProtocol();
- sessionProtocol.replicateRepository(clone);
+ CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol();
+ sessionProtocol.replicateRepository(localRepository);
- clone.setState(CloneRepository.State.ONLINE);
+ localRepository.setState(CloneRepository.State.ONLINE);
OM.LOG.info("Synchronized with master.");
}
@@ -397,7 +397,7 @@ public class CloneSynchronizer extends QueueRunner
public void run()
{
- clone.handleBranch(branch);
+ localRepository.handleBranch(branch);
}
@Override
@@ -433,7 +433,7 @@ public class CloneSynchronizer extends QueueRunner
public void run()
{
- clone.handleCommitInfo(commitInfo);
+ localRepository.handleCommitInfo(commitInfo);
}
@Override
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java
new file mode 100644
index 0000000000..9e25c86077
--- /dev/null
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java
@@ -0,0 +1,507 @@
+/**
+ * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.emf.cdo.internal.server.clone;
+
+import org.eclipse.emf.cdo.common.branch.CDOBranch;
+import org.eclipse.emf.cdo.common.branch.CDOBranchPoint;
+import org.eclipse.emf.cdo.common.branch.CDOBranchVersion;
+import org.eclipse.emf.cdo.common.commit.CDOCommitData;
+import org.eclipse.emf.cdo.common.commit.CDOCommitInfo;
+import org.eclipse.emf.cdo.common.id.CDOID;
+import org.eclipse.emf.cdo.common.id.CDOIDAndVersion;
+import org.eclipse.emf.cdo.common.id.CDOIDUtil;
+import org.eclipse.emf.cdo.common.model.CDOPackageInfo;
+import org.eclipse.emf.cdo.common.model.CDOPackageUnit;
+import org.eclipse.emf.cdo.common.revision.CDORevisionKey;
+import org.eclipse.emf.cdo.internal.server.Repository;
+import org.eclipse.emf.cdo.internal.server.TransactionCommitContext;
+import org.eclipse.emf.cdo.server.StoreThreadLocal;
+import org.eclipse.emf.cdo.spi.common.CDOReplicationContext;
+import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch;
+import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager;
+import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit;
+import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
+import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta;
+import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
+import org.eclipse.emf.cdo.spi.server.InternalSession;
+import org.eclipse.emf.cdo.spi.server.InternalStore;
+import org.eclipse.emf.cdo.spi.server.InternalTransaction;
+
+import org.eclipse.net4j.util.collection.IndexedList;
+import org.eclipse.net4j.util.om.monitor.Monitor;
+import org.eclipse.net4j.util.om.monitor.OMMonitor;
+import org.eclipse.net4j.util.transaction.TransactionException;
+
+import org.eclipse.emf.spi.cdo.CDOSessionProtocol;
+import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult;
+import org.eclipse.emf.spi.cdo.InternalCDOSession;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TODO:
+ * <ul>
+ * <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages).
+ * <li>Make CDOIDs of new objects temporary when merging out of temp branch.
+ * <li>Provide custom branching strategies.
+ * <li>Consider non-auditing masters.
+ * <li>Test out-of-order commits.
+ * </ul>
+ *
+ * @author Eike Stepper
+ */
+public class SynchronizableRepository extends Repository.Default implements CDOReplicationContext
+{
+ private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.clone.lastReplicatedBranchID"; //$NON-NLS-1$
+
+ private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.clone.lastReplicatedCommitTime"; //$NON-NLS-1$
+
+ private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.clone.gracefullyShutDown"; //$NON-NLS-1$
+
+ private RepositorySynchronizer synchronizer;
+
+ private InternalSession replicatorSession;
+
+ private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID;
+
+ private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE;
+
+ private int lastTransactionID;
+
+ public SynchronizableRepository()
+ {
+ }
+
+ @Override
+ public Type getType()
+ {
+ return Type.CLONE;
+ }
+
+ public RepositorySynchronizer getSynchronizer()
+ {
+ return synchronizer;
+ }
+
+ public void setSynchronizer(RepositorySynchronizer synchronizer)
+ {
+ checkInactive();
+ this.synchronizer = synchronizer;
+ }
+
+ @Override
+ public Object[] getElements()
+ {
+ List<Object> list = Arrays.asList(super.getElements());
+ list.add(synchronizer);
+ return list.toArray();
+ }
+
+ public int getLastReplicatedBranchID()
+ {
+ return lastReplicatedBranchID;
+ }
+
+ public long getLastReplicatedCommitTime()
+ {
+ return lastReplicatedCommitTime;
+ }
+
+ public boolean isSqueezeCommitInfos()
+ {
+ return synchronizer.isSqueezeCommitInfos();
+ }
+
+ public void handleBranch(CDOBranch branch)
+ {
+ if (branch.isLocal())
+ {
+ return;
+ }
+
+ int branchID = branch.getID();
+ String name = branch.getName();
+
+ CDOBranchPoint base = branch.getBase();
+ InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch();
+ long baseTimeStamp = base.getTimeStamp();
+
+ InternalCDOBranchManager branchManager = getBranchManager();
+ branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp);
+ lastReplicatedBranchID = branchID;
+ }
+
+ public void handleCommitInfo(CDOCommitInfo commitInfo)
+ {
+ CDOBranch branch = commitInfo.getBranch();
+ if (branch.isLocal())
+ {
+ return;
+ }
+
+ long timeStamp = commitInfo.getTimeStamp();
+ CDOBranchPoint head = branch.getHead();
+
+ InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head);
+ boolean squeezed = isSqueezeCommitInfos() && lastReplicatedCommitTime != CDOBranchPoint.UNSPECIFIED_DATE;
+ ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, commitInfo, squeezed);
+ commitContext.preWrite();
+ boolean success = false;
+
+ try
+ {
+ commitContext.write(new Monitor());
+ commitContext.commit(new Monitor());
+
+ setLastCommitTimeStamp(timeStamp);
+ lastReplicatedCommitTime = timeStamp;
+ success = true;
+ }
+ finally
+ {
+ commitContext.postCommit(success);
+ transaction.close();
+ }
+ }
+
+ @Override
+ public InternalCommitContext createCommitContext(InternalTransaction transaction)
+ {
+ CDOBranch branch = transaction.getBranch();
+ if (branch.isLocal())
+ {
+ return createTransactionCommitContext(transaction);
+ }
+
+ if (getState() != State.ONLINE)
+ {
+ return createBranchingCommitContext(transaction, branch);
+ }
+
+ return createWriteThroughCommitContext(transaction);
+ }
+
+ protected InternalCommitContext createTransactionCommitContext(InternalTransaction transaction)
+ {
+ return super.createCommitContext(transaction);
+ }
+
+ protected InternalCommitContext createBranchingCommitContext(InternalTransaction transaction, CDOBranch branch)
+ {
+ long timeStamp = createCommitTimeStamp(null);
+ CDOBranch offlineBranch = createOfflineBranch(branch, timeStamp - 1L);
+ transaction.setBranchPoint(offlineBranch.getHead());
+ return new BranchingCommitContext(transaction, timeStamp);
+ }
+
+ protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction)
+ {
+ return new WriteThroughCommitContext(transaction);
+ }
+
+ @Override
+ protected void doBeforeActivate() throws Exception
+ {
+ super.doBeforeActivate();
+ checkState(synchronizer, "synchronizer"); //$NON-NLS-1$
+ }
+
+ @Override
+ protected void doActivate() throws Exception
+ {
+ super.doActivate();
+
+ InternalStore store = getStore();
+ if (!store.isFirstTime())
+ {
+ Map<String, String> map = store.getPropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
+ if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN))
+ {
+ throw new IllegalStateException("Clone store was not gracefully shut down");
+ }
+
+ Set<String> names = new HashSet<String>();
+ names.add(PROP_LAST_REPLICATED_BRANCH_ID);
+ names.add(PROP_LAST_REPLICATED_COMMIT_TIME);
+
+ map = store.getPropertyValues(names);
+ lastReplicatedBranchID = Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID));
+ lastReplicatedCommitTime = Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME));
+ }
+ else
+ {
+ store.removePropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN));
+ }
+
+ replicatorSession = getSessionManager().openSession(null);
+ replicatorSession.options().setPassiveUpdateEnabled(false);
+
+ synchronizer.setLocalRepository(this);
+ synchronizer.activate();
+ }
+
+ @Override
+ protected void doDeactivate() throws Exception
+ {
+ synchronizer.deactivate();
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID));
+ map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime));
+ map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString());
+
+ InternalStore store = getStore();
+ store.setPropertyValues(map);
+
+ super.doDeactivate();
+ }
+
+ @Override
+ protected void initRootResource()
+ {
+ setState(State.INITIAL);
+ }
+
+ protected CDOBranch createOfflineBranch(CDOBranch baseBranch, long baseTimeStamp)
+ {
+ try
+ {
+ StoreThreadLocal.setSession(replicatorSession);
+ InternalCDOBranchManager branchManager = getBranchManager();
+ return branchManager.createBranch(NEW_LOCAL_BRANCH,
+ "Offline-" + baseTimeStamp, (InternalCDOBranch)baseBranch, baseTimeStamp); //$NON-NLS-1$
+ }
+ finally
+ {
+ StoreThreadLocal.release();
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private static final class CommitContextData implements CDOCommitData
+ {
+ private InternalCommitContext commitContext;
+
+ public CommitContextData(InternalCommitContext commitContext)
+ {
+ this.commitContext = commitContext;
+ }
+
+ public boolean isEmpty()
+ {
+ return false;
+ }
+
+ public List<CDOPackageUnit> getNewPackageUnits()
+ {
+ final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits();
+ return new IndexedList<CDOPackageUnit>()
+ {
+ @Override
+ public CDOPackageUnit get(int index)
+ {
+ return newPackageUnits[index];
+ }
+
+ @Override
+ public int size()
+ {
+ return newPackageUnits.length;
+ }
+ };
+ }
+
+ public List<CDOIDAndVersion> getNewObjects()
+ {
+ final InternalCDORevision[] newObjects = commitContext.getNewObjects();
+ return new IndexedList<CDOIDAndVersion>()
+ {
+ @Override
+ public CDOIDAndVersion get(int index)
+ {
+ return newObjects[index];
+ }
+
+ @Override
+ public int size()
+ {
+ return newObjects.length;
+ }
+ };
+ }
+
+ public List<CDORevisionKey> getChangedObjects()
+ {
+ final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas();
+ return new IndexedList<CDORevisionKey>()
+ {
+ @Override
+ public CDORevisionKey get(int index)
+ {
+ return changedObjects[index];
+ }
+
+ @Override
+ public int size()
+ {
+ return changedObjects.length;
+ }
+ };
+ }
+
+ public List<CDOIDAndVersion> getDetachedObjects()
+ {
+ final CDOID[] detachedObjects = commitContext.getDetachedObjects();
+ return new IndexedList<CDOIDAndVersion>()
+ {
+ @Override
+ public CDOIDAndVersion get(int index)
+ {
+ return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION);
+ }
+
+ @Override
+ public int size()
+ {
+ return detachedObjects.length;
+ }
+ };
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class WriteThroughCommitContext extends TransactionCommitContext
+ {
+ public WriteThroughCommitContext(InternalTransaction transaction)
+ {
+ super(transaction);
+ }
+
+ @Override
+ public void write(OMMonitor monitor)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public void commit(OMMonitor monitor)
+ {
+ InternalTransaction transaction = getTransaction();
+
+ // Prepare commit to the master
+ CDOBranch branch = transaction.getBranch();
+ String userID = getUserID();
+ String comment = getCommitComment();
+ CDOCommitData commitData = new CommitContextData(this);
+
+ // Delegate commit to the master
+ CDOSessionProtocol sessionProtocol = ((InternalCDOSession)synchronizer.getRemoteSession()).getSessionProtocol();
+ CommitTransactionResult result = sessionProtocol.commitDelegation(branch, userID, comment, commitData, monitor);
+
+ // Stop if commit to master failed
+ String rollbackMessage = result.getRollbackMessage();
+ if (rollbackMessage != null)
+ {
+ throw new TransactionException(rollbackMessage);
+ }
+
+ // Prepare data needed for commit result and commit notifications
+ setTimeStamp(result.getTimeStamp());
+ addMetaIDRanges(commitData.getNewPackageUnits());
+ addIDMappings(result.getIDMappings());
+ applyIDMappings(new Monitor());
+
+ // Commit to the clone
+ super.preWrite();
+ super.write(new Monitor());
+ super.commit(new Monitor());
+
+ // Remember commit time in the clone
+ setLastCommitTimeStamp(result.getTimeStamp());
+ }
+
+ @Override
+ protected long createTimeStamp()
+ {
+ // Already set after commit to the master
+ return getTimeStamp();
+ }
+
+ @Override
+ protected void lockObjects() throws InterruptedException
+ {
+ // Do nothing
+ }
+
+ @Override
+ protected void adjustMetaRanges()
+ {
+ // Do nothing
+ }
+
+ private void addMetaIDRanges(List<CDOPackageUnit> newPackageUnits)
+ {
+ for (CDOPackageUnit newPackageUnit : newPackageUnits)
+ {
+ for (CDOPackageInfo packageInfo : newPackageUnit.getPackageInfos())
+ {
+ addMetaIDRange(packageInfo.getMetaIDRange());
+ }
+ }
+ }
+
+ private void addIDMappings(Map<CDOID, CDOID> idMappings)
+ {
+ for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet())
+ {
+ CDOID oldID = idMapping.getKey();
+ CDOID newID = idMapping.getValue();
+ addIDMapping(oldID, newID);
+ }
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class BranchingCommitContext extends TransactionCommitContext
+ {
+ private long timeStamp;
+
+ public BranchingCommitContext(InternalTransaction transaction, long timeStamp)
+ {
+ super(transaction);
+ this.timeStamp = timeStamp;
+ }
+
+ @Override
+ protected void lockObjects() throws InterruptedException
+ {
+ // Do nothing
+ }
+
+ @Override
+ protected long createTimeStamp()
+ {
+ return timeStamp;
+ }
+ }
+}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java
index 54b6b380cf..132defde5a 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java
@@ -15,7 +15,7 @@ import org.eclipse.emf.cdo.internal.server.Repository;
import org.eclipse.emf.cdo.internal.server.SessionManager;
import org.eclipse.emf.cdo.internal.server.bundle.OM;
import org.eclipse.emf.cdo.internal.server.clone.CloneRepository;
-import org.eclipse.emf.cdo.internal.server.clone.CloneSynchronizer;
+import org.eclipse.emf.cdo.internal.server.clone.RepositorySynchronizer;
import org.eclipse.emf.cdo.internal.server.embedded.EmbeddedClientSessionConfiguration;
import org.eclipse.emf.cdo.server.embedded.CDOSessionConfiguration;
import org.eclipse.emf.cdo.spi.server.InternalStore;
@@ -83,7 +83,7 @@ public final class CDOServerUtil
* @since 3.0
*/
public static IRepository createCloneRepository(String name, IStore store, Map<String, String> props,
- CloneSynchronizer synchronizer)
+ RepositorySynchronizer synchronizer)
{
CloneRepository repository = new CloneRepository();
repository.setName(name);
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
index d0183673be..7316d4da96 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java
@@ -13,7 +13,7 @@ package org.eclipse.emf.cdo.tests.config.impl;
import org.eclipse.emf.cdo.internal.common.revision.CDORevisionManagerImpl;
import org.eclipse.emf.cdo.internal.common.revision.cache.noop.NOOPRevisionCache;
-import org.eclipse.emf.cdo.internal.server.clone.CloneSynchronizer;
+import org.eclipse.emf.cdo.internal.server.clone.RepositorySynchronizer;
import org.eclipse.emf.cdo.net4j.CDONet4jUtil;
import org.eclipse.emf.cdo.net4j.CDOSessionConfiguration;
import org.eclipse.emf.cdo.server.CDOServerUtil;
@@ -269,8 +269,8 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf
}
};
- CloneSynchronizer synchronizer = new CloneSynchronizer();
- synchronizer.setMasterFactory(masterFactory);
+ RepositorySynchronizer synchronizer = new RepositorySynchronizer();
+ synchronizer.setRemoteSessionConfigurationFactory(masterFactory);
synchronizer.setRetryInterval(1);
synchronizer.setSqueezeCommitInfos(getTestSqueezeCommitInfos());

Back to the top