Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java')
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java65
1 files changed, 59 insertions, 6 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java
index 33bdeb1f3b..a2fe4df2de 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/syncing/RepositorySynchronizer.java
@@ -11,16 +11,20 @@
package org.eclipse.emf.cdo.internal.server.syncing;
import org.eclipse.emf.cdo.common.CDOCommonRepository;
+import org.eclipse.emf.cdo.common.CDOCommonSession.Options.LockNotificationMode;
import org.eclipse.emf.cdo.common.CDOCommonSession.Options.PassiveUpdateMode;
import org.eclipse.emf.cdo.common.branch.CDOBranch;
import org.eclipse.emf.cdo.common.branch.CDOBranchCreatedEvent;
import org.eclipse.emf.cdo.common.commit.CDOCommitInfo;
import org.eclipse.emf.cdo.common.id.CDOID;
+import org.eclipse.emf.cdo.common.lock.CDOLockChangeInfo;
import org.eclipse.emf.cdo.internal.common.revision.NOOPRevisionCache;
import org.eclipse.emf.cdo.internal.server.bundle.OM;
+import org.eclipse.emf.cdo.server.StoreThreadLocal;
import org.eclipse.emf.cdo.session.CDOSessionConfiguration;
import org.eclipse.emf.cdo.session.CDOSessionConfigurationFactory;
import org.eclipse.emf.cdo.session.CDOSessionInvalidationEvent;
+import org.eclipse.emf.cdo.session.CDOSessionLocksChangedEvent;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache;
import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer;
import org.eclipse.emf.cdo.spi.server.InternalSynchronizableRepository;
@@ -66,12 +70,18 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
private static final Integer COMMIT_PRIORITY = 3;
+ private static final Integer LOCKS_PRIORITY = COMMIT_PRIORITY;
+
private int retryInterval = DEFAULT_RETRY_INTERVAL;
private Object connectLock = new Object();
private InternalSynchronizableRepository localRepository;
+ /**
+ * The session that connects to the master; used passively to receive notifications, and actively to request
+ * replications.
+ */
private InternalCDOSession remoteSession;
private RemoteSessionListener remoteSessionListener = new RemoteSessionListener();
@@ -117,10 +127,10 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
return remoteSessionConfigurationFactory;
}
- public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory remoteSessionConfigurationFactory)
+ public void setRemoteSessionConfigurationFactory(CDOSessionConfigurationFactory masterSessionConfigurationFactory)
{
- checkArg(remoteSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$
- this.remoteSessionConfigurationFactory = remoteSessionConfigurationFactory;
+ checkArg(masterSessionConfigurationFactory, "remoteSessionConfigurationFactory"); //$NON-NLS-1$
+ remoteSessionConfigurationFactory = masterSessionConfigurationFactory;
}
public InternalCDOSession getRemoteSession()
@@ -206,7 +216,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
super.doDeactivate();
}
- private void disconnect()
+ private void handleDisconnect()
{
OM.LOG.info("Disconnected from master.");
if (localRepository.getRootResourceID() == null)
@@ -299,12 +309,17 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
addWork(new CommitRunnable(e));
}
}
+ else if (event instanceof CDOSessionLocksChangedEvent)
+ {
+ CDOSessionLocksChangedEvent e = (CDOSessionLocksChangedEvent)event;
+ addWork(new LocksRunnable(e));
+ }
else if (event instanceof ILifecycleEvent)
{
ILifecycleEvent e = (ILifecycleEvent)event;
if (e.getKind() == ILifecycleEvent.Kind.DEACTIVATED && e.getSource() == remoteSession)
{
- disconnect();
+ handleDisconnect();
}
}
}
@@ -346,6 +361,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
{
CDOSessionConfiguration masterConfiguration = remoteSessionConfigurationFactory.createSessionConfiguration();
masterConfiguration.setPassiveUpdateMode(PassiveUpdateMode.ADDITIONS);
+ masterConfiguration.setLockNotificationMode(LockNotificationMode.ALWAYS);
remoteSession = (InternalCDOSession)masterConfiguration.openSession();
@@ -442,7 +458,7 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
{
OM.LOG.warn("Replication attempt failed. Retrying in " + retryInterval + " seconds...", ex);
sleepRetryInterval();
- disconnect();
+ handleDisconnect();
}
}
}
@@ -573,4 +589,41 @@ public class RepositorySynchronizer extends QueueRunner implements InternalRepos
return COMMIT_PRIORITY;
}
}
+
+ /**
+ * @author Caspar De Groot
+ */
+ private final class LocksRunnable extends QueueRunnable
+ {
+ private CDOLockChangeInfo lockChangeInfo;
+
+ public LocksRunnable(CDOLockChangeInfo lockChangeInfo)
+ {
+ this.lockChangeInfo = lockChangeInfo;
+ }
+
+ public void run()
+ {
+ try
+ {
+ StoreThreadLocal.setSession(localRepository.getReplicatorSession());
+ localRepository.handleLockChangeInfo(lockChangeInfo);
+ }
+ catch (Exception ex)
+ {
+ // TODO (CD) Retry as for commit?
+ ex.printStackTrace();
+ }
+ finally
+ {
+ StoreThreadLocal.release();
+ }
+ }
+
+ @Override
+ protected Integer getPriority()
+ {
+ return LOCKS_PRIORITY;
+ }
+ }
}

Back to the top