diff options
Diffstat (limited to 'plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java')
-rw-r--r-- | plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java | 450 |
1 files changed, 251 insertions, 199 deletions
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java index b783e08b74..e20477f37b 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/session/CDOSessionImpl.java @@ -47,6 +47,7 @@ import org.eclipse.emf.cdo.common.revision.delta.CDOMoveFeatureDelta; import org.eclipse.emf.cdo.common.revision.delta.CDORemoveFeatureDelta; import org.eclipse.emf.cdo.common.revision.delta.CDORevisionDelta; import org.eclipse.emf.cdo.common.revision.delta.CDOSetFeatureDelta; +import org.eclipse.emf.cdo.common.util.CDOCommonUtil; import org.eclipse.emf.cdo.common.util.CDOException; import org.eclipse.emf.cdo.common.util.RepositoryStateChangedEvent; import org.eclipse.emf.cdo.common.util.RepositoryTypeChangedEvent; @@ -93,7 +94,7 @@ import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.concurrent.IRWLockManager; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.concurrent.IRWOLockManager; -import org.eclipse.net4j.util.concurrent.QueueRunner; +import org.eclipse.net4j.util.concurrent.QueueRunner2; import org.eclipse.net4j.util.concurrent.RWOLockManager; import org.eclipse.net4j.util.event.Event; import org.eclipse.net4j.util.event.EventUtil; @@ -105,6 +106,7 @@ import org.eclipse.net4j.util.lifecycle.ILifecycle; import org.eclipse.net4j.util.lifecycle.LifecycleEventAdapter; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.log.OMLogger; +import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.net4j.util.options.OptionsEvent; import org.eclipse.net4j.util.security.IPasswordCredentialsProvider; @@ -145,6 +147,8 @@ import java.util.Set; */ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl implements InternalCDOSession { + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SESSION, CDOSessionImpl.class); + private ExceptionHandler exceptionHandler; private CDOIDGenerator idGenerator; @@ -180,9 +184,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme private CDOSession.Options options = createOptions(); - private OutOfSequenceInvalidations outOfSequenceInvalidations = new OutOfSequenceInvalidations(); - - private QueueRunner invalidationRunner; + private Invalidator invalidator = new Invalidator(); private CDORepositoryInfo repositoryInfo; @@ -1045,165 +1047,25 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme } } - @Deprecated - public void invalidate(CDOCommitInfo commitInfo, InternalCDOTransaction sender) + public Object startLocalCommit() { - invalidate(commitInfo, sender, true); + return invalidator.startLocalCommit(); } - public void invalidate(CDOCommitInfo commitInfo, InternalCDOTransaction sender, boolean clearResourcePathCache) + public void endLocalCommit(Object token) { - long previousTimeStamp = commitInfo.getPreviousTimeStamp(); - long lastUpdateTime = getLastUpdateTime(); - - if (previousTimeStamp < lastUpdateTime) - { - previousTimeStamp = lastUpdateTime; - } - - synchronized (outOfSequenceInvalidations) - { - outOfSequenceInvalidations.put(previousTimeStamp, new OutOfSequenceInvalidation(commitInfo, sender, - clearResourcePathCache)); - } - - long nextPreviousTimeStamp = lastUpdateTime; - for (;;) - { - Runnable invalidationRunnable = null; - synchronized (outOfSequenceInvalidations) - { - OutOfSequenceInvalidation currentInvalidation = outOfSequenceInvalidations.remove(nextPreviousTimeStamp); - if (currentInvalidation == null) - { - // If we don't have the invalidation that follows the last one we processed, - // then there is nothing we can do right now - break; - } - - final CDOCommitInfo currentCommitInfo = currentInvalidation.getCommitInfo(); - final InternalCDOTransaction currentSender = currentInvalidation.getSender(); - final boolean currentClearResourcePathCache = currentInvalidation.isClearResourcePathCache(); - nextPreviousTimeStamp = currentCommitInfo.getTimeStamp(); - - invalidationRunnable = new Runnable() - { - public void run() - { - invalidateOrdered(currentCommitInfo, currentSender, currentClearResourcePathCache); - } - }; - - if (sender == null) - { - QueueRunner invalidationRunner = getInvalidationRunner(); - invalidationRunner.addWork(invalidationRunnable); - invalidationRunnable = null; - } - } - - if (invalidationRunnable != null) - { - invalidationRunnable.run(); - } - } - } - - /** - * This method is synchronized on outOfSequenceInvalidations by the caller! - */ - private QueueRunner getInvalidationRunner() - { - if (invalidationRunner == null) - { - invalidationRunner = new QueueRunner() - { - @Override - protected void noWork(WorkContext context) - { - if (isClosed()) - { - context.terminate(); - } - } - - @Override - protected String getThreadName() - { - return "CDOSessionInvalidationRunner-" + CDOSessionImpl.this; - } - }; - - invalidationRunner.activate(); - } - - return invalidationRunner; - } - - private void invalidateOrdered(CDOCommitInfo commitInfo, InternalCDOTransaction sender, boolean clearResourcePathCache) - { - Map<CDOID, InternalCDORevision> oldRevisions = null; - boolean success = commitInfo.getBranch() != null; - if (success) - { - oldRevisions = reviseRevisions(commitInfo); - } - - if (options.isPassiveUpdateEnabled()) - { - setLastUpdateTime(commitInfo.getTimeStamp()); - } - - if (success) - { - fireInvalidationEvent(sender, commitInfo); - commitInfoManager.notifyCommitInfoHandlers(commitInfo); - } - - for (InternalCDOView view : getViews()) - { - if (view != sender) - { - invalidateView(commitInfo, view, oldRevisions, clearResourcePathCache); - } - else - { - view.setLastUpdateTime(commitInfo.getTimeStamp()); - } - } + invalidator.endLocalCommit(token); } - private void invalidateView(CDOCommitInfo commitInfo, InternalCDOView view, - Map<CDOID, InternalCDORevision> oldRevisions, boolean clearResourcePathCache) + @Deprecated + public void invalidate(CDOCommitInfo commitInfo, InternalCDOTransaction sender) { - try - { - CDOBranch branch = commitInfo.getBranch(); - long lastUpdateTime = commitInfo.getTimeStamp(); - List<CDORevisionKey> allChangedObjects = commitInfo.getChangedObjects(); - List<CDOIDAndVersion> allDetachedObjects = commitInfo.getDetachedObjects(); - view.invalidate(branch, lastUpdateTime, allChangedObjects, allDetachedObjects, oldRevisions, true, - clearResourcePathCache); - } - catch (RuntimeException ex) - { - if (view.isActive()) - { - OM.LOG.error(ex); - } - else - { - OM.LOG.info(Messages.getString("CDOSessionImpl.1")); //$NON-NLS-1$ - } - } + invalidate(commitInfo, sender, true); } - /** - * @since 2.0 - */ - public void fireInvalidationEvent(InternalCDOTransaction sender, CDOCommitInfo commitInfo) + public void invalidate(CDOCommitInfo commitInfo, InternalCDOTransaction sender, boolean clearResourcePathCache) { - fireEvent(new InvalidationEvent(sender, commitInfo)); + invalidator.reorderInvalidations(commitInfo, sender, clearResourcePathCache); } public Object getAdapter(@SuppressWarnings("rawtypes") Class adapter) @@ -1393,6 +1255,7 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme protected void doActivate() throws Exception { super.doActivate(); + LifecycleUtil.activate(invalidator); InternalCDORemoteSessionManager remoteSessionManager = new CDORemoteSessionManagerImpl(); remoteSessionManager.setLocalSession(this); @@ -1406,11 +1269,9 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme @Override protected void doDeactivate() throws Exception { + LifecycleUtil.deactivate(invalidator); super.doDeactivate(); - LifecycleUtil.deactivate(invalidationRunner); - outOfSequenceInvalidations.clear(); - unhookSessionProtocol(); CDORemoteSessionManager remoteSessionManager = getRemoteSessionManager(); @@ -1455,49 +1316,6 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme /** * @author Eike Stepper - */ - private static final class OutOfSequenceInvalidation - { - private CDOCommitInfo commitInfo; - - private InternalCDOTransaction sender; - - private boolean clearResourcePathCache; - - public OutOfSequenceInvalidation(CDOCommitInfo commitInfo, InternalCDOTransaction sender, - boolean clearResourcePathCache) - { - this.commitInfo = commitInfo; - this.sender = sender; - this.clearResourcePathCache = clearResourcePathCache; - } - - public CDOCommitInfo getCommitInfo() - { - return commitInfo; - } - - public InternalCDOTransaction getSender() - { - return sender; - } - - public boolean isClearResourcePathCache() - { - return clearResourcePathCache; - } - } - - /** - * @author Eike Stepper - */ - private static final class OutOfSequenceInvalidations extends HashMap<Long, OutOfSequenceInvalidation> - { - private static final long serialVersionUID = 1L; - } - - /** - * @author Eike Stepper * @since 2.0 */ protected class OptionsImpl extends Notifier implements Options @@ -1829,6 +1647,240 @@ public abstract class CDOSessionImpl extends CDOTransactionContainerImpl impleme /** * @author Eike Stepper */ + private class Invalidator extends QueueRunner2<Invalidation> + { + private static final boolean DEBUG = false; + + private final Set<Object> unfinishedLocalCommits = new HashSet<Object>(); + + private final List<Invalidation> reorderQueue = new ArrayList<Invalidation>(); + + public Invalidator() + { + } + + public synchronized Object startLocalCommit() + { + if (!isActive()) + { + return null; + } + + final String threadName = Thread.currentThread().getName(); + Object token = new Object() + { + @Override + public String toString() + { + return threadName; + } + }; + + unfinishedLocalCommits.add(token); + return token; + } + + public synchronized void endLocalCommit(Object token) + { + unfinishedLocalCommits.remove(token); + } + + public synchronized void reorderInvalidations(CDOCommitInfo commitInfo, InternalCDOTransaction sender, + boolean clearResourcePathCache) + { + if (!isActive()) + { + return; + } + + Invalidation invalidation = new Invalidation(commitInfo, sender, clearResourcePathCache); + reorderQueue.add(invalidation); + Collections.sort(reorderQueue); + + if (DEBUG) + { + IOUtil.OUT().println( + CDOSessionImpl.this + " [" + getLastUpdateTime() % 10000 + "] " + commitInfo.getPreviousTimeStamp() % 10000 + + " --> " + commitInfo.getTimeStamp() % 10000 + " reorderQueue=" + reorderQueue + + " unfinishedLocalCommits=" + unfinishedLocalCommits); + } + + scheduleInvalidations(); + } + + public synchronized void scheduleInvalidations() + { + while (!reorderQueue.isEmpty() && canProcess(reorderQueue.get(0))) + { + Invalidation invalidation0 = reorderQueue.remove(0); + addWork(invalidation0); + } + } + + protected boolean canProcess(Invalidation invalidation) + { + if (options().isPassiveUpdateEnabled()) + { + long previousTimeStamp = invalidation.getPreviousTimeStamp(); + long lastUpdateTime = getLastUpdateTime(); + return previousTimeStamp <= lastUpdateTime; // Can be smaller in replication scenarios + } + + return unfinishedLocalCommits.size() == 1; // Ourselves + } + + @Override + protected void noWork(WorkContext context) + { + if (isClosed()) + { + context.terminate(); + } + } + + @Override + protected String getThreadName() + { + return "CDOSessionInvalidator-" + CDOSessionImpl.this; + } + } + + /** + * @author Eike Stepper + */ + private final class Invalidation implements Comparable<Invalidation>, Runnable + { + private final CDOCommitInfo commitInfo; + + private final InternalCDOTransaction sender; + + private final boolean clearResourcePathCache; + + public Invalidation(CDOCommitInfo commitInfo, InternalCDOTransaction sender, boolean clearResourcePathCache) + { + this.commitInfo = commitInfo; + this.sender = sender; + this.clearResourcePathCache = clearResourcePathCache; + } + + public long getTimeStamp() + { + return commitInfo.getTimeStamp(); + } + + public long getPreviousTimeStamp() + { + return commitInfo.getPreviousTimeStamp(); + } + + public int compareTo(Invalidation o) + { + return CDOCommonUtil.compareTimeStamps(getTimeStamp(), o.getTimeStamp()); + } + + @Override + public String toString() + { + return Long.toString(commitInfo.getTimeStamp() % 10000); + } + + public void run() + { + long timeStamp = commitInfo.getTimeStamp(); + + if (Invalidator.DEBUG) + { + IOUtil.OUT().println( + CDOSessionImpl.this + " [" + getLastUpdateTime() % 10000 + "] " + timeStamp % 10000 + " INVALIDATE"); + } + + try + { + Map<CDOID, InternalCDORevision> oldRevisions = null; + boolean success = commitInfo.getBranch() != null; + if (success) + { + oldRevisions = reviseRevisions(commitInfo); + } + + if (options.isPassiveUpdateEnabled()/* || sender != null */) + { + setLastUpdateTime(timeStamp); + } + + if (success) + { + CDOSessionImpl.this.fireEvent(new InvalidationEvent(sender, commitInfo)); + commitInfoManager.notifyCommitInfoHandlers(commitInfo); + } + + for (InternalCDOView view : getViews()) + { + invalidateView(commitInfo, view, sender, oldRevisions, clearResourcePathCache); + } + } + catch (RuntimeException ex) + { + if (isActive()) + { + throw ex; + } + + if (TRACER.isEnabled()) + { + TRACER.trace(Messages.getString("CDOSessionImpl.2")); //$NON-NLS-1$ + } + } + finally + { + // setLastUpdateTimeStamp() is not synchronized with the Invalidator. + // Give the Invalidator another chance to schedule Invalidations. + invalidator.scheduleInvalidations(); + } + } + + private void invalidateView(CDOCommitInfo commitInfo, InternalCDOView view, InternalCDOTransaction sender, + Map<CDOID, InternalCDORevision> oldRevisions, boolean clearResourcePathCache) + { + try + { + long lastUpdateTime = commitInfo.getTimeStamp(); + if (view == sender) + { + // The committing view (sender) is already valid, just the timestamp must be set "in sequence". + // Setting the sender's timestamp synchronously can lead to deadlock + view.invalidate(null, lastUpdateTime, null, null, null, true, false); + } + else + { + CDOBranch branch = commitInfo.getBranch(); + List<CDORevisionKey> allChangedObjects = commitInfo.getChangedObjects(); + List<CDOIDAndVersion> allDetachedObjects = commitInfo.getDetachedObjects(); + + view.invalidate(branch, lastUpdateTime, allChangedObjects, allDetachedObjects, oldRevisions, true, + clearResourcePathCache); + } + } + catch (RuntimeException ex) + { + if (view.isActive()) + { + OM.LOG.error(ex); + } + else + { + if (TRACER.isEnabled()) + { + TRACER.trace(Messages.getString("CDOSessionImpl.1")); //$NON-NLS-1$ + } + } + } + } + } + + /** + * @author Eike Stepper + */ private final class InvalidationEvent extends Event implements CDOSessionInvalidationEvent { private static final long serialVersionUID = 1L; |