diff options
author | Eike Stepper | 2010-09-22 15:20:24 +0000 |
---|---|---|
committer | Eike Stepper | 2010-09-22 15:20:24 +0000 |
commit | 70dda6326113fbf48486e1e1906e04654e4b869f (patch) | |
tree | 33f1842804fd39be0b6b5dd37f612d327865b61a /plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java | |
parent | b3444d952ce0a151400144c5cc70eb6aaf4b45db (diff) | |
download | cdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.gz cdo-70dda6326113fbf48486e1e1906e04654e4b869f.tar.xz cdo-70dda6326113fbf48486e1e1906e04654e4b869f.zip |
[325928] Provide FailoverMonitor server and FailoverAgents to coordinate fail-over scenarios
https://bugs.eclipse.org/bugs/show_bug.cgi?id=325928
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java')
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java | 128 |
1 files changed, 72 insertions, 56 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java index a25e21d6ca..eda3e30375 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java @@ -11,13 +11,13 @@ package org.eclipse.emf.cdo.server.net4j; import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM; +import org.eclipse.emf.cdo.server.net4j.FailoverMonitor.AgentProtocol; import org.eclipse.net4j.signal.IndicationWithResponse; import org.eclipse.net4j.signal.Request; import org.eclipse.net4j.signal.SignalProtocol; import org.eclipse.net4j.signal.SignalReactor; import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; -import org.eclipse.net4j.util.collection.Pair; import org.eclipse.net4j.util.container.Container; import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.container.IPluginContainer; @@ -28,15 +28,14 @@ import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.spi.net4j.ServerProtocolFactory; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; /** * @author Eike Stepper * @since 4.0 */ -public class FailoverMonitor extends Container<Pair<String, String>> +public class FailoverMonitor extends Container<AgentProtocol> { public static final String PRODUCT_GROUP = "org.eclipse.emf.cdo.server.net4j.failoverMonitors"; @@ -46,7 +45,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> private String group; - private Map<AgentProtocol, Pair<String, String>> agents = new HashMap<AgentProtocol, Pair<String, String>>(); + private List<AgentProtocol> agents = new ArrayList<AgentProtocol>(); private AgentProtocol masterAgent; @@ -65,20 +64,14 @@ public class FailoverMonitor extends Container<Pair<String, String>> this.group = group; } - @SuppressWarnings("unchecked") - public Pair<String, String>[] getElements() + public AgentProtocol[] getElements() { synchronized (agents) { - return agents.values().toArray(new Pair[agents.size()]); + return agents.toArray(new AgentProtocol[agents.size()]); } } - public Map<AgentProtocol, Pair<String, String>> getAgents() - { - return Collections.unmodifiableMap(agents); - } - public AgentProtocol getMasterAgent() { synchronized (agents) @@ -87,29 +80,41 @@ public class FailoverMonitor extends Container<Pair<String, String>> } } - public void registerAgent(AgentProtocol agent, String connectorDescription, String repositoryName) + public void registerAgent(AgentProtocol agent) { - Pair<String, String> pair = new Pair<String, String>(connectorDescription, repositoryName); + AgentProtocol[] newAgents = null; + AgentProtocol newMasterAgent = null; + synchronized (agents) { - agents.put(agent, pair); + agents.add(agent); if (agents.size() == 1) { masterAgent = agent; + newAgents = getElements(); + newMasterAgent = masterAgent; } - - publishNewMaster(masterAgent); } - fireElementAddedEvent(pair); + if (newMasterAgent != null) + { + publishNewMaster(newAgents, newMasterAgent); + fireElementAddedEvent(agent); + } } public void deregisterAgent(AgentProtocol agent) { - Pair<String, String> pair = null; + AgentProtocol[] newAgents = null; + AgentProtocol newMasterAgent = null; + synchronized (agents) { - pair = agents.remove(agent); + if (!agents.remove(agent)) + { + return; + } + if (masterAgent == agent) { if (agents.isEmpty()) @@ -121,13 +126,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> masterAgent = electNewMaster(agents); } - publishNewMaster(masterAgent); + newAgents = getElements(); + newMasterAgent = masterAgent; } } - if (pair != null) + if (newMasterAgent != null) { - fireElementRemovedEvent(pair); + publishNewMaster(newAgents, newMasterAgent); + fireElementRemovedEvent(agent); } } @@ -138,18 +145,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> checkState(group, "group"); } - protected AgentProtocol electNewMaster(Map<AgentProtocol, Pair<String, String>> agents) + protected AgentProtocol electNewMaster(List<AgentProtocol> agents) { - return agents.keySet().iterator().next(); + return agents.iterator().next(); } - private void publishNewMaster(AgentProtocol masterAgent) + private void publishNewMaster(AgentProtocol[] agents, final AgentProtocol masterAgent) { - final Pair<String, String> masterInfos = agents.get(masterAgent); - for (AgentProtocol agent : agents.keySet()) + for (final AgentProtocol agent : agents) { - final boolean master = agent == masterAgent; - try { new Request(agent, SIGNAL_PUBLISH_MASTER) @@ -157,15 +161,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> @Override protected void requesting(ExtendedDataOutputStream out) throws Exception { - if (master) + if (agent == masterAgent) { out.writeBoolean(true); } else { out.writeBoolean(false); - out.writeString(masterInfos.getElement1()); - out.writeString(masterInfos.getElement2()); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); } } }.sendAsync(); @@ -239,6 +243,10 @@ public class FailoverMonitor extends Container<Pair<String, String>> private FailoverMonitor failoverMonitor; + private String connectorDescription; + + private String repositoryName; + public AgentProtocol(Provider failOverMonitorProvider) { super(PROTOCOL_NAME); @@ -246,11 +254,32 @@ public class FailoverMonitor extends Container<Pair<String, String>> } @Override + public String toString() + { + return connectorDescription + "/" + repositoryName; + } + + protected FailoverMonitor getFailoverMonitor() + { + return failoverMonitor; + } + + protected String getConnectorDescription() + { + return connectorDescription; + } + + protected String getRepositoryName() + { + return repositoryName; + } + + @Override protected void indicatingStart(ExtendedDataInputStream in) throws IOException { String group = in.readString(); - String connectorDescription = in.readString(); - String repositoryName = in.readString(); + connectorDescription = in.readString(); + repositoryName = in.readString(); failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group); if (failoverMonitor == null) @@ -258,7 +287,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> throw new IllegalStateException("No monitor available for fail-over group " + group); } - failoverMonitor.registerAgent(this, connectorDescription, repositoryName); + failoverMonitor.registerAgent(this); super.indicatingStart(in); } @@ -306,7 +335,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> public ClientProtocol(Provider failOverMonitorProvider) { - super("failover-client"); + super(PROTOCOL_NAME); failoverMonitorProvider = failOverMonitorProvider; } @@ -332,25 +361,12 @@ public class FailoverMonitor extends Container<Pair<String, String>> @Override protected void responding(ExtendedDataOutputStream out) throws Exception { - AgentProtocol masterAgent = getMasterInfos(); - Pair<String, String> masterInfos = failoverMonitor.getAgents().get(masterAgent); - - out.writeString(masterInfos.getElement1()); - out.writeString(masterInfos.getElement2()); - - for (int i = 0; i < 100; i++) - { - Thread.sleep(100L); - if (!getProtocol().isActive()) - { - return; - } - } - - getProtocol().deactivate(); + AgentProtocol masterAgent = getMasterAgent(); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); } - protected AgentProtocol getMasterInfos() throws InterruptedException + protected AgentProtocol getMasterAgent() throws InterruptedException { for (;;) { |