diff options
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java | 482 | ||||
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java | 35 | ||||
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java (renamed from plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java) | 112 | ||||
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java | 507 | ||||
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java | 4 | ||||
-rw-r--r-- | plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java | 6 |
6 files changed, 604 insertions, 542 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java index eb500077ec..1b5c19e902 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneRepository.java @@ -10,78 +10,11 @@ */ package org.eclipse.emf.cdo.internal.server.clone; -import org.eclipse.emf.cdo.common.branch.CDOBranch; -import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; -import org.eclipse.emf.cdo.common.branch.CDOBranchVersion; -import org.eclipse.emf.cdo.common.commit.CDOCommitData; -import org.eclipse.emf.cdo.common.commit.CDOCommitInfo; -import org.eclipse.emf.cdo.common.id.CDOID; -import org.eclipse.emf.cdo.common.id.CDOIDAndVersion; -import org.eclipse.emf.cdo.common.id.CDOIDUtil; -import org.eclipse.emf.cdo.common.model.CDOPackageInfo; -import org.eclipse.emf.cdo.common.model.CDOPackageUnit; -import org.eclipse.emf.cdo.common.revision.CDORevisionKey; -import org.eclipse.emf.cdo.internal.server.Repository; -import org.eclipse.emf.cdo.internal.server.TransactionCommitContext; -import org.eclipse.emf.cdo.server.StoreThreadLocal; -import org.eclipse.emf.cdo.spi.common.CDOReplicationContext; -import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch; -import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; -import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit; -import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; -import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta; -import org.eclipse.emf.cdo.spi.server.InternalCommitContext; -import org.eclipse.emf.cdo.spi.server.InternalSession; -import org.eclipse.emf.cdo.spi.server.InternalStore; -import org.eclipse.emf.cdo.spi.server.InternalTransaction; - -import org.eclipse.net4j.util.collection.IndexedList; -import org.eclipse.net4j.util.om.monitor.Monitor; -import org.eclipse.net4j.util.om.monitor.OMMonitor; -import org.eclipse.net4j.util.transaction.TransactionException; - -import org.eclipse.emf.spi.cdo.CDOSessionProtocol; -import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult; -import org.eclipse.emf.spi.cdo.InternalCDOSession; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** - * TODO: - * <ul> - * <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages). - * <li>Make CDOIDs of new objects temporary when merging out of temp branch. - * <li>Provide custom branching strategies. - * <li>Consider non-auditing masters. - * <li>Test out-of-order commits. - * </ul> - * * @author Eike Stepper */ -public class CloneRepository extends Repository.Default implements CDOReplicationContext +public class CloneRepository extends SynchronizableRepository { - private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.clone.lastReplicatedBranchID"; //$NON-NLS-1$ - - private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.clone.lastReplicatedCommitTime"; //$NON-NLS-1$ - - private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.clone.gracefullyShutDown"; //$NON-NLS-1$ - - private CloneSynchronizer synchronizer; - - private InternalSession replicatorSession; - - private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID; - - private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE; - - private int lastTransactionID; - public CloneRepository() { setState(State.OFFLINE); @@ -92,417 +25,4 @@ public class CloneRepository extends Repository.Default implements CDOReplicatio { return Type.CLONE; } - - public CloneSynchronizer getSynchronizer() - { - return synchronizer; - } - - public void setSynchronizer(CloneSynchronizer synchronizer) - { - checkInactive(); - this.synchronizer = synchronizer; - } - - @Override - public Object[] getElements() - { - List<Object> list = Arrays.asList(super.getElements()); - list.add(synchronizer); - return list.toArray(); - } - - public int getLastReplicatedBranchID() - { - return lastReplicatedBranchID; - } - - public long getLastReplicatedCommitTime() - { - return lastReplicatedCommitTime; - } - - public boolean isSqueezeCommitInfos() - { - return synchronizer.isSqueezeCommitInfos(); - } - - public void handleBranch(CDOBranch branch) - { - if (branch.isLocal()) - { - return; - } - - int branchID = branch.getID(); - String name = branch.getName(); - - CDOBranchPoint base = branch.getBase(); - InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch(); - long baseTimeStamp = base.getTimeStamp(); - - InternalCDOBranchManager branchManager = getBranchManager(); - branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp); - lastReplicatedBranchID = branchID; - } - - public void handleCommitInfo(CDOCommitInfo commitInfo) - { - CDOBranch branch = commitInfo.getBranch(); - if (branch.isLocal()) - { - return; - } - - long timeStamp = commitInfo.getTimeStamp(); - CDOBranchPoint head = branch.getHead(); - - InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head); - boolean squeezed = isSqueezeCommitInfos() && lastReplicatedCommitTime != CDOBranchPoint.UNSPECIFIED_DATE; - ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, commitInfo, squeezed); - commitContext.preWrite(); - boolean success = false; - - try - { - commitContext.write(new Monitor()); - commitContext.commit(new Monitor()); - - setLastCommitTimeStamp(timeStamp); - lastReplicatedCommitTime = timeStamp; - success = true; - } - finally - { - commitContext.postCommit(success); - transaction.close(); - } - } - - @Override - public InternalCommitContext createCommitContext(InternalTransaction transaction) - { - CDOBranch branch = transaction.getBranch(); - if (branch.isLocal()) - { - return createTransactionCommitContext(transaction); - } - - if (getState() != State.ONLINE) - { - return createBranchingCommitContext(transaction, branch); - } - - return createWriteThroughCommitContext(transaction); - } - - protected InternalCommitContext createTransactionCommitContext(InternalTransaction transaction) - { - return super.createCommitContext(transaction); - } - - protected InternalCommitContext createBranchingCommitContext(InternalTransaction transaction, CDOBranch branch) - { - long timeStamp = createCommitTimeStamp(null); - CDOBranch offlineBranch = createOfflineBranch(branch, timeStamp - 1L); - transaction.setBranchPoint(offlineBranch.getHead()); - return new BranchingCommitContext(transaction, timeStamp); - } - - protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction) - { - return new WriteThroughCommitContext(transaction); - } - - @Override - protected void doBeforeActivate() throws Exception - { - super.doBeforeActivate(); - checkState(synchronizer, "synchronizer"); //$NON-NLS-1$ - } - - @Override - protected void doActivate() throws Exception - { - super.doActivate(); - - InternalStore store = getStore(); - if (!store.isFirstTime()) - { - Map<String, String> map = store.getPropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); - if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN)) - { - throw new IllegalStateException("Clone store was not gracefully shut down"); - } - - Set<String> names = new HashSet<String>(); - names.add(PROP_LAST_REPLICATED_BRANCH_ID); - names.add(PROP_LAST_REPLICATED_COMMIT_TIME); - - map = store.getPropertyValues(names); - lastReplicatedBranchID = Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID)); - lastReplicatedCommitTime = Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME)); - } - else - { - store.removePropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); - } - - replicatorSession = getSessionManager().openSession(null); - replicatorSession.options().setPassiveUpdateEnabled(false); - - synchronizer.setClone(this); - synchronizer.activate(); - } - - @Override - protected void doDeactivate() throws Exception - { - synchronizer.deactivate(); - - Map<String, String> map = new HashMap<String, String>(); - map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID)); - map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime)); - map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString()); - - InternalStore store = getStore(); - store.setPropertyValues(map); - - super.doDeactivate(); - } - - @Override - protected void initRootResource() - { - setState(State.INITIAL); - } - - protected CDOBranch createOfflineBranch(CDOBranch baseBranch, long baseTimeStamp) - { - try - { - StoreThreadLocal.setSession(replicatorSession); - InternalCDOBranchManager branchManager = getBranchManager(); - return branchManager.createBranch(NEW_LOCAL_BRANCH, - "Offline-" + baseTimeStamp, (InternalCDOBranch)baseBranch, baseTimeStamp); //$NON-NLS-1$ - } - finally - { - StoreThreadLocal.release(); - } - } - - /** - * @author Eike Stepper - */ - private static final class CommitContextData implements CDOCommitData - { - private InternalCommitContext commitContext; - - public CommitContextData(InternalCommitContext commitContext) - { - this.commitContext = commitContext; - } - - public boolean isEmpty() - { - return false; - } - - public List<CDOPackageUnit> getNewPackageUnits() - { - final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits(); - return new IndexedList<CDOPackageUnit>() - { - @Override - public CDOPackageUnit get(int index) - { - return newPackageUnits[index]; - } - - @Override - public int size() - { - return newPackageUnits.length; - } - }; - } - - public List<CDOIDAndVersion> getNewObjects() - { - final InternalCDORevision[] newObjects = commitContext.getNewObjects(); - return new IndexedList<CDOIDAndVersion>() - { - @Override - public CDOIDAndVersion get(int index) - { - return newObjects[index]; - } - - @Override - public int size() - { - return newObjects.length; - } - }; - } - - public List<CDORevisionKey> getChangedObjects() - { - final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas(); - return new IndexedList<CDORevisionKey>() - { - @Override - public CDORevisionKey get(int index) - { - return changedObjects[index]; - } - - @Override - public int size() - { - return changedObjects.length; - } - }; - } - - public List<CDOIDAndVersion> getDetachedObjects() - { - final CDOID[] detachedObjects = commitContext.getDetachedObjects(); - return new IndexedList<CDOIDAndVersion>() - { - @Override - public CDOIDAndVersion get(int index) - { - return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION); - } - - @Override - public int size() - { - return detachedObjects.length; - } - }; - } - } - - /** - * @author Eike Stepper - */ - private final class WriteThroughCommitContext extends TransactionCommitContext - { - public WriteThroughCommitContext(InternalTransaction transaction) - { - super(transaction); - } - - @Override - public void write(OMMonitor monitor) - { - // Do nothing - } - - @Override - public void commit(OMMonitor monitor) - { - InternalTransaction transaction = getTransaction(); - - // Prepare commit to the master - CDOBranch branch = transaction.getBranch(); - String userID = getUserID(); - String comment = getCommitComment(); - CDOCommitData commitData = new CommitContextData(this); - - // Delegate commit to the master - CDOSessionProtocol sessionProtocol = ((InternalCDOSession)synchronizer.getMaster()).getSessionProtocol(); - CommitTransactionResult result = sessionProtocol.commitDelegation(branch, userID, comment, commitData, monitor); - - // Stop if commit to master failed - String rollbackMessage = result.getRollbackMessage(); - if (rollbackMessage != null) - { - throw new TransactionException(rollbackMessage); - } - - // Prepare data needed for commit result and commit notifications - setTimeStamp(result.getTimeStamp()); - addMetaIDRanges(commitData.getNewPackageUnits()); - addIDMappings(result.getIDMappings()); - applyIDMappings(new Monitor()); - - // Commit to the clone - super.preWrite(); - super.write(new Monitor()); - super.commit(new Monitor()); - - // Remember commit time in the clone - setLastCommitTimeStamp(result.getTimeStamp()); - } - - @Override - protected long createTimeStamp() - { - // Already set after commit to the master - return getTimeStamp(); - } - - @Override - protected void lockObjects() throws InterruptedException - { - // Do nothing - } - - @Override - protected void adjustMetaRanges() - { - // Do nothing - } - - private void addMetaIDRanges(List<CDOPackageUnit> newPackageUnits) - { - for (CDOPackageUnit newPackageUnit : newPackageUnits) - { - for (CDOPackageInfo packageInfo : newPackageUnit.getPackageInfos()) - { - addMetaIDRange(packageInfo.getMetaIDRange()); - } - } - } - - private void addIDMappings(Map<CDOID, CDOID> idMappings) - { - for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet()) - { - CDOID oldID = idMapping.getKey(); - CDOID newID = idMapping.getValue(); - addIDMapping(oldID, newID); - } - } - } - - /** - * @author Eike Stepper - */ - private final class BranchingCommitContext extends TransactionCommitContext - { - private long timeStamp; - - public BranchingCommitContext(InternalTransaction transaction, long timeStamp) - { - super(transaction); - this.timeStamp = timeStamp; - } - - @Override - protected void lockObjects() throws InterruptedException - { - // Do nothing - } - - @Override - protected long createTimeStamp() - { - return timeStamp; - } - } } diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java new file mode 100644 index 0000000000..2c4a085416 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/FailoverParticipant.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.server.clone; + +/** + * @author Eike Stepper + */ +public class FailoverParticipant extends SynchronizableRepository +{ + private Type type; + + public FailoverParticipant() + { + setState(State.OFFLINE); + } + + @Override + public Type getType() + { + return type; + } + + public void setType(Type type) + { + this.type = type; + } +} diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java index 66cb027c45..ebfeac0819 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/CloneSynchronizer.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/RepositorySynchronizer.java @@ -40,11 +40,11 @@ import java.util.concurrent.PriorityBlockingQueue; * @author Eike Stepper * @since 3.0 */ -public class CloneSynchronizer extends QueueRunner +public class RepositorySynchronizer extends QueueRunner { public static final int DEFAULT_RETRY_INTERVAL = 3; - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REPOSITORY, CloneSynchronizer.class); + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_REPOSITORY, RepositorySynchronizer.class); private static final Integer CONNECT_PRIORITY = 0; @@ -54,59 +54,59 @@ public class CloneSynchronizer extends QueueRunner private static final Integer COMMIT_PRIORITY = 3; - private CloneRepository clone; - private int retryInterval = DEFAULT_RETRY_INTERVAL; private Object connectLock = new Object(); - private CDOSessionConfigurationFactory masterFactory; + private SynchronizableRepository localRepository; + + private InternalCDOSession remoteSession; - private InternalCDOSession master; + private RemoteSessionListener remoteSessionListener = new RemoteSessionListener(); - private MasterListener masterListener = new MasterListener(); + private CDOSessionConfigurationFactory remoteSessionConfigurationFactory; private boolean squeezeCommitInfos; - public CloneSynchronizer() + public RepositorySynchronizer() { } - public CloneRepository getClone() + public int getRetryInterval() { - return clone; + return retryInterval; } - public void setClone(CloneRepository clone) + public void setRetryInterval(int retryInterval) { - checkInactive(); - this.clone = clone; + this.retryInterval = retryInterval; } - public int getRetryInterval() + public SynchronizableRepository getLocalRepository() { - return retryInterval; + return localRepository; } - public void setRetryInterval(int retryInterval) + public void setLocalRepository(SynchronizableRepository localRepository) { - this.retryInterval = retryInterval; + checkInactive(); + this.localRepository = localRepository; } - public CDOSessionConfigurationFactory getMasterFactory() + public CDOSessionConfigurationFactory getRemoteSessionConfigurationFactory() { - return masterFactory; + return remoteSessionConfigurationFactory; } - public void setMasterFactory(CDOSessionConfigurationFactory masterFactory) + public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory remoteSessionConfigurationFactory) { - checkArg(masterFactory, "masterFactory"); //$NON-NLS-1$ - this.masterFactory = masterFactory; + checkArg(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$ + this.remoteSessionConfigurationFactory = remoteSessionConfigurationFactory; } - public CDOSession getMaster() + public CDOSession getRemoteSession() { - return master; + return remoteSession; } public boolean isSqueezeCommitInfos() @@ -122,7 +122,7 @@ public class CloneSynchronizer extends QueueRunner @Override protected String getThreadName() { - return "CloneSynchronizer"; //$NON-NLS-1$ + return "RepositorySynchronizer"; //$NON-NLS-1$ } @Override @@ -135,8 +135,8 @@ public class CloneSynchronizer extends QueueRunner protected void doBeforeActivate() throws Exception { super.doBeforeActivate(); - checkState(masterFactory, "masterFactory"); //$NON-NLS-1$ - checkState(clone, "clone"); //$NON-NLS-1$ + checkState(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$ + checkState(localRepository, "localRepository"); //$NON-NLS-1$ } @Override @@ -149,12 +149,12 @@ public class CloneSynchronizer extends QueueRunner @Override protected void doDeactivate() throws Exception { - if (master != null) + if (remoteSession != null) { - master.removeListener(masterListener); - master.getBranchManager().removeListener(masterListener); - master.close(); - master = null; + remoteSession.removeListener(remoteSessionListener); + remoteSession.getBranchManager().removeListener(remoteSessionListener); + remoteSession.close(); + remoteSession = null; } super.doDeactivate(); @@ -163,18 +163,18 @@ public class CloneSynchronizer extends QueueRunner private void disconnect() { OM.LOG.info("Disconnected from master."); - if (clone.getRootResourceID() == null) + if (localRepository.getRootResourceID() == null) { - clone.setState(CloneRepository.State.INITIAL); + localRepository.setState(CloneRepository.State.INITIAL); } else { - clone.setState(CloneRepository.State.OFFLINE); + localRepository.setState(CloneRepository.State.OFFLINE); } - master.getBranchManager().removeListener(masterListener); - master.removeListener(masterListener); - master = null; + remoteSession.getBranchManager().removeListener(remoteSessionListener); + remoteSession.removeListener(remoteSessionListener); + remoteSession = null; reconnect(); } @@ -192,7 +192,7 @@ public class CloneSynchronizer extends QueueRunner { synchronized (connectLock) { - if (clone.getState().isConnected()) + if (localRepository.getState().isConnected()) { return; } @@ -215,7 +215,7 @@ public class CloneSynchronizer extends QueueRunner /** * @author Eike Stepper */ - private final class MasterListener implements IListener + private final class RemoteSessionListener implements IListener { public void notifyEvent(IEvent event) { @@ -240,7 +240,7 @@ public class CloneSynchronizer extends QueueRunner else if (event instanceof ILifecycleEvent) { ILifecycleEvent e = (ILifecycleEvent)event; - if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == master) + if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == remoteSession) { disconnect(); } @@ -282,10 +282,10 @@ public class CloneSynchronizer extends QueueRunner try { - CDOSessionConfiguration masterConfiguration = masterFactory.createSessionConfiguration(); + CDOSessionConfiguration masterConfiguration = remoteSessionConfigurationFactory.createSessionConfiguration(); masterConfiguration.setPassiveUpdateMode(PassiveUpdateMode.ADDITIONS); - master = (InternalCDOSession)masterConfiguration.openSession(); + remoteSession = (InternalCDOSession)masterConfiguration.openSession(); ensureNOOPRevisionCache(); setRootResourceID(); @@ -315,8 +315,8 @@ public class CloneSynchronizer extends QueueRunner } OM.LOG.info("Connected to master."); - master.addListener(masterListener); - master.getBranchManager().addListener(masterListener); + remoteSession.addListener(remoteSessionListener); + remoteSession.getBranchManager().addListener(remoteSessionListener); scheduleReplicate(); } @@ -330,18 +330,18 @@ public class CloneSynchronizer extends QueueRunner private void setRootResourceID() { - if (clone.getState() == CloneRepository.State.INITIAL) + if (localRepository.getState() == CloneRepository.State.INITIAL) { - CDOID rootResourceID = master.getRepositoryInfo().getRootResourceID(); - clone.setRootResourceID(rootResourceID); - clone.setState(CloneRepository.State.OFFLINE); + CDOID rootResourceID = remoteSession.getRepositoryInfo().getRootResourceID(); + localRepository.setRootResourceID(rootResourceID); + localRepository.setState(CloneRepository.State.OFFLINE); } } private void ensureNOOPRevisionCache() { // Ensure that incoming revisions are not cached! - InternalCDORevisionCache cache = master.getRevisionManager().getCache(); + InternalCDORevisionCache cache = remoteSession.getRevisionManager().getCache(); if (!(cache instanceof NOOPRevisionCache)) { throw new IllegalStateException("Master session does not use a NOOPRevisionCache: " @@ -367,12 +367,12 @@ public class CloneSynchronizer extends QueueRunner TRACER.trace("Synchronizing with master..."); //$NON-NLS-1$ } - clone.setState(CloneRepository.State.SYNCING); + localRepository.setState(CloneRepository.State.SYNCING); - CDOSessionProtocol sessionProtocol = master.getSessionProtocol(); - sessionProtocol.replicateRepository(clone); + CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol(); + sessionProtocol.replicateRepository(localRepository); - clone.setState(CloneRepository.State.ONLINE); + localRepository.setState(CloneRepository.State.ONLINE); OM.LOG.info("Synchronized with master."); } @@ -397,7 +397,7 @@ public class CloneSynchronizer extends QueueRunner public void run() { - clone.handleBranch(branch); + localRepository.handleBranch(branch); } @Override @@ -433,7 +433,7 @@ public class CloneSynchronizer extends QueueRunner public void run() { - clone.handleCommitInfo(commitInfo); + localRepository.handleCommitInfo(commitInfo); } @Override diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java new file mode 100644 index 0000000000..9e25c86077 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/clone/SynchronizableRepository.java @@ -0,0 +1,507 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.server.clone; + +import org.eclipse.emf.cdo.common.branch.CDOBranch; +import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; +import org.eclipse.emf.cdo.common.branch.CDOBranchVersion; +import org.eclipse.emf.cdo.common.commit.CDOCommitData; +import org.eclipse.emf.cdo.common.commit.CDOCommitInfo; +import org.eclipse.emf.cdo.common.id.CDOID; +import org.eclipse.emf.cdo.common.id.CDOIDAndVersion; +import org.eclipse.emf.cdo.common.id.CDOIDUtil; +import org.eclipse.emf.cdo.common.model.CDOPackageInfo; +import org.eclipse.emf.cdo.common.model.CDOPackageUnit; +import org.eclipse.emf.cdo.common.revision.CDORevisionKey; +import org.eclipse.emf.cdo.internal.server.Repository; +import org.eclipse.emf.cdo.internal.server.TransactionCommitContext; +import org.eclipse.emf.cdo.server.StoreThreadLocal; +import org.eclipse.emf.cdo.spi.common.CDOReplicationContext; +import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch; +import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; +import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit; +import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; +import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionDelta; +import org.eclipse.emf.cdo.spi.server.InternalCommitContext; +import org.eclipse.emf.cdo.spi.server.InternalSession; +import org.eclipse.emf.cdo.spi.server.InternalStore; +import org.eclipse.emf.cdo.spi.server.InternalTransaction; + +import org.eclipse.net4j.util.collection.IndexedList; +import org.eclipse.net4j.util.om.monitor.Monitor; +import org.eclipse.net4j.util.om.monitor.OMMonitor; +import org.eclipse.net4j.util.transaction.TransactionException; + +import org.eclipse.emf.spi.cdo.CDOSessionProtocol; +import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult; +import org.eclipse.emf.spi.cdo.InternalCDOSession; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * TODO: + * <ul> + * <li>Handle new package units that had been committed during offline (testDisconnectAndCommitAndMergeWithNewPackages). + * <li>Make CDOIDs of new objects temporary when merging out of temp branch. + * <li>Provide custom branching strategies. + * <li>Consider non-auditing masters. + * <li>Test out-of-order commits. + * </ul> + * + * @author Eike Stepper + */ +public class SynchronizableRepository extends Repository.Default implements CDOReplicationContext +{ + private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.clone.lastReplicatedBranchID"; //$NON-NLS-1$ + + private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.clone.lastReplicatedCommitTime"; //$NON-NLS-1$ + + private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.clone.gracefullyShutDown"; //$NON-NLS-1$ + + private RepositorySynchronizer synchronizer; + + private InternalSession replicatorSession; + + private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID; + + private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE; + + private int lastTransactionID; + + public SynchronizableRepository() + { + } + + @Override + public Type getType() + { + return Type.CLONE; + } + + public RepositorySynchronizer getSynchronizer() + { + return synchronizer; + } + + public void setSynchronizer(RepositorySynchronizer synchronizer) + { + checkInactive(); + this.synchronizer = synchronizer; + } + + @Override + public Object[] getElements() + { + List<Object> list = Arrays.asList(super.getElements()); + list.add(synchronizer); + return list.toArray(); + } + + public int getLastReplicatedBranchID() + { + return lastReplicatedBranchID; + } + + public long getLastReplicatedCommitTime() + { + return lastReplicatedCommitTime; + } + + public boolean isSqueezeCommitInfos() + { + return synchronizer.isSqueezeCommitInfos(); + } + + public void handleBranch(CDOBranch branch) + { + if (branch.isLocal()) + { + return; + } + + int branchID = branch.getID(); + String name = branch.getName(); + + CDOBranchPoint base = branch.getBase(); + InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch(); + long baseTimeStamp = base.getTimeStamp(); + + InternalCDOBranchManager branchManager = getBranchManager(); + branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp); + lastReplicatedBranchID = branchID; + } + + public void handleCommitInfo(CDOCommitInfo commitInfo) + { + CDOBranch branch = commitInfo.getBranch(); + if (branch.isLocal()) + { + return; + } + + long timeStamp = commitInfo.getTimeStamp(); + CDOBranchPoint head = branch.getHead(); + + InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head); + boolean squeezed = isSqueezeCommitInfos() && lastReplicatedCommitTime != CDOBranchPoint.UNSPECIFIED_DATE; + ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, commitInfo, squeezed); + commitContext.preWrite(); + boolean success = false; + + try + { + commitContext.write(new Monitor()); + commitContext.commit(new Monitor()); + + setLastCommitTimeStamp(timeStamp); + lastReplicatedCommitTime = timeStamp; + success = true; + } + finally + { + commitContext.postCommit(success); + transaction.close(); + } + } + + @Override + public InternalCommitContext createCommitContext(InternalTransaction transaction) + { + CDOBranch branch = transaction.getBranch(); + if (branch.isLocal()) + { + return createTransactionCommitContext(transaction); + } + + if (getState() != State.ONLINE) + { + return createBranchingCommitContext(transaction, branch); + } + + return createWriteThroughCommitContext(transaction); + } + + protected InternalCommitContext createTransactionCommitContext(InternalTransaction transaction) + { + return super.createCommitContext(transaction); + } + + protected InternalCommitContext createBranchingCommitContext(InternalTransaction transaction, CDOBranch branch) + { + long timeStamp = createCommitTimeStamp(null); + CDOBranch offlineBranch = createOfflineBranch(branch, timeStamp - 1L); + transaction.setBranchPoint(offlineBranch.getHead()); + return new BranchingCommitContext(transaction, timeStamp); + } + + protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction) + { + return new WriteThroughCommitContext(transaction); + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(synchronizer, "synchronizer"); //$NON-NLS-1$ + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + + InternalStore store = getStore(); + if (!store.isFirstTime()) + { + Map<String, String> map = store.getPropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); + if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN)) + { + throw new IllegalStateException("Clone store was not gracefully shut down"); + } + + Set<String> names = new HashSet<String>(); + names.add(PROP_LAST_REPLICATED_BRANCH_ID); + names.add(PROP_LAST_REPLICATED_COMMIT_TIME); + + map = store.getPropertyValues(names); + lastReplicatedBranchID = Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID)); + lastReplicatedCommitTime = Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME)); + } + else + { + store.removePropertyValues(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); + } + + replicatorSession = getSessionManager().openSession(null); + replicatorSession.options().setPassiveUpdateEnabled(false); + + synchronizer.setLocalRepository(this); + synchronizer.activate(); + } + + @Override + protected void doDeactivate() throws Exception + { + synchronizer.deactivate(); + + Map<String, String> map = new HashMap<String, String>(); + map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID)); + map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime)); + map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString()); + + InternalStore store = getStore(); + store.setPropertyValues(map); + + super.doDeactivate(); + } + + @Override + protected void initRootResource() + { + setState(State.INITIAL); + } + + protected CDOBranch createOfflineBranch(CDOBranch baseBranch, long baseTimeStamp) + { + try + { + StoreThreadLocal.setSession(replicatorSession); + InternalCDOBranchManager branchManager = getBranchManager(); + return branchManager.createBranch(NEW_LOCAL_BRANCH, + "Offline-" + baseTimeStamp, (InternalCDOBranch)baseBranch, baseTimeStamp); //$NON-NLS-1$ + } + finally + { + StoreThreadLocal.release(); + } + } + + /** + * @author Eike Stepper + */ + private static final class CommitContextData implements CDOCommitData + { + private InternalCommitContext commitContext; + + public CommitContextData(InternalCommitContext commitContext) + { + this.commitContext = commitContext; + } + + public boolean isEmpty() + { + return false; + } + + public List<CDOPackageUnit> getNewPackageUnits() + { + final InternalCDOPackageUnit[] newPackageUnits = commitContext.getNewPackageUnits(); + return new IndexedList<CDOPackageUnit>() + { + @Override + public CDOPackageUnit get(int index) + { + return newPackageUnits[index]; + } + + @Override + public int size() + { + return newPackageUnits.length; + } + }; + } + + public List<CDOIDAndVersion> getNewObjects() + { + final InternalCDORevision[] newObjects = commitContext.getNewObjects(); + return new IndexedList<CDOIDAndVersion>() + { + @Override + public CDOIDAndVersion get(int index) + { + return newObjects[index]; + } + + @Override + public int size() + { + return newObjects.length; + } + }; + } + + public List<CDORevisionKey> getChangedObjects() + { + final InternalCDORevisionDelta[] changedObjects = commitContext.getDirtyObjectDeltas(); + return new IndexedList<CDORevisionKey>() + { + @Override + public CDORevisionKey get(int index) + { + return changedObjects[index]; + } + + @Override + public int size() + { + return changedObjects.length; + } + }; + } + + public List<CDOIDAndVersion> getDetachedObjects() + { + final CDOID[] detachedObjects = commitContext.getDetachedObjects(); + return new IndexedList<CDOIDAndVersion>() + { + @Override + public CDOIDAndVersion get(int index) + { + return CDOIDUtil.createIDAndVersion(detachedObjects[index], CDOBranchVersion.UNSPECIFIED_VERSION); + } + + @Override + public int size() + { + return detachedObjects.length; + } + }; + } + } + + /** + * @author Eike Stepper + */ + private final class WriteThroughCommitContext extends TransactionCommitContext + { + public WriteThroughCommitContext(InternalTransaction transaction) + { + super(transaction); + } + + @Override + public void write(OMMonitor monitor) + { + // Do nothing + } + + @Override + public void commit(OMMonitor monitor) + { + InternalTransaction transaction = getTransaction(); + + // Prepare commit to the master + CDOBranch branch = transaction.getBranch(); + String userID = getUserID(); + String comment = getCommitComment(); + CDOCommitData commitData = new CommitContextData(this); + + // Delegate commit to the master + CDOSessionProtocol sessionProtocol = ((InternalCDOSession)synchronizer.getRemoteSession()).getSessionProtocol(); + CommitTransactionResult result = sessionProtocol.commitDelegation(branch, userID, comment, commitData, monitor); + + // Stop if commit to master failed + String rollbackMessage = result.getRollbackMessage(); + if (rollbackMessage != null) + { + throw new TransactionException(rollbackMessage); + } + + // Prepare data needed for commit result and commit notifications + setTimeStamp(result.getTimeStamp()); + addMetaIDRanges(commitData.getNewPackageUnits()); + addIDMappings(result.getIDMappings()); + applyIDMappings(new Monitor()); + + // Commit to the clone + super.preWrite(); + super.write(new Monitor()); + super.commit(new Monitor()); + + // Remember commit time in the clone + setLastCommitTimeStamp(result.getTimeStamp()); + } + + @Override + protected long createTimeStamp() + { + // Already set after commit to the master + return getTimeStamp(); + } + + @Override + protected void lockObjects() throws InterruptedException + { + // Do nothing + } + + @Override + protected void adjustMetaRanges() + { + // Do nothing + } + + private void addMetaIDRanges(List<CDOPackageUnit> newPackageUnits) + { + for (CDOPackageUnit newPackageUnit : newPackageUnits) + { + for (CDOPackageInfo packageInfo : newPackageUnit.getPackageInfos()) + { + addMetaIDRange(packageInfo.getMetaIDRange()); + } + } + } + + private void addIDMappings(Map<CDOID, CDOID> idMappings) + { + for (Map.Entry<CDOID, CDOID> idMapping : idMappings.entrySet()) + { + CDOID oldID = idMapping.getKey(); + CDOID newID = idMapping.getValue(); + addIDMapping(oldID, newID); + } + } + } + + /** + * @author Eike Stepper + */ + private final class BranchingCommitContext extends TransactionCommitContext + { + private long timeStamp; + + public BranchingCommitContext(InternalTransaction transaction, long timeStamp) + { + super(transaction); + this.timeStamp = timeStamp; + } + + @Override + protected void lockObjects() throws InterruptedException + { + // Do nothing + } + + @Override + protected long createTimeStamp() + { + return timeStamp; + } + } +} diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java index 54b6b380cf..132defde5a 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerUtil.java @@ -15,7 +15,7 @@ import org.eclipse.emf.cdo.internal.server.Repository; import org.eclipse.emf.cdo.internal.server.SessionManager; import org.eclipse.emf.cdo.internal.server.bundle.OM; import org.eclipse.emf.cdo.internal.server.clone.CloneRepository; -import org.eclipse.emf.cdo.internal.server.clone.CloneSynchronizer; +import org.eclipse.emf.cdo.internal.server.clone.RepositorySynchronizer; import org.eclipse.emf.cdo.internal.server.embedded.EmbeddedClientSessionConfiguration; import org.eclipse.emf.cdo.server.embedded.CDOSessionConfiguration; import org.eclipse.emf.cdo.spi.server.InternalStore; @@ -83,7 +83,7 @@ public final class CDOServerUtil * @since 3.0 */ public static IRepository createCloneRepository(String name, IStore store, Map<String, String> props, - CloneSynchronizer synchronizer) + RepositorySynchronizer synchronizer) { CloneRepository repository = new CloneRepository(); repository.setName(name); diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java index d0183673be..7316d4da96 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/config/impl/RepositoryConfig.java @@ -13,7 +13,7 @@ package org.eclipse.emf.cdo.tests.config.impl; import org.eclipse.emf.cdo.internal.common.revision.CDORevisionManagerImpl; import org.eclipse.emf.cdo.internal.common.revision.cache.noop.NOOPRevisionCache; -import org.eclipse.emf.cdo.internal.server.clone.CloneSynchronizer; +import org.eclipse.emf.cdo.internal.server.clone.RepositorySynchronizer; import org.eclipse.emf.cdo.net4j.CDONet4jUtil; import org.eclipse.emf.cdo.net4j.CDOSessionConfiguration; import org.eclipse.emf.cdo.server.CDOServerUtil; @@ -269,8 +269,8 @@ public abstract class RepositoryConfig extends Config implements IRepositoryConf } }; - CloneSynchronizer synchronizer = new CloneSynchronizer(); - synchronizer.setMasterFactory(masterFactory); + RepositorySynchronizer synchronizer = new RepositorySynchronizer(); + synchronizer.setRemoteSessionConfigurationFactory(masterFactory); synchronizer.setRetryInterval(1); synchronizer.setSqueezeCommitInfos(getTestSqueezeCommitInfos()); |