Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2010-09-22 15:20:24 +0000
committerEike Stepper2010-09-22 15:20:24 +0000
commit70dda6326113fbf48486e1e1906e04654e4b869f (patch)
tree33f1842804fd39be0b6b5dd37f612d327865b61a
parentb3444d952ce0a151400144c5cc70eb6aaf4b45db (diff)
downloadcdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.gz
cdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.xz
cdo-70dda6326113fbf48486e1e1906e04654e4b869f.zip
[325928] Provide FailoverMonitor server and FailoverAgents to coordinate fail-over scenarios
https://bugs.eclipse.org/bugs/show_bug.cgi?id=325928
-rw-r--r--plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java63
-rw-r--r--plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java20
-rw-r--r--plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java16
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java128
4 files changed, 147 insertions, 80 deletions
diff --git a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java
index 200fb9873d..3662029eab 100644
--- a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java
+++ b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java
@@ -16,7 +16,9 @@ import org.eclipse.emf.cdo.common.revision.cache.CDORevisionCache;
import org.eclipse.emf.cdo.common.util.RepositoryStateChangedEvent;
import org.eclipse.emf.cdo.common.util.RepositoryTypeChangedEvent;
import org.eclipse.emf.cdo.net4j.CDONet4jUtil;
+import org.eclipse.emf.cdo.net4j.CDOSession;
import org.eclipse.emf.cdo.net4j.CDOSessionConfiguration;
+import org.eclipse.emf.cdo.net4j.CDOSessionFailoverEvent;
import org.eclipse.emf.cdo.server.CDOServerUtil;
import org.eclipse.emf.cdo.server.IRepository;
import org.eclipse.emf.cdo.server.IRepositorySynchronizer;
@@ -39,7 +41,6 @@ import org.eclipse.net4j.db.IDBAdapter;
import org.eclipse.net4j.db.IDBConnectionProvider;
import org.eclipse.net4j.db.h2.H2Adapter;
import org.eclipse.net4j.tcp.TCPUtil;
-import org.eclipse.net4j.util.collection.Pair;
import org.eclipse.net4j.util.container.ContainerEventAdapter;
import org.eclipse.net4j.util.container.ContainerUtil;
import org.eclipse.net4j.util.container.IContainer;
@@ -54,7 +55,6 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
/**
* @author Eike Stepper
@@ -402,7 +402,7 @@ public abstract class FailoverExample
*/
public static class Monitored extends FailoverExample
{
- public static final String REPOSITORY_MONITOR_GROUP = "ExampleGroup";
+ public static final String REPOSITORY_GROUP = "ExampleGroup";
public static final int REPOSITORY_MONITOR_PORT = 2038;
@@ -440,7 +440,7 @@ public abstract class FailoverExample
agent.setMonitorConnector(createConnector("localhost:" + REPOSITORY_MONITOR_PORT));
agent.setConnectorDescription(host + ":" + port);
agent.setRepository(repository);
- agent.setGroup(REPOSITORY_MONITOR_GROUP);
+ agent.setGroup(REPOSITORY_GROUP);
agent.setRate(500L);
agent.setTimeout(2000L);
agent.activate();
@@ -457,36 +457,31 @@ public abstract class FailoverExample
{
IManagedContainer container = createContainer();
FailoverMonitor monitor = (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j",
- REPOSITORY_MONITOR_GROUP);
+ REPOSITORY_GROUP);
- monitor.addListener(new ContainerEventAdapter<Pair<String, String>>()
+ monitor.addListener(new ContainerEventAdapter<AgentProtocol>()
{
@Override
- protected void onAdded(IContainer<Pair<String, String>> monitor, Pair<String, String> agent)
+ protected void onAdded(IContainer<AgentProtocol> monitor, AgentProtocol agent)
{
dump((FailoverMonitor)monitor, "Added", agent);
}
@Override
- protected void onRemoved(IContainer<Pair<String, String>> monitor, Pair<String, String> agent)
+ protected void onRemoved(IContainer<AgentProtocol> monitor, AgentProtocol agent)
{
dump((FailoverMonitor)monitor, "Removed", agent);
}
- private void dump(FailoverMonitor monitor, String event, Pair<String, String> agent)
+ private void dump(FailoverMonitor monitor, String event, AgentProtocol agent)
{
- System.out.println(event + " agent " + format(agent));
- for (Entry<AgentProtocol, Pair<String, String>> entry : monitor.getAgents().entrySet())
+ System.out.println(event + " agent " + agent);
+ for (AgentProtocol element : monitor.getElements())
{
- String type = entry.getKey() == monitor.getMasterAgent() ? "MASTER: " : "BACKUP: ";
- System.out.println(" " + type + format(entry.getValue()));
+ String type = element == monitor.getMasterAgent() ? "MASTER: " : "BACKUP: ";
+ System.out.println(" " + type + element);
}
}
-
- private String format(Pair<String, String> agent)
- {
- return agent.getElement1() + "/" + agent.getElement2();
- }
});
container.getElement("org.eclipse.net4j.acceptors", TRANSPORT_TYPE, "0.0.0.0:" + REPOSITORY_MONITOR_PORT);
@@ -536,5 +531,37 @@ public abstract class FailoverExample
example.done();
}
}
+
+ /**
+ * @author Eike Stepper
+ */
+ public static final class Client
+ {
+ public static void main(String[] args) throws Exception
+ {
+ IManagedContainer container = createContainer();
+ CDOSessionConfiguration configuration = CDONet4jUtil.createFailoverSessionConfiguration(container, "localhost:"
+ + REPOSITORY_MONITOR_PORT, REPOSITORY_GROUP);
+
+ CDOSession session = configuration.openSession();
+ session.addListener(new IListener()
+ {
+ public void notifyEvent(IEvent event)
+ {
+ if (event instanceof CDOSessionFailoverEvent)
+ {
+ CDOSessionFailoverEvent e = (CDOSessionFailoverEvent)event;
+ System.out.println("Failover " + e.getType() + ": " + e.getSource().getRepositoryInfo());
+ }
+ }
+ });
+
+ System.out.println("Connected");
+ while (!session.isClosed())
+ {
+ Thread.sleep(100);
+ }
+ }
+ }
}
}
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java
index 9efd438c24..814ba8b10f 100644
--- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java
+++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java
@@ -109,6 +109,7 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur
@Override
public InternalCDOSession createSession()
{
+ updateConnectorAndRepositoryName();
return new FailoverCDOSessionImpl(this);
}
@@ -120,12 +121,7 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur
setPassiveUpdateEnabled(session.options().isPassiveUpdateEnabled());
setPassiveUpdateMode(session.options().getPassiveUpdateMode());
- Pair<String, String> info = queryRepositoryInfoFromMonitor();
- IConnector connector = getConnector(info.getElement1());
- String repositoryName = null;
-
- superSetConnector(connector);
- superSetRepositoryName(repositoryName);
+ updateConnectorAndRepositoryName();
initProtocol(session);
reregisterViews(session, targets);
@@ -143,6 +139,18 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur
}
}
+ private void updateConnectorAndRepositoryName()
+ {
+ Pair<String, String> info = queryRepositoryInfoFromMonitor();
+ IConnector connector = getConnector(info.getElement1());
+ String repositoryName = info.getElement2();
+
+ System.out.println("Connecting to " + info.getElement1() + "/" + repositoryName);
+
+ superSetConnector(connector);
+ superSetRepositoryName(repositoryName);
+ }
+
protected Pair<String, String> queryRepositoryInfoFromMonitor()
{
SignalProtocol<Object> protocol = new SignalProtocol<Object>("failover-client");
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java
index 459bec877b..799dd9d546 100644
--- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java
+++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java
@@ -37,6 +37,22 @@ public final class CDONet4jUtil
/**
* @since 4.0
*/
+ public static FailoverCDOSessionConfiguration createFailoverSessionConfiguration(final IManagedContainer container,
+ String monitorConnectorDescription, String repositoryGroup)
+ {
+ return new FailoverCDOSessionConfigurationImpl(monitorConnectorDescription, repositoryGroup)
+ {
+ @Override
+ protected IManagedContainer getContainer()
+ {
+ return container;
+ }
+ };
+ }
+
+ /**
+ * @since 4.0
+ */
public static FailoverCDOSessionConfiguration createFailoverSessionConfiguration(String monitorConnectorDescription,
String repositoryGroup)
{
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
index a25e21d6ca..eda3e30375 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java
@@ -11,13 +11,13 @@
package org.eclipse.emf.cdo.server.net4j;
import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM;
+import org.eclipse.emf.cdo.server.net4j.FailoverMonitor.AgentProtocol;
import org.eclipse.net4j.signal.IndicationWithResponse;
import org.eclipse.net4j.signal.Request;
import org.eclipse.net4j.signal.SignalProtocol;
import org.eclipse.net4j.signal.SignalReactor;
import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol;
-import org.eclipse.net4j.util.collection.Pair;
import org.eclipse.net4j.util.container.Container;
import org.eclipse.net4j.util.container.IManagedContainer;
import org.eclipse.net4j.util.container.IPluginContainer;
@@ -28,15 +28,14 @@ import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.spi.net4j.ServerProtocolFactory;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
/**
* @author Eike Stepper
* @since 4.0
*/
-public class FailoverMonitor extends Container<Pair<String, String>>
+public class FailoverMonitor extends Container<AgentProtocol>
{
public static final String PRODUCT_GROUP = "org.eclipse.emf.cdo.server.net4j.failoverMonitors";
@@ -46,7 +45,7 @@ public class FailoverMonitor extends Container<Pair<String, String>>
private String group;
- private Map<AgentProtocol, Pair<String, String>> agents = new HashMap<AgentProtocol, Pair<String, String>>();
+ private List<AgentProtocol> agents = new ArrayList<AgentProtocol>();
private AgentProtocol masterAgent;
@@ -65,20 +64,14 @@ public class FailoverMonitor extends Container<Pair<String, String>>
this.group = group;
}
- @SuppressWarnings("unchecked")
- public Pair<String, String>[] getElements()
+ public AgentProtocol[] getElements()
{
synchronized (agents)
{
- return agents.values().toArray(new Pair[agents.size()]);
+ return agents.toArray(new AgentProtocol[agents.size()]);
}
}
- public Map<AgentProtocol, Pair<String, String>> getAgents()
- {
- return Collections.unmodifiableMap(agents);
- }
-
public AgentProtocol getMasterAgent()
{
synchronized (agents)
@@ -87,29 +80,41 @@ public class FailoverMonitor extends Container<Pair<String, String>>
}
}
- public void registerAgent(AgentProtocol agent, String connectorDescription, String repositoryName)
+ public void registerAgent(AgentProtocol agent)
{
- Pair<String, String> pair = new Pair<String, String>(connectorDescription, repositoryName);
+ AgentProtocol[] newAgents = null;
+ AgentProtocol newMasterAgent = null;
+
synchronized (agents)
{
- agents.put(agent, pair);
+ agents.add(agent);
if (agents.size() == 1)
{
masterAgent = agent;
+ newAgents = getElements();
+ newMasterAgent = masterAgent;
}
-
- publishNewMaster(masterAgent);
}
- fireElementAddedEvent(pair);
+ if (newMasterAgent != null)
+ {
+ publishNewMaster(newAgents, newMasterAgent);
+ fireElementAddedEvent(agent);
+ }
}
public void deregisterAgent(AgentProtocol agent)
{
- Pair<String, String> pair = null;
+ AgentProtocol[] newAgents = null;
+ AgentProtocol newMasterAgent = null;
+
synchronized (agents)
{
- pair = agents.remove(agent);
+ if (!agents.remove(agent))
+ {
+ return;
+ }
+
if (masterAgent == agent)
{
if (agents.isEmpty())
@@ -121,13 +126,15 @@ public class FailoverMonitor extends Container<Pair<String, String>>
masterAgent = electNewMaster(agents);
}
- publishNewMaster(masterAgent);
+ newAgents = getElements();
+ newMasterAgent = masterAgent;
}
}
- if (pair != null)
+ if (newMasterAgent != null)
{
- fireElementRemovedEvent(pair);
+ publishNewMaster(newAgents, newMasterAgent);
+ fireElementRemovedEvent(agent);
}
}
@@ -138,18 +145,15 @@ public class FailoverMonitor extends Container<Pair<String, String>>
checkState(group, "group");
}
- protected AgentProtocol electNewMaster(Map<AgentProtocol, Pair<String, String>> agents)
+ protected AgentProtocol electNewMaster(List<AgentProtocol> agents)
{
- return agents.keySet().iterator().next();
+ return agents.iterator().next();
}
- private void publishNewMaster(AgentProtocol masterAgent)
+ private void publishNewMaster(AgentProtocol[] agents, final AgentProtocol masterAgent)
{
- final Pair<String, String> masterInfos = agents.get(masterAgent);
- for (AgentProtocol agent : agents.keySet())
+ for (final AgentProtocol agent : agents)
{
- final boolean master = agent == masterAgent;
-
try
{
new Request(agent, SIGNAL_PUBLISH_MASTER)
@@ -157,15 +161,15 @@ public class FailoverMonitor extends Container<Pair<String, String>>
@Override
protected void requesting(ExtendedDataOutputStream out) throws Exception
{
- if (master)
+ if (agent == masterAgent)
{
out.writeBoolean(true);
}
else
{
out.writeBoolean(false);
- out.writeString(masterInfos.getElement1());
- out.writeString(masterInfos.getElement2());
+ out.writeString(masterAgent.getConnectorDescription());
+ out.writeString(masterAgent.getRepositoryName());
}
}
}.sendAsync();
@@ -239,6 +243,10 @@ public class FailoverMonitor extends Container<Pair<String, String>>
private FailoverMonitor failoverMonitor;
+ private String connectorDescription;
+
+ private String repositoryName;
+
public AgentProtocol(Provider failOverMonitorProvider)
{
super(PROTOCOL_NAME);
@@ -246,11 +254,32 @@ public class FailoverMonitor extends Container<Pair<String, String>>
}
@Override
+ public String toString()
+ {
+ return connectorDescription + "/" + repositoryName;
+ }
+
+ protected FailoverMonitor getFailoverMonitor()
+ {
+ return failoverMonitor;
+ }
+
+ protected String getConnectorDescription()
+ {
+ return connectorDescription;
+ }
+
+ protected String getRepositoryName()
+ {
+ return repositoryName;
+ }
+
+ @Override
protected void indicatingStart(ExtendedDataInputStream in) throws IOException
{
String group = in.readString();
- String connectorDescription = in.readString();
- String repositoryName = in.readString();
+ connectorDescription = in.readString();
+ repositoryName = in.readString();
failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group);
if (failoverMonitor == null)
@@ -258,7 +287,7 @@ public class FailoverMonitor extends Container<Pair<String, String>>
throw new IllegalStateException("No monitor available for fail-over group " + group);
}
- failoverMonitor.registerAgent(this, connectorDescription, repositoryName);
+ failoverMonitor.registerAgent(this);
super.indicatingStart(in);
}
@@ -306,7 +335,7 @@ public class FailoverMonitor extends Container<Pair<String, String>>
public ClientProtocol(Provider failOverMonitorProvider)
{
- super("failover-client");
+ super(PROTOCOL_NAME);
failoverMonitorProvider = failOverMonitorProvider;
}
@@ -332,25 +361,12 @@ public class FailoverMonitor extends Container<Pair<String, String>>
@Override
protected void responding(ExtendedDataOutputStream out) throws Exception
{
- AgentProtocol masterAgent = getMasterInfos();
- Pair<String, String> masterInfos = failoverMonitor.getAgents().get(masterAgent);
-
- out.writeString(masterInfos.getElement1());
- out.writeString(masterInfos.getElement2());
-
- for (int i = 0; i < 100; i++)
- {
- Thread.sleep(100L);
- if (!getProtocol().isActive())
- {
- return;
- }
- }
-
- getProtocol().deactivate();
+ AgentProtocol masterAgent = getMasterAgent();
+ out.writeString(masterAgent.getConnectorDescription());
+ out.writeString(masterAgent.getRepositoryName());
}
- protected AgentProtocol getMasterInfos() throws InterruptedException
+ protected AgentProtocol getMasterAgent() throws InterruptedException
{
for (;;)
{

Back to the top