diff options
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java')
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java index 33bdeb1f3b..a2fe4df2de 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java @@ -11,16 +11,20 @@ package org.eclipse.emf.cdo.internal.server.syncing; import org.eclipse.emf.cdo.common.CDOCommonRepository; +import org.eclipse.emf.cdo.common.CDOCommonSession.Options.LockNotificationMode; import org.eclipse.emf.cdo.common.CDOCommonSession.Options.PassiveUpdateMode; import org.eclipse.emf.cdo.common.branch.CDOBranch; import org.eclipse.emf.cdo.common.branch.CDOBranchCreatedEvent; import org.eclipse.emf.cdo.common.commit.CDOCommitInfo; import org.eclipse.emf.cdo.common.id.CDOID; +import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo; import org.eclipse.emf.cdo.internal.common.revision.NOOPRevisionCache; import org.eclipse.emf.cdo.internal.server.bundle.OM; +import org.eclipse.emf.cdo.server.StoreThreadLocal; import org.eclipse.emf.cdo.session.CDOSessionConfiguration; import org.eclipse.emf.cdo.session.CDOSessionConfigurationFactory; import org.eclipse.emf.cdo.session.CDOSessionInvalidationEvent; +import org.eclipse.emf.cdo.session.CDOSessionLocksChangedEvent; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache; import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer; import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository; @@ -66,12 +70,18 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos private static final Integer COMMIT_PRIORITY = 3; + private static final Integer LOCKS_PRIORITY = COMMIT_PRIORITY; + private int retryInterval = DEFAULT_RETRY_INTERVAL; private Object connectLock = new Object(); private InternalSynchronizableRepository localRepository; + /** + * The session that connects to the master; used passively to receive notifications, and actively to request + * replications. + */ private InternalCDOSession remoteSession; private RemoteSessionListener remoteSessionListener = new RemoteSessionListener(); @@ -117,10 +127,10 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos return remoteSessionConfigurationFactory; } - public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory remoteSessionConfigurationFactory) + public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory masterSessionConfigurationFactory) { - checkArg(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$ - this.remoteSessionConfigurationFactory = remoteSessionConfigurationFactory; + checkArg(masterSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$ + remoteSessionConfigurationFactory = masterSessionConfigurationFactory; } public InternalCDOSession getRemoteSession() @@ -206,7 +216,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos super.doDeactivate(); } - private void disconnect() + private void handleDisconnect() { OM.LOG.info("Disconnected from master."); if (localRepository.getRootResourceID() == null) @@ -299,12 +309,17 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos addWork(new CommitRunnable(e)); } } + else if (event instanceof CDOSessionLocksChangedEvent) + { + CDOSessionLocksChangedEvent e = (CDOSessionLocksChangedEvent)event; + addWork(new LocksRunnable(e)); + } else if (event instanceof ILifecycleEvent) { ILifecycleEvent e = (ILifecycleEvent)event; if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == remoteSession) { - disconnect(); + handleDisconnect(); } } } @@ -346,6 +361,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos { CDOSessionConfiguration masterConfiguration = remoteSessionConfigurationFactory.createSessionConfiguration(); masterConfiguration.setPassiveUpdateMode(PassiveUpdateMode.ADDITIONS); + masterConfiguration.setLockNotificationMode(LockNotificationMode.ALWAYS); remoteSession = (InternalCDOSession)masterConfiguration.openSession(); @@ -442,7 +458,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos { OM.LOG.warn("Replication attempt failed. Retrying in " + retryInterval + " seconds...", ex); sleepRetryInterval(); - disconnect(); + handleDisconnect(); } } } @@ -573,4 +589,41 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos return COMMIT_PRIORITY; } } + + /** + * @author Caspar De Groot + */ + private final class LocksRunnable extends QueueRunnable + { + private CDOLockChangeInfo lockChangeInfo; + + public LocksRunnable(CDOLockChangeInfo lockChangeInfo) + { + this.lockChangeInfo = lockChangeInfo; + } + + public void run() + { + try + { + StoreThreadLocal.setSession(localRepository.getReplicatorSession()); + localRepository.handleLockChangeInfo(lockChangeInfo); + } + catch (Exception ex) + { + // TODO (CD) Retry as for commit? + ex.printStackTrace(); + } + finally + { + StoreThreadLocal.release(); + } + } + + @Override + protected Integer getPriority() + { + return LOCKS_PRIORITY; + } + } } |