diff options
author | Eike Stepper | 2010-09-22 15:20:24 +0000 |
---|---|---|
committer | Eike Stepper | 2010-09-22 15:20:24 +0000 |
commit | 70dda6326113fbf48486e1e1906e04654e4b869f (patch) | |
tree | 33f1842804fd39be0b6b5dd37f612d327865b61a | |
parent | b3444d952ce0a151400144c5cc70eb6aaf4b45db (diff) | |
download | cdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.gz cdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.xz cdo-70dda6326113fbf48486e1e1906e04654e4b869f.zip |
[325928] Provide FailoverMonitor server and FailoverAgents to coordinate fail-over scenarios
https://bugs.eclipse.org/bugs/show_bug.cgi?id=325928
4 files changed, 147 insertions, 80 deletions
diff --git a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java index 200fb9873d..3662029eab 100644 --- a/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java +++ b/plugins/org.eclipse.emf.cdo.examples/src/org/eclipse/emf/cdo/examples/server/FailoverExample.java @@ -16,7 +16,9 @@ import org.eclipse.emf.cdo.common.revision.cache.CDORevisionCache; import org.eclipse.emf.cdo.common.util.RepositoryStateChangedEvent; import org.eclipse.emf.cdo.common.util.RepositoryTypeChangedEvent; import org.eclipse.emf.cdo.net4j.CDONet4jUtil; +import org.eclipse.emf.cdo.net4j.CDOSession; import org.eclipse.emf.cdo.net4j.CDOSessionConfiguration; +import org.eclipse.emf.cdo.net4j.CDOSessionFailoverEvent; import org.eclipse.emf.cdo.server.CDOServerUtil; import org.eclipse.emf.cdo.server.IRepository; import org.eclipse.emf.cdo.server.IRepositorySynchronizer; @@ -39,7 +41,6 @@ import org.eclipse.net4j.db.IDBAdapter; import org.eclipse.net4j.db.IDBConnectionProvider; import org.eclipse.net4j.db.h2.H2Adapter; import org.eclipse.net4j.tcp.TCPUtil; -import org.eclipse.net4j.util.collection.Pair; import org.eclipse.net4j.util.container.ContainerEventAdapter; import org.eclipse.net4j.util.container.ContainerUtil; import org.eclipse.net4j.util.container.IContainer; @@ -54,7 +55,6 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; /** * @author Eike Stepper @@ -402,7 +402,7 @@ public abstract class FailoverExample */ public static class Monitored extends FailoverExample { - public static final String REPOSITORY_MONITOR_GROUP = "ExampleGroup"; + public static final String REPOSITORY_GROUP = "ExampleGroup"; public static final int REPOSITORY_MONITOR_PORT = 2038; @@ -440,7 +440,7 @@ public abstract class FailoverExample agent.setMonitorConnector(createConnector("localhost:" + REPOSITORY_MONITOR_PORT)); agent.setConnectorDescription(host + ":" + port); agent.setRepository(repository); - agent.setGroup(REPOSITORY_MONITOR_GROUP); + agent.setGroup(REPOSITORY_GROUP); agent.setRate(500L); agent.setTimeout(2000L); agent.activate(); @@ -457,36 +457,31 @@ public abstract class FailoverExample { IManagedContainer container = createContainer(); FailoverMonitor monitor = (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", - REPOSITORY_MONITOR_GROUP); + REPOSITORY_GROUP); - monitor.addListener(new ContainerEventAdapter<Pair<String, String>>() + monitor.addListener(new ContainerEventAdapter<AgentProtocol>() { @Override - protected void onAdded(IContainer<Pair<String, String>> monitor, Pair<String, String> agent) + protected void onAdded(IContainer<AgentProtocol> monitor, AgentProtocol agent) { dump((FailoverMonitor)monitor, "Added", agent); } @Override - protected void onRemoved(IContainer<Pair<String, String>> monitor, Pair<String, String> agent) + protected void onRemoved(IContainer<AgentProtocol> monitor, AgentProtocol agent) { dump((FailoverMonitor)monitor, "Removed", agent); } - private void dump(FailoverMonitor monitor, String event, Pair<String, String> agent) + private void dump(FailoverMonitor monitor, String event, AgentProtocol agent) { - System.out.println(event + " agent " + format(agent)); - for (Entry<AgentProtocol, Pair<String, String>> entry : monitor.getAgents().entrySet()) + System.out.println(event + " agent " + agent); + for (AgentProtocol element : monitor.getElements()) { - String type = entry.getKey() == monitor.getMasterAgent() ? "MASTER: " : "BACKUP: "; - System.out.println(" " + type + format(entry.getValue())); + String type = element == monitor.getMasterAgent() ? "MASTER: " : "BACKUP: "; + System.out.println(" " + type + element); } } - - private String format(Pair<String, String> agent) - { - return agent.getElement1() + "/" + agent.getElement2(); - } }); container.getElement("org.eclipse.net4j.acceptors", TRANSPORT_TYPE, "0.0.0.0:" + REPOSITORY_MONITOR_PORT); @@ -536,5 +531,37 @@ public abstract class FailoverExample example.done(); } } + + /** + * @author Eike Stepper + */ + public static final class Client + { + public static void main(String[] args) throws Exception + { + IManagedContainer container = createContainer(); + CDOSessionConfiguration configuration = CDONet4jUtil.createFailoverSessionConfiguration(container, "localhost:" + + REPOSITORY_MONITOR_PORT, REPOSITORY_GROUP); + + CDOSession session = configuration.openSession(); + session.addListener(new IListener() + { + public void notifyEvent(IEvent event) + { + if (event instanceof CDOSessionFailoverEvent) + { + CDOSessionFailoverEvent e = (CDOSessionFailoverEvent)event; + System.out.println("Failover " + e.getType() + ": " + e.getSource().getRepositoryInfo()); + } + } + }); + + System.out.println("Connected"); + while (!session.isClosed()) + { + Thread.sleep(100); + } + } + } } } diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java index 9efd438c24..814ba8b10f 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java @@ -109,6 +109,7 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur @Override public InternalCDOSession createSession() { + updateConnectorAndRepositoryName(); return new FailoverCDOSessionImpl(this); } @@ -120,12 +121,7 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur setPassiveUpdateEnabled(session.options().isPassiveUpdateEnabled()); setPassiveUpdateMode(session.options().getPassiveUpdateMode()); - Pair<String, String> info = queryRepositoryInfoFromMonitor(); - IConnector connector = getConnector(info.getElement1()); - String repositoryName = null; - - superSetConnector(connector); - superSetRepositoryName(repositoryName); + updateConnectorAndRepositoryName(); initProtocol(session); reregisterViews(session, targets); @@ -143,6 +139,18 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur } } + private void updateConnectorAndRepositoryName() + { + Pair<String, String> info = queryRepositoryInfoFromMonitor(); + IConnector connector = getConnector(info.getElement1()); + String repositoryName = info.getElement2(); + + System.out.println("Connecting to " + info.getElement1() + "/" + repositoryName); + + superSetConnector(connector); + superSetRepositoryName(repositoryName); + } + protected Pair<String, String> queryRepositoryInfoFromMonitor() { SignalProtocol<Object> protocol = new SignalProtocol<Object>("failover-client"); diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java index 459bec877b..799dd9d546 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java @@ -37,6 +37,22 @@ public final class CDONet4jUtil /** * @since 4.0 */ + public static FailoverCDOSessionConfiguration createFailoverSessionConfiguration(final IManagedContainer container, + String monitorConnectorDescription, String repositoryGroup) + { + return new FailoverCDOSessionConfigurationImpl(monitorConnectorDescription, repositoryGroup) + { + @Override + protected IManagedContainer getContainer() + { + return container; + } + }; + } + + /** + * @since 4.0 + */ public static FailoverCDOSessionConfiguration createFailoverSessionConfiguration(String monitorConnectorDescription, String repositoryGroup) { diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java index a25e21d6ca..eda3e30375 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java @@ -11,13 +11,13 @@ package org.eclipse.emf.cdo.server.net4j; import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM; +import org.eclipse.emf.cdo.server.net4j.FailoverMonitor.AgentProtocol; import org.eclipse.net4j.signal.IndicationWithResponse; import org.eclipse.net4j.signal.Request; import org.eclipse.net4j.signal.SignalProtocol; import org.eclipse.net4j.signal.SignalReactor; import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; -import org.eclipse.net4j.util.collection.Pair; import org.eclipse.net4j.util.container.Container; import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.container.IPluginContainer; @@ -28,15 +28,14 @@ import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.spi.net4j.ServerProtocolFactory; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; /** * @author Eike Stepper * @since 4.0 */ -public class FailoverMonitor extends Container<Pair<String, String>> +public class FailoverMonitor extends Container<AgentProtocol> { public static final String PRODUCT_GROUP = "org.eclipse.emf.cdo.server.net4j.failoverMonitors"; @@ -46,7 +45,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> private String group; - private Map<AgentProtocol, Pair<String, String>> agents = new HashMap<AgentProtocol, Pair<String, String>>(); + private List<AgentProtocol> agents = new ArrayList<AgentProtocol>(); private AgentProtocol masterAgent; @@ -65,20 +64,14 @@ public class FailoverMonitor extends Container<Pair<String, String>> this.group = group; } - @SuppressWarnings("unchecked") - public Pair<String, String>[] getElements() + public AgentProtocol[] getElements() { synchronized (agents) { - return agents.values().toArray(new Pair[agents.size()]); + return agents.toArray(new AgentProtocol[agents.size()]); } } - public Map<AgentProtocol, Pair<String, String>> getAgents() - { - return Collections.unmodifiableMap(agents); - } - public AgentProtocol getMasterAgent() { synchronized (agents) @@ -87,29 +80,41 @@ public class FailoverMonitor extends Container<Pair<String, String>> } } - public void registerAgent(AgentProtocol agent, String connectorDescription, String repositoryName) + public void registerAgent(AgentProtocol agent) { - Pair<String, String> pair = new Pair<String, String>(connectorDescription, repositoryName); + AgentProtocol[] newAgents = null; + AgentProtocol newMasterAgent = null; + synchronized (agents) { - agents.put(agent, pair); + agents.add(agent); if (agents.size() == 1) { masterAgent = agent; + newAgents = getElements(); + newMasterAgent = masterAgent; } - - publishNewMaster(masterAgent); } - fireElementAddedEvent(pair); + if (newMasterAgent != null) + { + publishNewMaster(newAgents, newMasterAgent); + fireElementAddedEvent(agent); + } } public void deregisterAgent(AgentProtocol agent) { - Pair<String, String> pair = null; + AgentProtocol[] newAgents = null; + AgentProtocol newMasterAgent = null; + synchronized (agents) { - pair = agents.remove(agent); + if (!agents.remove(agent)) + { + return; + } + if (masterAgent == agent) { if (agents.isEmpty()) @@ -121,13 +126,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> masterAgent = electNewMaster(agents); } - publishNewMaster(masterAgent); + newAgents = getElements(); + newMasterAgent = masterAgent; } } - if (pair != null) + if (newMasterAgent != null) { - fireElementRemovedEvent(pair); + publishNewMaster(newAgents, newMasterAgent); + fireElementRemovedEvent(agent); } } @@ -138,18 +145,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> checkState(group, "group"); } - protected AgentProtocol electNewMaster(Map<AgentProtocol, Pair<String, String>> agents) + protected AgentProtocol electNewMaster(List<AgentProtocol> agents) { - return agents.keySet().iterator().next(); + return agents.iterator().next(); } - private void publishNewMaster(AgentProtocol masterAgent) + private void publishNewMaster(AgentProtocol[] agents, final AgentProtocol masterAgent) { - final Pair<String, String> masterInfos = agents.get(masterAgent); - for (AgentProtocol agent : agents.keySet()) + for (final AgentProtocol agent : agents) { - final boolean master = agent == masterAgent; - try { new Request(agent, SIGNAL_PUBLISH_MASTER) @@ -157,15 +161,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> @Override protected void requesting(ExtendedDataOutputStream out) throws Exception { - if (master) + if (agent == masterAgent) { out.writeBoolean(true); } else { out.writeBoolean(false); - out.writeString(masterInfos.getElement1()); - out.writeString(masterInfos.getElement2()); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); } } }.sendAsync(); @@ -239,6 +243,10 @@ public class FailoverMonitor extends Container<Pair<String, String>> private FailoverMonitor failoverMonitor; + private String connectorDescription; + + private String repositoryName; + public AgentProtocol(Provider failOverMonitorProvider) { super(PROTOCOL_NAME); @@ -246,11 +254,32 @@ public class FailoverMonitor extends Container<Pair<String, String>> } @Override + public String toString() + { + return connectorDescription + "/" + repositoryName; + } + + protected FailoverMonitor getFailoverMonitor() + { + return failoverMonitor; + } + + protected String getConnectorDescription() + { + return connectorDescription; + } + + protected String getRepositoryName() + { + return repositoryName; + } + + @Override protected void indicatingStart(ExtendedDataInputStream in) throws IOException { String group = in.readString(); - String connectorDescription = in.readString(); - String repositoryName = in.readString(); + connectorDescription = in.readString(); + repositoryName = in.readString(); failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group); if (failoverMonitor == null) @@ -258,7 +287,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> throw new IllegalStateException("No monitor available for fail-over group " + group); } - failoverMonitor.registerAgent(this, connectorDescription, repositoryName); + failoverMonitor.registerAgent(this); super.indicatingStart(in); } @@ -306,7 +335,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> public ClientProtocol(Provider failOverMonitorProvider) { - super("failover-client"); + super(PROTOCOL_NAME); failoverMonitorProvider = failOverMonitorProvider; } @@ -332,25 +361,12 @@ public class FailoverMonitor extends Container<Pair<String, String>> @Override protected void responding(ExtendedDataOutputStream out) throws Exception { - AgentProtocol masterAgent = getMasterInfos(); - Pair<String, String> masterInfos = failoverMonitor.getAgents().get(masterAgent); - - out.writeString(masterInfos.getElement1()); - out.writeString(masterInfos.getElement2()); - - for (int i = 0; i < 100; i++) - { - Thread.sleep(100L); - if (!getProtocol().isActive()) - { - return; - } - } - - getProtocol().deactivate(); + AgentProtocol masterAgent = getMasterAgent(); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); } - protected AgentProtocol getMasterInfos() throws InterruptedException + protected AgentProtocol getMasterAgent() throws InterruptedException { for (;;) { |