/** * Copyright (c) 2004 - 2011 Eike Stepper (Berlin, Germany) and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Eike Stepper - initial API and implementation */ package org.eclipse.emf.cdo.internal.server.syncing; import org.eclipse.emf.cdo.common.CDOCommonRepository; import org.eclipse.emf.cdo.common.CDOCommonSession.Options.LockNotificationMode; import org.eclipse.emf.cdo.common.branch.CDOBranch; import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; import org.eclipse.emf.cdo.common.commit.CDOChangeSetData; import org.eclipse.emf.cdo.common.commit.CDOCommitData; import org.eclipse.emf.cdo.common.commit.CDOCommitInfo; import org.eclipse.emf.cdo.common.commit.CDOCommitInfoHandler; import org.eclipse.emf.cdo.common.id.CDOID; import org.eclipse.emf.cdo.common.lob.CDOLob; import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo; import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo.Operation; import org.eclipse.emf.cdo.common.lock.CDOLockOwner; import org.eclipse.emf.cdo.common.lock.CDOLockState; import org.eclipse.emf.cdo.common.lock.IDurableLockingManager.LockArea; import org.eclipse.emf.cdo.common.model.CDOPackageUnit; import org.eclipse.emf.cdo.common.protocol.CDODataInput; import org.eclipse.emf.cdo.common.revision.CDOIDAndVersion; import org.eclipse.emf.cdo.common.revision.CDORevision; import org.eclipse.emf.cdo.common.revision.CDORevisionKey; import org.eclipse.emf.cdo.common.util.CDOCommonUtil; import org.eclipse.emf.cdo.common.util.CDOException; import org.eclipse.emf.cdo.internal.common.commit.CDOCommitDataImpl; import org.eclipse.emf.cdo.internal.server.Repository; import org.eclipse.emf.cdo.internal.server.TransactionCommitContext; import org.eclipse.emf.cdo.internal.server.syncing.OfflineClone.CommitContextData; import org.eclipse.emf.cdo.server.IStoreAccessor; import org.eclipse.emf.cdo.server.StoreThreadLocal; import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranch; import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; import org.eclipse.emf.cdo.spi.common.commit.InternalCDOCommitInfoManager; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache; import org.eclipse.emf.cdo.spi.server.InternalCommitContext; import org.eclipse.emf.cdo.spi.server.InternalLockManager; import org.eclipse.emf.cdo.spi.server.InternalRepository; import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer; import org.eclipse.emf.cdo.spi.server.InternalSession; import org.eclipse.emf.cdo.spi.server.InternalSessionManager; import org.eclipse.emf.cdo.spi.server.InternalStore; import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository; import org.eclipse.emf.cdo.spi.server.InternalTransaction; import org.eclipse.emf.cdo.spi.server.InternalView; import org.eclipse.emf.cdo.spi.server.SyncingUtil; import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.monitor.Monitor; import org.eclipse.net4j.util.om.monitor.OMMonitor; import org.eclipse.net4j.util.transaction.TransactionException; import org.eclipse.emf.spi.cdo.CDOSessionProtocol; import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult; import org.eclipse.emf.spi.cdo.CDOSessionProtocol.LockObjectsResult; import org.eclipse.emf.spi.cdo.CDOSessionProtocol.UnlockObjectsResult; import org.eclipse.emf.spi.cdo.InternalCDOSession; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; /** * TODO: * * * @author Eike Stepper */ public abstract class SynchronizableRepository extends Repository.Default implements InternalSynchronizableRepository { protected static final CDOCommonRepository.Type MASTER = CDOCommonRepository.Type.MASTER; protected static final CDOCommonRepository.Type BACKUP = CDOCommonRepository.Type.BACKUP; protected static final CDOCommonRepository.Type CLONE = CDOCommonRepository.Type.CLONE; protected static final CDOCommonRepository.State INITIAL = CDOCommonRepository.State.INITIAL; protected static final CDOCommonRepository.State OFFLINE = CDOCommonRepository.State.OFFLINE; protected static final CDOCommonRepository.State SYNCING = CDOCommonRepository.State.SYNCING; protected static final CDOCommonRepository.State ONLINE = CDOCommonRepository.State.ONLINE; private static final String PROP_LAST_REPLICATED_BRANCH_ID = "org.eclipse.emf.cdo.server.lastReplicatedBranchID"; //$NON-NLS-1$ private static final String PROP_LAST_REPLICATED_COMMIT_TIME = "org.eclipse.emf.cdo.server.lastReplicatedCommitTime"; //$NON-NLS-1$ private static final String PROP_GRACEFULLY_SHUT_DOWN = "org.eclipse.emf.cdo.server.gracefullyShutDown"; //$NON-NLS-1$ private InternalRepositorySynchronizer synchronizer; private InternalSession replicatorSession; private int lastReplicatedBranchID = CDOBranch.MAIN_BRANCH_ID; private long lastReplicatedCommitTime = CDOBranchPoint.UNSPECIFIED_DATE; private int lastTransactionID; private ReadLock writeThroughCommitLock; private WriteLock handleCommitInfoLock; public SynchronizableRepository() { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); writeThroughCommitLock = rwLock.readLock(); handleCommitInfoLock = rwLock.writeLock(); } public InternalRepositorySynchronizer getSynchronizer() { return synchronizer; } public void setSynchronizer(InternalRepositorySynchronizer synchronizer) { checkInactive(); this.synchronizer = synchronizer; } public InternalSession getReplicatorSession() { return replicatorSession; } @Override public Object[] getElements() { List list = Arrays.asList(super.getElements()); list.add(synchronizer); return list.toArray(); } public int getLastReplicatedBranchID() { return lastReplicatedBranchID; } public void setLastReplicatedBranchID(int lastReplicatedBranchID) { if (this.lastReplicatedBranchID < lastReplicatedBranchID) { this.lastReplicatedBranchID = lastReplicatedBranchID; } } public long getLastReplicatedCommitTime() { return lastReplicatedCommitTime; } public void setLastReplicatedCommitTime(long lastReplicatedCommitTime) { if (this.lastReplicatedCommitTime < lastReplicatedCommitTime) { this.lastReplicatedCommitTime = lastReplicatedCommitTime; } } public String[] getLockAreaIDs() { try { StoreThreadLocal.setSession(replicatorSession); final List areaIDs = new LinkedList(); getLockManager().getLockAreas(null, new LockArea.Handler() { public boolean handleLockArea(LockArea area) { areaIDs.add(area.getDurableLockingID()); return true; } }); return areaIDs.toArray(new String[areaIDs.size()]); } finally { StoreThreadLocal.release(); } } public void handleBranch(CDOBranch branch) { if (branch.isLocal()) { return; } int branchID = branch.getID(); String name = branch.getName(); CDOBranchPoint base = branch.getBase(); InternalCDOBranch baseBranch = (InternalCDOBranch)base.getBranch(); long baseTimeStamp = base.getTimeStamp(); InternalCDOBranchManager branchManager = getBranchManager(); branchManager.createBranch(branchID, name, baseBranch, baseTimeStamp); setLastReplicatedBranchID(branchID); } public void handleCommitInfo(CDOCommitInfo commitInfo) { CDOBranch branch = commitInfo.getBranch(); if (branch.isLocal()) { return; } long timeStamp = commitInfo.getTimeStamp(); CDOBranchPoint head = branch.getHead(); InternalTransaction transaction = replicatorSession.openTransaction(++lastTransactionID, head); ReplicatorCommitContext commitContext = new ReplicatorCommitContext(transaction, commitInfo); commitContext.preWrite(); boolean success = false; try { handleCommitInfoLock.lock(); commitContext.write(new Monitor()); commitContext.commit(new Monitor()); setLastCommitTimeStamp(timeStamp); setLastReplicatedCommitTime(timeStamp); success = true; } finally { handleCommitInfoLock.unlock(); commitContext.postCommit(success); transaction.close(); } } public void handleLockChangeInfo(CDOLockChangeInfo lockChangeInfo) { CDOLockOwner owner = lockChangeInfo.getLockOwner(); String durableLockingID = owner.getDurableLockingID(); CDOBranch viewedBranch = lockChangeInfo.getBranch(); InternalLockManager lockManager = getLockManager(); LockType lockType = lockChangeInfo.getLockType(); InternalView view = null; try { view = SyncingUtil.openViewWithLockArea(replicatorSession, lockManager, viewedBranch, durableLockingID); List lockables = new LinkedList(); for (CDOLockState lockState : lockChangeInfo.getLockStates()) { lockables.add(lockState.getLockedObject()); } if (lockChangeInfo.getOperation() == Operation.LOCK) { // If we can't lock immediately, there's a conflict, which means we're in big // trouble: somehow locks were obtained on the clone but not on the master. What to do? // TODO (CD) Consider this problem further // long timeout = 0; super.lock(view, lockType, lockables, null, timeout); } else if (lockChangeInfo.getOperation() == Operation.UNLOCK) { super.doUnlock(view, lockType, lockables); } else { throw new IllegalStateException("Unexpected: " + lockChangeInfo.getOperation()); } } finally { LifecycleUtil.deactivate(view); } } public boolean handleLockArea(LockArea area) { try { StoreThreadLocal.setSession(replicatorSession); getLockManager().updateLockArea(area); // TODO (CD) getSessionManager().sendLockNotification(sender, lockChangeInfo); return true; } finally { StoreThreadLocal.release(); } } public void replicateRaw(CDODataInput in, OMMonitor monitor) throws IOException { try { int fromBranchID = lastReplicatedBranchID + 1; int toBranchID = in.readInt(); long fromCommitTime = lastReplicatedCommitTime + 1L; long toCommitTime = in.readLong(); StoreThreadLocal.setSession(replicatorSession); IStoreAccessor.Raw accessor = (IStoreAccessor.Raw)StoreThreadLocal.getAccessor(); accessor.rawImport(in, fromBranchID, toBranchID, fromCommitTime, toCommitTime, monitor); replicateRawReviseRevisions(); replicateRawNotifyClients(lastReplicatedCommitTime, toCommitTime); setLastReplicatedBranchID(toBranchID); setLastReplicatedCommitTime(toCommitTime); setLastCommitTimeStamp(toCommitTime); } finally { StoreThreadLocal.release(); } } public void goOnline() { if (getState() == OFFLINE) { LifecycleUtil.activate(synchronizer); // Do not set the state to ONLINE yet; the synchronizer will set it to SYNCING first, // and then to ONLINE after a succesful replication. } } public void goOffline() { if (getState() != OFFLINE) { LifecycleUtil.deactivate(synchronizer); setState(OFFLINE); } } private void replicateRawReviseRevisions() { InternalCDORevisionCache cache = getRevisionManager().getCache(); for (CDORevision revision : cache.getCurrentRevisions()) { cache.removeRevision(revision.getID(), revision); } } private void replicateRawNotifyClients(long fromCommitTime, long toCommitTime) { InternalCDOCommitInfoManager manager = getCommitInfoManager(); InternalSessionManager sessionManager = getSessionManager(); Map branches = replicateRawGetBranches(fromCommitTime, toCommitTime); for (Entry entry : branches.entrySet()) { CDOBranch branch = entry.getKey(); TimeRange range = entry.getValue(); fromCommitTime = range.getTime1(); toCommitTime = range.getTime2(); CDOBranchPoint startPoint = branch.getPoint(fromCommitTime); CDOBranchPoint endPoint = branch.getPoint(toCommitTime); CDOChangeSetData changeSet = getChangeSet(startPoint, endPoint); List newPackages = Collections.emptyList(); // TODO Notify about new packages List newObjects = changeSet.getNewObjects(); List changedObjects = changeSet.getChangedObjects(); List detachedObjects = changeSet.getDetachedObjects(); CDOCommitData data = new CDOCommitDataImpl(newPackages, newObjects, changedObjects, detachedObjects); String comment = ""; //$NON-NLS-1$ CDOCommitInfo commitInfo = manager.createCommitInfo(branch, toCommitTime, fromCommitTime, SYSTEM_USER_ID, comment, data); sessionManager.sendCommitNotification(replicatorSession, commitInfo); } } private Map replicateRawGetBranches(long fromCommitTime, long toCommitTime) { final Map branches = new HashMap(); CDOCommitInfoHandler handler = new CDOCommitInfoHandler() { public void handleCommitInfo(CDOCommitInfo commitInfo) { CDOBranch branch = commitInfo.getBranch(); long timeStamp = commitInfo.getTimeStamp(); TimeRange range = branches.get(branch); if (range == null) { branches.put(branch, new TimeRange(timeStamp)); } else { range.update(timeStamp); } } }; getCommitInfoManager().getCommitInfos(null, fromCommitTime, toCommitTime, handler); return branches; } @Override public abstract InternalCommitContext createCommitContext(InternalTransaction transaction); protected InternalCommitContext createNormalCommitContext(InternalTransaction transaction) { return super.createCommitContext(transaction); } protected InternalCommitContext createWriteThroughCommitContext(InternalTransaction transaction) { return new WriteThroughCommitContext(transaction); } @Override protected void doBeforeActivate() throws Exception { super.doBeforeActivate(); checkState(synchronizer, "synchronizer"); //$NON-NLS-1$ } @Override protected void doActivate() throws Exception { super.doActivate(); InternalStore store = getStore(); if (!store.isFirstStart()) { Map map = store.getPersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); if (!map.containsKey(PROP_GRACEFULLY_SHUT_DOWN)) { setReplicationCountersToLatest(); } else { Set names = new HashSet(); names.add(PROP_LAST_REPLICATED_BRANCH_ID); names.add(PROP_LAST_REPLICATED_COMMIT_TIME); map = store.getPersistentProperties(names); setLastReplicatedBranchID(Integer.valueOf(map.get(PROP_LAST_REPLICATED_BRANCH_ID))); setLastReplicatedCommitTime(Long.valueOf(map.get(PROP_LAST_REPLICATED_COMMIT_TIME))); } } store.removePersistentProperties(Collections.singleton(PROP_GRACEFULLY_SHUT_DOWN)); if (getType() != MASTER) { startSynchronization(); } } @Override protected void doDeactivate() throws Exception { stopSynchronization(); Map map = new HashMap(); map.put(PROP_LAST_REPLICATED_BRANCH_ID, Integer.toString(lastReplicatedBranchID)); map.put(PROP_LAST_REPLICATED_COMMIT_TIME, Long.toString(lastReplicatedCommitTime)); map.put(PROP_GRACEFULLY_SHUT_DOWN, Boolean.TRUE.toString()); InternalStore store = getStore(); store.setPersistentProperties(map); super.doDeactivate(); } protected void startSynchronization() { replicatorSession = getSessionManager().openSession(null); replicatorSession.options().setPassiveUpdateEnabled(false); replicatorSession.options().setLockNotificationMode(LockNotificationMode.OFF); synchronizer.setLocalRepository(this); synchronizer.activate(); } protected void stopSynchronization() { if (synchronizer != null) { synchronizer.deactivate(); } } protected void setReplicationCountersToLatest() { setLastReplicatedBranchID(getStore().getLastBranchID()); setLastReplicatedCommitTime(getStore().getLastNonLocalCommitTime()); } protected void doInitRootResource() { super.initRootResource(); } @Override protected void initRootResource() { setState(INITIAL); } @Override public LockObjectsResult lock(InternalView view, LockType lockType, List revisionKeys, long timeout) { if (view.getBranch().isLocal()) { return super.lock(view, lockType, revisionKeys, timeout); } if (getState() != ONLINE) { throw new CDOException("Cannot lock in a non-local branch when clone is not connected to master"); } return lockThrough(view, lockType, revisionKeys, timeout); } private LockObjectsResult lockOnMaster(InternalView view, LockType type, List revKeys, long timeout) throws InterruptedException { // Delegate locking to the master InternalCDOSession remoteSession = getSynchronizer().getRemoteSession(); CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol(); String areaID = view.getDurableLockingID(); if (areaID == null) { throw new IllegalStateException("Durable locking is not enabled."); } LockObjectsResult masterLockingResult = sessionProtocol.delegateLockObjects(areaID, revKeys, view.getBranch(), type, timeout); if (masterLockingResult.isSuccessful() && masterLockingResult.isWaitForUpdate()) { if (!getSynchronizer().getRemoteSession().options().isPassiveUpdateEnabled()) { throw new AssertionError( "Master lock result requires clone to wait, but clone does not have passiveUpdates enabled."); } long requiredTimestamp = masterLockingResult.getRequiredTimestamp(); remoteSession.waitForUpdate(requiredTimestamp); } return masterLockingResult; } private LockObjectsResult lockThrough(InternalView view, LockType type, List keys, long timeout) { try { LockObjectsResult masterLockingResult = lockOnMaster(view, type, keys, timeout); if (!masterLockingResult.isSuccessful()) { return masterLockingResult; } LockObjectsResult localLockingResult = super.lock(view, type, keys, timeout); return localLockingResult; } catch (InterruptedException ex) { throw WrappedException.wrap(ex); } } @Override public UnlockObjectsResult unlock(InternalView view, LockType lockType, List objectIDs) { if (view.getBranch().isLocal()) { super.unlock(view, lockType, objectIDs); } if (getState() != ONLINE) { throw new CDOException("Cannot unlock in a non-local branch when clone is not connected to master"); } return unlockThrough(view, lockType, objectIDs); } private void unlockOnMaster(InternalView view, LockType lockType, List objectIDs) { InternalCDOSession remoteSession = getSynchronizer().getRemoteSession(); CDOSessionProtocol sessionProtocol = remoteSession.getSessionProtocol(); String lockAreaID = view.getDurableLockingID(); if (lockAreaID == null) { throw new IllegalStateException("Durable locking is not enabled."); } sessionProtocol.delegateUnlockObjects(lockAreaID, objectIDs, lockType); } private UnlockObjectsResult unlockThrough(InternalView view, LockType lockType, List objectIDs) { unlockOnMaster(view, lockType, objectIDs); return super.unlock(view, lockType, objectIDs); } /** * @author Eike Stepper */ private static final class TimeRange { private long time1; private long time2; public TimeRange(long time) { time1 = time; time2 = time; } public void update(long time) { if (time < time1) { time1 = time; } if (time > time2) { time2 = time; } } public long getTime1() { return time1; } public long getTime2() { return time2; } @Override public String toString() { return "[" + CDOCommonUtil.formatTimeStamp(time1) + " - " + CDOCommonUtil.formatTimeStamp(time1) + "]"; } } /** * @author Eike Stepper */ protected final class WriteThroughCommitContext extends TransactionCommitContext { public WriteThroughCommitContext(InternalTransaction transaction) { super(transaction); } @Override public void preWrite() { // Do nothing } @Override public void write(OMMonitor monitor) { // Do nothing } @Override public void commit(OMMonitor monitor) { InternalTransaction transaction = getTransaction(); // Prepare commit to the master CDOBranch branch = transaction.getBranch(); String userID = getUserID(); String comment = getCommitComment(); CDOCommitData commitData = new CommitContextData(this); Collection> lobs = Collections.emptySet(); // Delegate commit to the master CDOSessionProtocol sessionProtocol = getSynchronizer().getRemoteSession().getSessionProtocol(); CommitTransactionResult result = sessionProtocol.commitDelegation(branch, userID, comment, commitData, getDetachedObjectTypes(), lobs, monitor); // Stop if commit to master failed String rollbackMessage = result.getRollbackMessage(); if (rollbackMessage != null) { throw new TransactionException(rollbackMessage); } // Prepare data needed for commit result and commit notifications long timeStamp = result.getTimeStamp(); setTimeStamp(timeStamp); addIDMappings(result.getIDMappings()); applyIDMappings(new Monitor()); try { writeThroughCommitLock.lock(); // Commit to the local repository super.preWrite(); super.write(new Monitor()); super.commit(new Monitor()); } finally { writeThroughCommitLock.unlock(); } // Remember commit time in the local repository setLastCommitTimeStamp(timeStamp); setLastReplicatedCommitTime(timeStamp); // Remember commit time in the replicator session. getSynchronizer().getRemoteSession().setLastUpdateTime(timeStamp); } @Override protected long[] createTimeStamp(OMMonitor monitor) { // Already set after commit to the master. // Do not call getTimeStamp() of the enclosing Repo class!!! InternalRepository repository = getTransaction().getSession().getManager().getRepository(); return repository.forceCommitTimeStamp(WriteThroughCommitContext.this.getTimeStamp(), monitor); } @Override protected void lockObjects() throws InterruptedException { // Do nothing } private void addIDMappings(Map idMappings) { for (Map.Entry idMapping : idMappings.entrySet()) { CDOID oldID = idMapping.getKey(); CDOID newID = idMapping.getValue(); addIDMapping(oldID, newID); } } } }