Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java')
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java505
1 files changed, 440 insertions, 65 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java
index d22194f39b..cf107c59f6 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/UnitManager.java
@@ -11,21 +11,32 @@
package org.eclipse.emf.cdo.internal.server;
import org.eclipse.emf.cdo.common.id.CDOID;
+import org.eclipse.emf.cdo.common.revision.CDORevision;
import org.eclipse.emf.cdo.common.revision.CDORevisionHandler;
-import org.eclipse.emf.cdo.server.IRepository;
+import org.eclipse.emf.cdo.common.revision.CDORevisionProvider;
+import org.eclipse.emf.cdo.common.revision.CDORevisionUtil;
import org.eclipse.emf.cdo.server.IStoreAccessor.UnitSupport;
import org.eclipse.emf.cdo.server.IUnit;
import org.eclipse.emf.cdo.server.IUnitManager;
import org.eclipse.emf.cdo.server.IView;
+import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
+import org.eclipse.emf.cdo.spi.server.InternalCommitContext;
import org.eclipse.emf.cdo.spi.server.InternalRepository;
+import org.eclipse.emf.cdo.spi.server.InternalUnitManager;
import org.eclipse.net4j.util.container.Container;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -33,12 +44,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
/**
* @author Eike Stepper
*/
-public final class UnitManager extends Container<IUnit> implements IUnitManager
+public class UnitManager extends Container<IUnit> implements InternalUnitManager
{
private final InternalRepository repository;
private final Map<CDOID, IUnit> units = new HashMap<CDOID, IUnit>();
+ private final Map<CDOID, UnitInitializer> unitInitializers = new HashMap<CDOID, UnitInitializer>();
+
+ private final Set<ObjectAttacher> objectAttachers = new HashSet<ObjectAttacher>();
+
private final ReentrantReadWriteLock managerLock = new ReentrantReadWriteLock();
public UnitManager(InternalRepository repository)
@@ -46,18 +61,21 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
this.repository = repository;
}
- public final IRepository getRepository()
+ public final InternalRepository getRepository()
{
return repository;
}
public boolean isUnit(CDOID rootID)
{
+ checkActive();
+
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
+ // No need to synchronize on units because all other modifiers hold the manager write lock.
return units.containsKey(rootID);
}
finally
@@ -68,48 +86,121 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
public IUnit createUnit(CDOID rootID, IView view, CDORevisionHandler revisionHandler)
{
+ checkActive();
+
+ int xxx; // TODO Check that units are not nested.
+
WriteLock writeLock = managerLock.writeLock();
+ UnitInitializer unitInitializer;
+ boolean hook = false;
+
+ ////////////////////////////////////
+ // Phase 1: Register (short, locked)
+ ////////////////////////////////////
+
writeLock.lock();
try
{
- Unit unit;
+ // No need to synchronize on units because all other access holds the manager lock.
if (units.containsKey(rootID))
{
return null;
}
- int xxx; // TODO Check that units are not nested.
-
- unit = new Unit(rootID);
- units.put(rootID, unit);
+ // No need to synchronize on unitInitializers because all other access holds the manager lock.
+ unitInitializer = unitInitializers.get(rootID);
+ if (unitInitializer != null)
+ {
+ hook = true;
+ }
+ else
+ {
+ unitInitializer = new UnitInitializer(rootID, view, revisionHandler);
- // Acquire unit write lock early here, release it in Unit.init()
- unit.unitLock.writeLock().lock();
+ // No need to synchronize on unitInitializers because all other access holds the manager lock.
+ unitInitializers.put(rootID, unitInitializer);
+ // Synchronize on objectAttachers because objectAttacherFinishedCommit() doesn't acquire the manager lock!
+ synchronized (objectAttachers)
+ {
+ for (ObjectAttacher objectAttacher : objectAttachers)
+ {
+ List<CDOID> ids = objectAttacher.removeUnmappedRevisionsFor(unitInitializer);
+ if (!ids.isEmpty())
+ {
+ unitInitializer.addObjectAttacher(objectAttacher, ids);
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
writeLock.unlock();
- writeLock = null;
+ }
- unit.init(view, revisionHandler);
- fireElementAddedEvent(unit);
- return unit;
+ if (hook)
+ {
+ return unitInitializer.hook(rootID, view, revisionHandler);
+ }
+
+ IUnit unit = null;
+
+ try
+ {
+ /////////////////////////////////////////////////////
+ // Phase 2: Initialize (potentially long, not locked)
+ /////////////////////////////////////////////////////
+
+ unit = unitInitializer.initialize();
}
finally
{
- if (writeLock != null)
+ ///////////////////////////////////
+ // Phase 3: Publish (short, locked)
+ ///////////////////////////////////
+
+ try
{
- writeLock.unlock();
+ writeLock.lock();
+
+ try
+ {
+ // No need to synchronize on unitInitializers because all other access holds the manager lock.
+ unitInitializers.remove(rootID);
+
+ if (unit != null)
+ {
+ // No need to synchronize on units because all other access holds the manager lock.
+ units.put(rootID, unit);
+ }
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+ finally
+ {
+ unitInitializer.notifyHookedInitializers();
}
}
+
+ fireElementAddedEvent(unit);
+ return unit;
}
public IUnit getUnit(CDOID rootID)
{
+ checkActive();
+
ReadLock readLock = managerLock.readLock();
readLock.lock();
try
{
+ // No need to synchronize on units because all other modifiers hold the manager write lock.
return units.get(rootID);
}
finally
@@ -120,6 +211,7 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
public IUnit[] getUnits()
{
+ checkActive();
return getElements();
}
@@ -130,6 +222,7 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
try
{
+ // No need to synchronize on units because all other modifiers hold the manager write lock.
return units.values().toArray(new IUnit[units.size()]);
}
finally
@@ -138,46 +231,146 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
}
}
+ public InternalObjectAttacher attachObjects(InternalCommitContext commitContext)
+ {
+ checkActive();
+
+ long timeStamp = commitContext.getTimeStamp();
+
+ ObjectAttacher objectAttacher;
+ Map<CDOID, CDOID> unitMappings = new HashMap<CDOID, CDOID>();
+
+ ///////////////////////////////////////////////
+ // Phase 1: Analyze new objects (short, locked)
+ ///////////////////////////////////////////////
+
+ ReadLock readLock = managerLock.readLock();
+ readLock.lock();
+
+ try
+ {
+ Set<CDOID> rootIDs = new HashSet<CDOID>();
+
+ // No need to synchronize on units because all other modifiers hold the manager write lock.
+ rootIDs.addAll(units.keySet());
+
+ // No need to synchronize on unitInitializers because all other modifiers hold the manager write lock.
+ rootIDs.addAll(unitInitializers.keySet());
+
+ List<InternalCDORevision> unmappedRevisions = new ArrayList<InternalCDORevision>();
+ for (InternalCDORevision revision : commitContext.getNewObjects())
+ {
+ CDOID rootID = getUnit(revision, commitContext, rootIDs);
+ if (rootID != null)
+ {
+ unitMappings.put(revision.getID(), rootID);
+ }
+ else
+ {
+ unmappedRevisions.add(revision);
+ }
+ }
+
+ if (unmappedRevisions.isEmpty())
+ {
+ return null;
+ }
+
+ objectAttacher = new ObjectAttacher(commitContext, unmappedRevisions);
+
+ // Read lock holders must synchronize modifications of the private collections.
+ synchronized (objectAttachers)
+ {
+ objectAttachers.add(objectAttacher);
+ }
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+
+ //////////////////////////////////////////////////////////
+ // Phase 2: Map objects to existing units (long, unlocked)
+ //////////////////////////////////////////////////////////
+
+ UnitSupport storeAccessor = (UnitSupport)commitContext.getAccessor();
+ storeAccessor.writeUnits(unitMappings, timeStamp); // TODO Fork a monitor.
+
+ return objectAttacher;
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public void objectAttacherFinishedCommit(ObjectAttacher objectAttacher)
+ {
+ checkActive();
+
+ synchronized (objectAttachers)
+ {
+ objectAttachers.remove(objectAttacher);
+ }
+ }
+
@Override
protected void doActivate() throws Exception
{
super.doActivate();
- UnitSupport unitSupport = (UnitSupport)repository.getStore().getReader(null);
+ UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null);
try
{
- List<CDOID> roots = unitSupport.readUnitRoots();
+ List<CDOID> roots = storeAccessor.readUnitRoots();
for (CDOID root : roots)
{
IUnit unit = new Unit(root);
+
+ // No need to synchronize on units because all other access call checkActive()
units.put(root, unit);
}
}
finally
{
- unitSupport.release();
+ storeAccessor.release();
}
}
@Override
protected void doDeactivate() throws Exception
{
+ // No need to synchronize on units because all other access call checkActive()
units.clear();
+
super.doDeactivate();
}
+ private static CDOID getUnit(InternalCDORevision revision, CDORevisionProvider revisionProvider, Set<CDOID> rootIDs)
+ {
+ CDOID id = revision.getID();
+ if (rootIDs.contains(id))
+ {
+ return id;
+ }
+
+ CDORevision parentRevision = CDORevisionUtil.getParentRevision(revision, revisionProvider);
+ if (parentRevision != null)
+ {
+ return getUnit((InternalCDORevision)parentRevision, revisionProvider, rootIDs);
+ }
+
+ return null;
+ }
+
/**
* @author Eike Stepper
*/
- public final class Unit implements IUnit
+ private final class Unit implements IUnit
{
private final CDOID rootID;
private final Set<IView> views = new HashSet<IView>();
- private final ReentrantReadWriteLock unitLock = new ReentrantReadWriteLock();
-
public Unit(CDOID rootID)
{
this.rootID = rootID;
@@ -193,90 +386,272 @@ public final class UnitManager extends Container<IUnit> implements IUnitManager
return rootID;
}
- public void init(IView view, CDORevisionHandler revisionHandler)
+ public boolean isOpen()
{
- // Write lock has been acquired by UnitManager.createUnit()
+ synchronized (views)
+ {
+ return !views.isEmpty();
+ }
+ }
- try
+ public void open(IView view, final CDORevisionHandler revisionHandler)
+ {
+ synchronized (views)
{
- UnitSupport unitSupport = (UnitSupport)repository.getStore().getWriter(null);
+ views.add(view);
+ }
- try
- {
- unitSupport.initUnit(view, rootID, revisionHandler);
- }
- finally
- {
- unitSupport.release();
- }
+ UnitSupport storeAccessor = (UnitSupport)repository.getStore().getReader(null);
+
+ try
+ {
+ storeAccessor.readUnit(view, rootID, revisionHandler);
}
finally
{
- // Write lock has been acquired by UnitManager.createUnit()
- unitLock.writeLock().unlock();
+ storeAccessor.release();
}
}
- public boolean isOpen()
+ public void close(IView view)
{
- ReadLock readLock = unitLock.readLock();
- readLock.lock();
+ synchronized (views)
+ {
+ views.remove(view);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Unit[" + rootID + "]";
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public void initialize(IView view, long timeStamp, CDORevisionHandler revisionHandler,
+ Map<ObjectAttacher, List<CDOID>> objectAttachers)
+ {
+ UnitSupport storeAccessor = (UnitSupport)repository.getStore().getWriter(null);
try
{
- return !views.isEmpty();
+ Object initResult = storeAccessor.initUnit(view, rootID, revisionHandler, timeStamp);
+
+ List<CDOID> ids = new ArrayList<CDOID>();
+ for (Entry<ObjectAttacher, List<CDOID>> entry : objectAttachers.entrySet())
+ {
+ ObjectAttacher objectAttacher = entry.getKey();
+ if (objectAttacher.awaitFinishedCommit())
+ {
+ ids.addAll(entry.getValue());
+ }
+ }
+
+ storeAccessor.finishUnit(view, rootID, revisionHandler, timeStamp, initResult, ids);
}
finally
{
- readLock.unlock();
+ storeAccessor.release();
}
}
+ }
- public void open(IView view, final CDORevisionHandler revisionHandler)
+ /**
+ * @author Eike Stepper
+ */
+ private final class UnitInitializer implements CDORevisionHandler
+ {
+ private final long timeStamp = repository.getTimeStamp();
+
+ private final Map<ObjectAttacher, List<CDOID>> concurrentObjectAttachers = new HashMap<ObjectAttacher, List<CDOID>>();
+
+ private final CountDownLatch unitInitialized = new CountDownLatch(1);
+
+ private final CDOID rootID;
+
+ private final IView view;
+
+ private final CDORevisionHandler revisionHandler;
+
+ private final List<CDORevisionHandler> hookedRevisionHandlers = new CopyOnWriteArrayList<CDORevisionHandler>();
+
+ private volatile boolean hasHookedRevisionHandlers;
+
+ private Unit unit;
+
+ public UnitInitializer(CDOID rootID, IView view, CDORevisionHandler revisionHandler)
{
- ReadLock readLock = unitLock.readLock();
- readLock.lock();
+ this.rootID = rootID;
+ this.view = view;
+ this.revisionHandler = revisionHandler;
+ }
- try
+ public CDOID getRootID()
+ {
+ return rootID;
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public IUnit initialize()
+ {
+ unit = new Unit(rootID);
+ unit.initialize(view, timeStamp, revisionHandler, concurrentObjectAttachers);
+ return unit;
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public IUnit hook(CDOID rootID, IView view, final CDORevisionHandler revisionHandler)
+ {
+ final Set<CDOID> ids = new HashSet<CDOID>();
+
+ hookedRevisionHandlers.add(new CDORevisionHandler()
{
- views.add(view);
+ public boolean handleRevision(CDORevision revision)
+ {
+ ids.add(revision.getID());
+ return revisionHandler.handleRevision(revision);
+ }
+ });
- UnitSupport unitSupport = (UnitSupport)repository.getStore().getReader(null);
+ // It's okay to do this unsynchronized. The worst thing that could happen is that the hooked revision handler is
+ // missed a few times during UnitInitializer.handleRevision(), but that's okay because it probably missed many
+ // revisions already and therefore performs an openUnit() subsequently, anyways. After all, hooked revision
+ // handlers,
+ // i.e., concurrent createUnit() calls for the same unit, are extremely rare.
+ hasHookedRevisionHandlers = true;
- try
+ try
+ {
+ // Now wait for the main revision handler to finish.
+ unitInitialized.await();
+ }
+ catch (InterruptedException ex)
+ {
+ return null;
+ }
+
+ // Now send the missed revisions.
+ unit.open(view, new CDORevisionHandler()
+ {
+ public boolean handleRevision(CDORevision revision)
{
- unitSupport.readUnit(view, rootID, revisionHandler);
+ if (ids.contains(revision.getID()))
+ {
+ // This revision has already been sent. Skip to the next one.
+ return true;
+ }
+
+ return revisionHandler.handleRevision(revision);
}
- finally
+ });
+
+ return unit;
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public void notifyHookedInitializers()
+ {
+ unitInitialized.countDown();
+ }
+
+ public boolean handleRevision(CDORevision revision)
+ {
+ if (revisionHandler.handleRevision(revision))
+ {
+ if (hasHookedRevisionHandlers)
{
- unitSupport.release();
+ for (CDORevisionHandler hookedRevisionHandler : hookedRevisionHandlers)
+ {
+ hookedRevisionHandler.handleRevision(revision);
+ }
}
+
+ return true;
}
- finally
+
+ return false;
+ }
+
+ /**
+ * Holds the manager write lock when called.
+ */
+ public void addObjectAttacher(ObjectAttacher objectAttacher, List<CDOID> ids)
+ {
+ concurrentObjectAttachers.put(objectAttacher, ids);
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class ObjectAttacher implements InternalObjectAttacher
+ {
+ private final InternalCommitContext commitContext;
+
+ private final List<InternalCDORevision> unmappedRevisions;
+
+ private final CountDownLatch commitFinished = new CountDownLatch(1);
+
+ private boolean commitSucceeded;
+
+ public ObjectAttacher(InternalCommitContext commitContext, List<InternalCDORevision> unmappedRevisions)
+ {
+ this.commitContext = commitContext;
+ this.unmappedRevisions = unmappedRevisions;
+ }
+
+ /**
+ * Does not hold any manager lock when called.
+ */
+ public void finishedCommit(boolean success)
+ {
+ objectAttacherFinishedCommit(this);
+
+ commitSucceeded = success;
+ commitFinished.countDown();
+ }
+
+ /**
+ * Holds the manager write lock when called.
+ */
+ public List<CDOID> removeUnmappedRevisionsFor(UnitInitializer unitInitializer)
+ {
+ List<CDOID> ids = new ArrayList<CDOID>();
+
+ Set<CDOID> rootIDs = Collections.singleton(unitInitializer.getRootID());
+ for (Iterator<InternalCDORevision> it = unmappedRevisions.iterator(); it.hasNext();)
{
- readLock.unlock();
+ InternalCDORevision revision = it.next();
+ if (getUnit(revision, commitContext, rootIDs) != null)
+ {
+ ids.add(revision.getID());
+ it.remove();
+ }
}
+
+ return ids;
}
- public void close(IView view)
+ public boolean awaitFinishedCommit()
{
- ReadLock readLock = unitLock.readLock();
- readLock.lock();
-
try
{
- views.remove(view);
+ commitFinished.await();
}
- finally
+ catch (InterruptedException ex)
{
- readLock.unlock();
+ return false;
}
- }
- @Override
- public String toString()
- {
- return "Unit[" + rootID + "]";
+ return commitSucceeded;
}
}
}

Back to the top