diff options
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java')
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java | 505 |
1 files changed, 440 insertions, 65 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java index d22194f39b..cf107c59f6 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java @@ -11,21 +11,32 @@ package org.eclipse.emf.cdo.internal.server; import org.eclipse.emf.cdo.common.id.CDOID; +import org.eclipse.emf.cdo.common.revision.CDORevision; import org.eclipse.emf.cdo.common.revision.CDORevisionHandler; -import org.eclipse.emf.cdo.server.IRepository; +import org.eclipse.emf.cdo.common.revision.CDORevisionProvider; +import org.eclipse.emf.cdo.common.revision.CDORevisionUtil; import org.eclipse.emf.cdo.server.IStoreAccessor.UnitSupport; import org.eclipse.emf.cdo.server.IUnit; import org.eclipse.emf.cdo.server.IUnitManager; import org.eclipse.emf.cdo.server.IView; +import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; +import org.eclipse.emf.cdo.spi.server.InternalCommitContext; import org.eclipse.emf.cdo.spi.server.InternalRepository; +import org.eclipse.emf.cdo.spi.server.InternalUnitManager; import org.eclipse.net4j.util.container.Container; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -33,12 +44,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; /** * @author Eike Stepper */ -public final class UnitManager extends Container<IUnit> implements IUnitManager +public class UnitManager extends Container<IUnit> implements InternalUnitManager { private final InternalRepository repository; private final Map<CDOID, IUnit> units = new HashMap<CDOID, IUnit>(); + private final Map<CDOID, UnitInitializer> unitInitializers = new HashMap<CDOID, UnitInitializer>(); + + private final Set<ObjectAttacher> objectAttachers = new HashSet<ObjectAttacher>(); + private final ReentrantReadWriteLock managerLock = new ReentrantReadWriteLock(); public UnitManager(InternalRepository repository) @@ -46,18 +61,21 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager this.repository = repository; } - public final IRepository getRepository() + public final InternalRepository getRepository() { return repository; } public boolean isUnit(CDOID rootID) { + checkActive(); + ReadLock readLock = managerLock.readLock(); readLock.lock(); try { + // No need to synchronize on units because all other modifiers hold the manager write lock. return units.containsKey(rootID); } finally @@ -68,48 +86,121 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager public IUnit createUnit(CDOID rootID, IView view, CDORevisionHandler revisionHandler) { + checkActive(); + + int xxx; // TODO Check that units are not nested. + WriteLock writeLock = managerLock.writeLock(); + UnitInitializer unitInitializer; + boolean hook = false; + + //////////////////////////////////// + // Phase 1: Register (short, locked) + //////////////////////////////////// + writeLock.lock(); try { - Unit unit; + // No need to synchronize on units because all other access holds the manager lock. if (units.containsKey(rootID)) { return null; } - int xxx; // TODO Check that units are not nested. - - unit = new Unit(rootID); - units.put(rootID, unit); + // No need to synchronize on unitInitializers because all other access holds the manager lock. + unitInitializer = unitInitializers.get(rootID); + if (unitInitializer != null) + { + hook = true; + } + else + { + unitInitializer = new UnitInitializer(rootID, view, revisionHandler); - // Acquire unit write lock early here, release it in Unit.init() - unit.unitLock.writeLock().lock(); + // No need to synchronize on unitInitializers because all other access holds the manager lock. + unitInitializers.put(rootID, unitInitializer); + // Synchronize on objectAttachers because objectAttacherFinishedCommit() doesn't acquire the manager lock! + synchronized (objectAttachers) + { + for (ObjectAttacher objectAttacher : objectAttachers) + { + List<CDOID> ids = objectAttacher.removeUnmappedRevisionsFor(unitInitializer); + if (!ids.isEmpty()) + { + unitInitializer.addObjectAttacher(objectAttacher, ids); + } + } + } + } + } + finally + { writeLock.unlock(); - writeLock = null; + } - unit.init(view, revisionHandler); - fireElementAddedEvent(unit); - return unit; + if (hook) + { + return unitInitializer.hook(rootID, view, revisionHandler); + } + + IUnit unit = null; + + try + { + ///////////////////////////////////////////////////// + // Phase 2: Initialize (potentially long, not locked) + ///////////////////////////////////////////////////// + + unit = unitInitializer.initialize(); } finally { - if (writeLock != null) + /////////////////////////////////// + // Phase 3: Publish (short, locked) + /////////////////////////////////// + + try { - writeLock.unlock(); + writeLock.lock(); + + try + { + // No need to synchronize on unitInitializers because all other access holds the manager lock. + unitInitializers.remove(rootID); + + if (unit != null) + { + // No need to synchronize on units because all other access holds the manager lock. + units.put(rootID, unit); + } + } + finally + { + writeLock.unlock(); + } + } + finally + { + unitInitializer.notifyHookedInitializers(); } } + + fireElementAddedEvent(unit); + return unit; } public IUnit getUnit(CDOID rootID) { + checkActive(); + ReadLock readLock = managerLock.readLock(); readLock.lock(); try { + // No need to synchronize on units because all other modifiers hold the manager write lock. return units.get(rootID); } finally @@ -120,6 +211,7 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager public IUnit[] getUnits() { + checkActive(); return getElements(); } @@ -130,6 +222,7 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager try { + // No need to synchronize on units because all other modifiers hold the manager write lock. return units.values().toArray(new IUnit[units.size()]); } finally @@ -138,46 +231,146 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager } } + public InternalObjectAttacher attachObjects(InternalCommitContext commitContext) + { + checkActive(); + + long timeStamp = commitContext.getTimeStamp(); + + ObjectAttacher objectAttacher; + Map<CDOID, CDOID> unitMappings = new HashMap<CDOID, CDOID>(); + + /////////////////////////////////////////////// + // Phase 1: Analyze new objects (short, locked) + /////////////////////////////////////////////// + + ReadLock readLock = managerLock.readLock(); + readLock.lock(); + + try + { + Set<CDOID> rootIDs = new HashSet<CDOID>(); + + // No need to synchronize on units because all other modifiers hold the manager write lock. + rootIDs.addAll(units.keySet()); + + // No need to synchronize on unitInitializers because all other modifiers hold the manager write lock. + rootIDs.addAll(unitInitializers.keySet()); + + List<InternalCDORevision> unmappedRevisions = new ArrayList<InternalCDORevision>(); + for (InternalCDORevision revision : commitContext.getNewObjects()) + { + CDOID rootID = getUnit(revision, commitContext, rootIDs); + if (rootID != null) + { + unitMappings.put(revision.getID(), rootID); + } + else + { + unmappedRevisions.add(revision); + } + } + + if (unmappedRevisions.isEmpty()) + { + return null; + } + + objectAttacher = new ObjectAttacher(commitContext, unmappedRevisions); + + // Read lock holders must synchronize modifications of the private collections. + synchronized (objectAttachers) + { + objectAttachers.add(objectAttacher); + } + } + finally + { + readLock.unlock(); + } + + ////////////////////////////////////////////////////////// + // Phase 2: Map objects to existing units (long, unlocked) + ////////////////////////////////////////////////////////// + + UnitSupport storeAccessor = (UnitSupport)commitContext.getAccessor(); + storeAccessor.writeUnits(unitMappings, timeStamp); // TODO Fork a monitor. + + return objectAttacher; + } + + /** + * Does not hold any manager lock when called. + */ + public void objectAttacherFinishedCommit(ObjectAttacher objectAttacher) + { + checkActive(); + + synchronized (objectAttachers) + { + objectAttachers.remove(objectAttacher); + } + } + @Override protected void doActivate() throws Exception { super.doActivate(); - UnitSupport unitSupport = (UnitSupport)repository.getStore().getReader(null); + UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null); try { - List<CDOID> roots = unitSupport.readUnitRoots(); + List<CDOID> roots = storeAccessor.readUnitRoots(); for (CDOID root : roots) { IUnit unit = new Unit(root); + + // No need to synchronize on units because all other access call checkActive() units.put(root, unit); } } finally { - unitSupport.release(); + storeAccessor.release(); } } @Override protected void doDeactivate() throws Exception { + // No need to synchronize on units because all other access call checkActive() units.clear(); + super.doDeactivate(); } + private static CDOID getUnit(InternalCDORevision revision, CDORevisionProvider revisionProvider, Set<CDOID> rootIDs) + { + CDOID id = revision.getID(); + if (rootIDs.contains(id)) + { + return id; + } + + CDORevision parentRevision = CDORevisionUtil.getParentRevision(revision, revisionProvider); + if (parentRevision != null) + { + return getUnit((InternalCDORevision)parentRevision, revisionProvider, rootIDs); + } + + return null; + } + /** * @author Eike Stepper */ - public final class Unit implements IUnit + private final class Unit implements IUnit { private final CDOID rootID; private final Set<IView> views = new HashSet<IView>(); - private final ReentrantReadWriteLock unitLock = new ReentrantReadWriteLock(); - public Unit(CDOID rootID) { this.rootID = rootID; @@ -193,90 +386,272 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager return rootID; } - public void init(IView view, CDORevisionHandler revisionHandler) + public boolean isOpen() { - // Write lock has been acquired by UnitManager.createUnit() + synchronized (views) + { + return !views.isEmpty(); + } + } - try + public void open(IView view, final CDORevisionHandler revisionHandler) + { + synchronized (views) { - UnitSupport unitSupport = (UnitSupport)repository.getStore().getWriter(null); + views.add(view); + } - try - { - unitSupport.initUnit(view, rootID, revisionHandler); - } - finally - { - unitSupport.release(); - } + UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null); + + try + { + storeAccessor.readUnit(view, rootID, revisionHandler); } finally { - // Write lock has been acquired by UnitManager.createUnit() - unitLock.writeLock().unlock(); + storeAccessor.release(); } } - public boolean isOpen() + public void close(IView view) { - ReadLock readLock = unitLock.readLock(); - readLock.lock(); + synchronized (views) + { + views.remove(view); + } + } + + @Override + public String toString() + { + return "Unit[" + rootID + "]"; + } + + /** + * Does not hold any manager lock when called. + */ + public void initialize(IView view, long timeStamp, CDORevisionHandler revisionHandler, + Map<ObjectAttacher, List<CDOID>> objectAttachers) + { + UnitSupport storeAccessor = (UnitSupport)repository.getStore().getWriter(null); try { - return !views.isEmpty(); + Object initResult = storeAccessor.initUnit(view, rootID, revisionHandler, timeStamp); + + List<CDOID> ids = new ArrayList<CDOID>(); + for (Entry<ObjectAttacher, List<CDOID>> entry : objectAttachers.entrySet()) + { + ObjectAttacher objectAttacher = entry.getKey(); + if (objectAttacher.awaitFinishedCommit()) + { + ids.addAll(entry.getValue()); + } + } + + storeAccessor.finishUnit(view, rootID, revisionHandler, timeStamp, initResult, ids); } finally { - readLock.unlock(); + storeAccessor.release(); } } + } - public void open(IView view, final CDORevisionHandler revisionHandler) + /** + * @author Eike Stepper + */ + private final class UnitInitializer implements CDORevisionHandler + { + private final long timeStamp = repository.getTimeStamp(); + + private final Map<ObjectAttacher, List<CDOID>> concurrentObjectAttachers = new HashMap<ObjectAttacher, List<CDOID>>(); + + private final CountDownLatch unitInitialized = new CountDownLatch(1); + + private final CDOID rootID; + + private final IView view; + + private final CDORevisionHandler revisionHandler; + + private final List<CDORevisionHandler> hookedRevisionHandlers = new CopyOnWriteArrayList<CDORevisionHandler>(); + + private volatile boolean hasHookedRevisionHandlers; + + private Unit unit; + + public UnitInitializer(CDOID rootID, IView view, CDORevisionHandler revisionHandler) { - ReadLock readLock = unitLock.readLock(); - readLock.lock(); + this.rootID = rootID; + this.view = view; + this.revisionHandler = revisionHandler; + } - try + public CDOID getRootID() + { + return rootID; + } + + /** + * Does not hold any manager lock when called. + */ + public IUnit initialize() + { + unit = new Unit(rootID); + unit.initialize(view, timeStamp, revisionHandler, concurrentObjectAttachers); + return unit; + } + + /** + * Does not hold any manager lock when called. + */ + public IUnit hook(CDOID rootID, IView view, final CDORevisionHandler revisionHandler) + { + final Set<CDOID> ids = new HashSet<CDOID>(); + + hookedRevisionHandlers.add(new CDORevisionHandler() { - views.add(view); + public boolean handleRevision(CDORevision revision) + { + ids.add(revision.getID()); + return revisionHandler.handleRevision(revision); + } + }); - UnitSupport unitSupport = (UnitSupport)repository.getStore().getReader(null); + // It's okay to do this unsynchronized. The worst thing that could happen is that the hooked revision handler is + // missed a few times during UnitInitializer.handleRevision(), but that's okay because it probably missed many + // revisions already and therefore performs an openUnit() subsequently, anyways. After all, hooked revision + // handlers, + // i.e., concurrent createUnit() calls for the same unit, are extremely rare. + hasHookedRevisionHandlers = true; - try + try + { + // Now wait for the main revision handler to finish. + unitInitialized.await(); + } + catch (InterruptedException ex) + { + return null; + } + + // Now send the missed revisions. + unit.open(view, new CDORevisionHandler() + { + public boolean handleRevision(CDORevision revision) { - unitSupport.readUnit(view, rootID, revisionHandler); + if (ids.contains(revision.getID())) + { + // This revision has already been sent. Skip to the next one. + return true; + } + + return revisionHandler.handleRevision(revision); } - finally + }); + + return unit; + } + + /** + * Does not hold any manager lock when called. + */ + public void notifyHookedInitializers() + { + unitInitialized.countDown(); + } + + public boolean handleRevision(CDORevision revision) + { + if (revisionHandler.handleRevision(revision)) + { + if (hasHookedRevisionHandlers) { - unitSupport.release(); + for (CDORevisionHandler hookedRevisionHandler : hookedRevisionHandlers) + { + hookedRevisionHandler.handleRevision(revision); + } } + + return true; } - finally + + return false; + } + + /** + * Holds the manager write lock when called. + */ + public void addObjectAttacher(ObjectAttacher objectAttacher, List<CDOID> ids) + { + concurrentObjectAttachers.put(objectAttacher, ids); + } + } + + /** + * @author Eike Stepper + */ + private final class ObjectAttacher implements InternalObjectAttacher + { + private final InternalCommitContext commitContext; + + private final List<InternalCDORevision> unmappedRevisions; + + private final CountDownLatch commitFinished = new CountDownLatch(1); + + private boolean commitSucceeded; + + public ObjectAttacher(InternalCommitContext commitContext, List<InternalCDORevision> unmappedRevisions) + { + this.commitContext = commitContext; + this.unmappedRevisions = unmappedRevisions; + } + + /** + * Does not hold any manager lock when called. + */ + public void finishedCommit(boolean success) + { + objectAttacherFinishedCommit(this); + + commitSucceeded = success; + commitFinished.countDown(); + } + + /** + * Holds the manager write lock when called. + */ + public List<CDOID> removeUnmappedRevisionsFor(UnitInitializer unitInitializer) + { + List<CDOID> ids = new ArrayList<CDOID>(); + + Set<CDOID> rootIDs = Collections.singleton(unitInitializer.getRootID()); + for (Iterator<InternalCDORevision> it = unmappedRevisions.iterator(); it.hasNext();) { - readLock.unlock(); + InternalCDORevision revision = it.next(); + if (getUnit(revision, commitContext, rootIDs) != null) + { + ids.add(revision.getID()); + it.remove(); + } } + + return ids; } - public void close(IView view) + public boolean awaitFinishedCommit() { - ReadLock readLock = unitLock.readLock(); - readLock.lock(); - try { - views.remove(view); + commitFinished.await(); } - finally + catch (InterruptedException ex) { - readLock.unlock(); + return false; } - } - @Override - public String toString() - { - return "Unit[" + rootID + "]"; + return commitSucceeded; } } } |