Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2010-09-22 10:39:55 +0000
committerEike Stepper2010-09-22 10:39:55 +0000
commita40380421fe64c1acdd4050de89296612a4cb3e6 (patch)
treee8f1f5d54cdc1858932e572d7c310198baf7b3f9
parentc3c44231bcc5b7a4c283d421aef67292e2d60d5b (diff)
downloadcdo-a40380421fe64c1acdd4050de89296612a4cb3e6.tar.gz
cdo-a40380421fe64c1acdd4050de89296612a4cb3e6.tar.xz
cdo-a40380421fe64c1acdd4050de89296612a4cb3e6.zip
[325928] Provide FailoverMonitor server and FailoverAgents to coordinate fail-over scenarios
https://bugs.eclipse.org/bugs/show_bug.cgi?id=325928
-rw-r--r--plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java51
-rw-r--r--plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java163
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml6
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java163
5 files changed, 340 insertions, 46 deletions
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java
index 2f819cfb71..86cdc3f6ef 100644
--- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java
+++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java
@@ -195,29 +195,7 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp
public void activateSession(InternalCDOSession session) throws Exception
{
super.activateSession(session);
-
- CDOClientProtocol protocol = new CDOClientProtocol();
- protocol.setInfraStructure(session);
- if (streamWrapper != null)
- {
- protocol.setStreamWrapper(streamWrapper);
- }
-
- session.setSessionProtocol(protocol);
- if (connector != null)
- {
- protocol.setFailOverStrategy(new NOOPFailOverStrategy(connector));
- }
- else if (failOverStrategy != null)
- {
- protocol.setFailOverStrategy(failOverStrategy);
- }
-
- OpenSessionResult result = protocol.openSession(repositoryName, isPassiveUpdateEnabled(), getPassiveUpdateMode());
- session.setSessionID(result.getSessionID());
- session.setUserID(result.getUserID());
- session.setLastUpdateTime(result.getLastUpdateTime());
- session.setRepositoryInfo(new RepositoryInfo(repositoryName, result));
+ OpenSessionResult result = initProtocol(session);
if (packageRegistry == null)
{
@@ -273,6 +251,33 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp
}
}
+ protected OpenSessionResult initProtocol(InternalCDOSession session)
+ {
+ CDOClientProtocol protocol = new CDOClientProtocol();
+ protocol.setInfraStructure(session);
+ if (streamWrapper != null)
+ {
+ protocol.setStreamWrapper(streamWrapper);
+ }
+
+ session.setSessionProtocol(protocol);
+ if (connector != null)
+ {
+ protocol.setFailOverStrategy(new NOOPFailOverStrategy(connector));
+ }
+ else if (failOverStrategy != null)
+ {
+ protocol.setFailOverStrategy(failOverStrategy);
+ }
+
+ OpenSessionResult result = protocol.openSession(repositoryName, isPassiveUpdateEnabled(), getPassiveUpdateMode());
+ session.setSessionID(result.getSessionID());
+ session.setUserID(result.getUserID());
+ session.setLastUpdateTime(result.getLastUpdateTime());
+ session.setRepositoryInfo(new RepositoryInfo(repositoryName, result));
+ return result;
+ }
+
@Override
public void deactivateSession(InternalCDOSession session) throws Exception
{
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java
new file mode 100644
index 0000000000..cb010938ac
--- /dev/null
+++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.emf.cdo.internal.net4j;
+
+import org.eclipse.emf.cdo.net4j.FailoverCDOSessionConfiguration;
+import org.eclipse.emf.cdo.session.CDOSession.ExceptionHandler;
+
+import org.eclipse.net4j.connector.IConnector;
+import org.eclipse.net4j.signal.RequestWithConfirmation;
+import org.eclipse.net4j.signal.SignalProtocol;
+import org.eclipse.net4j.signal.failover.IFailOverStrategy;
+import org.eclipse.net4j.util.WrappedException;
+import org.eclipse.net4j.util.collection.Pair;
+import org.eclipse.net4j.util.container.IManagedContainer;
+import org.eclipse.net4j.util.container.IPluginContainer;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+
+import org.eclipse.emf.spi.cdo.InternalCDOSession;
+
+/**
+ * @author Eike Stepper
+ * @since 4.0
+ */
+public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigurationImpl implements
+ FailoverCDOSessionConfiguration
+{
+ private String monitorConnectorDescription;
+
+ private String repositoryGroup;
+
+ public FailoverCDOSessionConfigurationImpl(String monitorConnectorDescription, String repositoryGroup)
+ {
+ this.monitorConnectorDescription = monitorConnectorDescription;
+ this.repositoryGroup = repositoryGroup;
+ }
+
+ public String getMonitorConnectorDescription()
+ {
+ return monitorConnectorDescription;
+ }
+
+ public String getRepositoryGroup()
+ {
+ return repositoryGroup;
+ }
+
+ @Override
+ public void setRepositoryName(String repositoryName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setConnector(IConnector connector)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFailOverStrategy(IFailOverStrategy failOverStrategy)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setExceptionHandler(ExceptionHandler exceptionHandler)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void superSetRepositoryName(String repositoryName)
+ {
+ super.setRepositoryName(repositoryName);
+ }
+
+ public void superSetConnector(IConnector connector)
+ {
+ super.setConnector(connector);
+ }
+
+ public void superSetFailOverStrategy(IFailOverStrategy failOverStrategy)
+ {
+ super.setFailOverStrategy(failOverStrategy);
+ }
+
+ public void superSetExceptionHandler(ExceptionHandler exceptionHandler)
+ {
+ super.setExceptionHandler(exceptionHandler);
+ }
+
+ @Override
+ public InternalCDOSession createSession()
+ {
+ return new FailoverCDOSessionImpl(this);
+ }
+
+ public void sessionProtocolDeactivated(FailoverCDOSessionImpl session)
+ {
+ Pair<String, String> info = queryRepositoryInfoFromMonitor();
+ IConnector connector = getConnector(info.getElement1());
+ String repositoryName = null;
+
+ superSetConnector(connector);
+ superSetRepositoryName(repositoryName);
+ initProtocol(session);
+
+ // TODO Re-register all remote sessions
+ // TODO Re-register all views
+ }
+
+ protected Pair<String, String> queryRepositoryInfoFromMonitor()
+ {
+ SignalProtocol<Object> protocol = new SignalProtocol<Object>("failover-client");
+
+ try
+ {
+ protocol.open(getConnector(monitorConnectorDescription));
+ return new RequestWithConfirmation<Pair<String, String>>(protocol, (short)1)
+ {
+ @Override
+ protected void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ out.writeString(repositoryGroup);
+ }
+
+ @Override
+ protected Pair<String, String> confirming(ExtendedDataInputStream in) throws Exception
+ {
+ String connectorDescription = in.readString();
+ String repositoryName = in.readString();
+ return new Pair<String, String>(connectorDescription, repositoryName);
+ }
+ }.send();
+ }
+ catch (Exception ex)
+ {
+ throw WrappedException.wrap(ex);
+ }
+ finally
+ {
+ protocol.close();
+ }
+ }
+
+ protected IConnector getConnector(String description)
+ {
+ return (IConnector)getContainer().getElement("org.eclipse.net4j.connectors", "tcp", description);
+ }
+
+ protected IManagedContainer getContainer()
+ {
+ return IPluginContainer.INSTANCE;
+ }
+}
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml b/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml
index 5a88a22341..024c171920 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml
@@ -22,7 +22,11 @@
<factory
productGroup="org.eclipse.net4j.serverProtocols"
type="failover"
- class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$Protocol$Factory"/>
+ class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$AgentProtocol$Factory"/>
+ <factory
+ productGroup="org.eclipse.net4j.serverProtocols"
+ type="failover-client"
+ class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$ClientProtocol$Factory"/>
<factory
productGroup="org.eclipse.emf.cdo.server.net4j.failoverMonitors"
type="net4j"
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java
index 86dcba62e8..343cfc0943 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java
@@ -29,7 +29,8 @@ public final class CDONet4jServerUtil
{
container.registerFactory(new CDOServerProtocolFactory(repositoryProvider));
container.registerFactory(new FailoverMonitor.Factory());
- container.registerFactory(new FailoverMonitor.Protocol.Factory(container));
+ container.registerFactory(new FailoverMonitor.AgentProtocol.Factory(container));
+ container.registerFactory(new FailoverMonitor.ClientProtocol.Factory(container));
}
public static void prepareContainer(IManagedContainer container)
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
index 319ddccf9c..a25e21d6ca 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
@@ -12,7 +12,10 @@ package org.eclipse.emf.cdo.server.net4j;
import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM;
+import org.eclipse.net4j.signal.IndicationWithResponse;
import org.eclipse.net4j.signal.Request;
+import org.eclipse.net4j.signal.SignalProtocol;
+import org.eclipse.net4j.signal.SignalReactor;
import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol;
import org.eclipse.net4j.util.collection.Pair;
import org.eclipse.net4j.util.container.Container;
@@ -43,9 +46,9 @@ public class FailoverMonitor extends Container<Pair<String, String>>
private String group;
- private Map<Protocol, Pair<String, String>> agents = new HashMap<Protocol, Pair<String, String>>();
+ private Map<AgentProtocol, Pair<String, String>> agents = new HashMap<AgentProtocol, Pair<String, String>>();
- private Protocol masterAgent;
+ private AgentProtocol masterAgent;
public FailoverMonitor()
{
@@ -71,12 +74,12 @@ public class FailoverMonitor extends Container<Pair<String, String>>
}
}
- public Map<Protocol, Pair<String, String>> getAgents()
+ public Map<AgentProtocol, Pair<String, String>> getAgents()
{
return Collections.unmodifiableMap(agents);
}
- public Protocol getMasterAgent()
+ public AgentProtocol getMasterAgent()
{
synchronized (agents)
{
@@ -84,7 +87,7 @@ public class FailoverMonitor extends Container<Pair<String, String>>
}
}
- public void registerAgent(Protocol agent, String connectorDescription, String repositoryName)
+ public void registerAgent(AgentProtocol agent, String connectorDescription, String repositoryName)
{
Pair<String, String> pair = new Pair<String, String>(connectorDescription, repositoryName);
synchronized (agents)
@@ -101,7 +104,7 @@ public class FailoverMonitor extends Container<Pair<String, String>>
fireElementAddedEvent(pair);
}
- public void deregisterAgent(Protocol agent)
+ public void deregisterAgent(AgentProtocol agent)
{
Pair<String, String> pair = null;
synchronized (agents)
@@ -135,15 +138,15 @@ public class FailoverMonitor extends Container<Pair<String, String>>
checkState(group, "group");
}
- protected Protocol electNewMaster(Map<Protocol, Pair<String, String>> agents)
+ protected AgentProtocol electNewMaster(Map<AgentProtocol, Pair<String, String>> agents)
{
return agents.keySet().iterator().next();
}
- private void publishNewMaster(Protocol masterAgent)
+ private void publishNewMaster(AgentProtocol masterAgent)
{
final Pair<String, String> masterInfos = agents.get(masterAgent);
- for (Protocol agent : agents.keySet())
+ for (AgentProtocol agent : agents.keySet())
{
final boolean master = agent == masterAgent;
@@ -205,13 +208,38 @@ public class FailoverMonitor extends Container<Pair<String, String>>
/**
* @author Eike Stepper
*/
- public static class Protocol extends HeartBeatProtocol.Server
+ public static abstract class AbstractServerProtocolFactory extends ServerProtocolFactory implements
+ FailoverMonitor.Provider
+ {
+ private IManagedContainer container;
+
+ protected AbstractServerProtocolFactory(String type)
+ {
+ this(type, IPluginContainer.INSTANCE);
+ }
+
+ protected AbstractServerProtocolFactory(String type, IManagedContainer container)
+ {
+ super(type);
+ this.container = container;
+ }
+
+ public FailoverMonitor getFailoverMonitor(String group)
+ {
+ return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group);
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class AgentProtocol extends HeartBeatProtocol.Server
{
private FailoverMonitor.Provider failoverMonitorProvider;
private FailoverMonitor failoverMonitor;
- public Protocol(Provider failOverMonitorProvider)
+ public AgentProtocol(Provider failOverMonitorProvider)
{
super(PROTOCOL_NAME);
failoverMonitorProvider = failOverMonitorProvider;
@@ -244,29 +272,122 @@ public class FailoverMonitor extends Container<Pair<String, String>>
/**
* @author Eike Stepper
*/
- public static class Factory extends ServerProtocolFactory implements FailoverMonitor.Provider
+ public static class Factory extends AbstractServerProtocolFactory
{
- private IManagedContainer container;
-
public Factory(IManagedContainer container)
{
- super(PROTOCOL_NAME);
- this.container = container;
+ super(PROTOCOL_NAME, container);
}
public Factory()
{
- this(IPluginContainer.INSTANCE);
+ super(PROTOCOL_NAME);
}
- public Object create(String description) throws ProductCreationException
+ public AgentProtocol create(String description) throws ProductCreationException
{
- return new FailoverMonitor.Protocol(this);
+ return new AgentProtocol(this);
+ }
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class ClientProtocol extends SignalProtocol<Object>
+ {
+ public static final String PROTOCOL_NAME = "failover-client"; //$NON-NLS-1$
+
+ public static final short SIGNAL_QUERY_REPOSITORY_INFO = 1;
+
+ private FailoverMonitor.Provider failoverMonitorProvider;
+
+ private FailoverMonitor failoverMonitor;
+
+ public ClientProtocol(Provider failOverMonitorProvider)
+ {
+ super("failover-client");
+ failoverMonitorProvider = failOverMonitorProvider;
+ }
+
+ @Override
+ protected SignalReactor createSignalReactor(short signalID)
+ {
+ switch (signalID)
+ {
+ case SIGNAL_QUERY_REPOSITORY_INFO:
+ return new IndicationWithResponse(this, SIGNAL_QUERY_REPOSITORY_INFO)
+ {
+ @Override
+ protected void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ String group = in.readString();
+ failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group);
+ if (failoverMonitor == null)
+ {
+ throw new IllegalStateException("No monitor available for fail-over group " + group);
+ }
+ }
+
+ @Override
+ protected void responding(ExtendedDataOutputStream out) throws Exception
+ {
+ AgentProtocol masterAgent = getMasterInfos();
+ Pair<String, String> masterInfos = failoverMonitor.getAgents().get(masterAgent);
+
+ out.writeString(masterInfos.getElement1());
+ out.writeString(masterInfos.getElement2());
+
+ for (int i = 0; i < 100; i++)
+ {
+ Thread.sleep(100L);
+ if (!getProtocol().isActive())
+ {
+ return;
+ }
+ }
+
+ getProtocol().deactivate();
+ }
+
+ protected AgentProtocol getMasterInfos() throws InterruptedException
+ {
+ for (;;)
+ {
+ AgentProtocol masterAgent = failoverMonitor.getMasterAgent();
+ if (masterAgent != null)
+ {
+ return masterAgent;
+ }
+
+ Thread.sleep(100L);
+ }
+ }
+ };
+
+ default:
+ return super.createSignalReactor(signalID);
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ public static class Factory extends AbstractServerProtocolFactory
+ {
+ public Factory(IManagedContainer container)
+ {
+ super(PROTOCOL_NAME, container);
+ }
+
+ public Factory()
+ {
+ super(PROTOCOL_NAME);
}
- public FailoverMonitor getFailoverMonitor(String group)
+ public ClientProtocol create(String description) throws ProductCreationException
{
- return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group);
+ return new ClientProtocol(this);
}
}
}

Back to the top