diff options
5 files changed, 340 insertions, 46 deletions
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java index 2f819cfb71..86cdc3f6ef 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java @@ -195,29 +195,7 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp public void activateSession(InternalCDOSession session) throws Exception { super.activateSession(session); - - CDOClientProtocol protocol = new CDOClientProtocol(); - protocol.setInfraStructure(session); - if (streamWrapper != null) - { - protocol.setStreamWrapper(streamWrapper); - } - - session.setSessionProtocol(protocol); - if (connector != null) - { - protocol.setFailOverStrategy(new NOOPFailOverStrategy(connector)); - } - else if (failOverStrategy != null) - { - protocol.setFailOverStrategy(failOverStrategy); - } - - OpenSessionResult result = protocol.openSession(repositoryName, isPassiveUpdateEnabled(), getPassiveUpdateMode()); - session.setSessionID(result.getSessionID()); - session.setUserID(result.getUserID()); - session.setLastUpdateTime(result.getLastUpdateTime()); - session.setRepositoryInfo(new RepositoryInfo(repositoryName, result)); + OpenSessionResult result = initProtocol(session); if (packageRegistry == null) { @@ -273,6 +251,33 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp } } + protected OpenSessionResult initProtocol(InternalCDOSession session) + { + CDOClientProtocol protocol = new CDOClientProtocol(); + protocol.setInfraStructure(session); + if (streamWrapper != null) + { + protocol.setStreamWrapper(streamWrapper); + } + + session.setSessionProtocol(protocol); + if (connector != null) + { + protocol.setFailOverStrategy(new NOOPFailOverStrategy(connector)); + } + else if (failOverStrategy != null) + { + protocol.setFailOverStrategy(failOverStrategy); + } + + OpenSessionResult result = protocol.openSession(repositoryName, isPassiveUpdateEnabled(), getPassiveUpdateMode()); + session.setSessionID(result.getSessionID()); + session.setUserID(result.getUserID()); + session.setLastUpdateTime(result.getLastUpdateTime()); + session.setRepositoryInfo(new RepositoryInfo(repositoryName, result)); + return result; + } + @Override public void deactivateSession(InternalCDOSession session) throws Exception { diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java new file mode 100644 index 0000000000..cb010938ac --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java @@ -0,0 +1,163 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.emf.cdo.net4j.FailoverCDOSessionConfiguration; +import org.eclipse.emf.cdo.session.CDOSession.ExceptionHandler; + +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.RequestWithConfirmation; +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.signal.failover.IFailOverStrategy; +import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.collection.Pair; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.container.IPluginContainer; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; + +import org.eclipse.emf.spi.cdo.InternalCDOSession; + +/** + * @author Eike Stepper + * @since 4.0 + */ +public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigurationImpl implements + FailoverCDOSessionConfiguration +{ + private String monitorConnectorDescription; + + private String repositoryGroup; + + public FailoverCDOSessionConfigurationImpl(String monitorConnectorDescription, String repositoryGroup) + { + this.monitorConnectorDescription = monitorConnectorDescription; + this.repositoryGroup = repositoryGroup; + } + + public String getMonitorConnectorDescription() + { + return monitorConnectorDescription; + } + + public String getRepositoryGroup() + { + return repositoryGroup; + } + + @Override + public void setRepositoryName(String repositoryName) + { + throw new UnsupportedOperationException(); + } + + @Override + public void setConnector(IConnector connector) + { + throw new UnsupportedOperationException(); + } + + @Override + public void setFailOverStrategy(IFailOverStrategy failOverStrategy) + { + throw new UnsupportedOperationException(); + } + + @Override + public void setExceptionHandler(ExceptionHandler exceptionHandler) + { + throw new UnsupportedOperationException(); + } + + public void superSetRepositoryName(String repositoryName) + { + super.setRepositoryName(repositoryName); + } + + public void superSetConnector(IConnector connector) + { + super.setConnector(connector); + } + + public void superSetFailOverStrategy(IFailOverStrategy failOverStrategy) + { + super.setFailOverStrategy(failOverStrategy); + } + + public void superSetExceptionHandler(ExceptionHandler exceptionHandler) + { + super.setExceptionHandler(exceptionHandler); + } + + @Override + public InternalCDOSession createSession() + { + return new FailoverCDOSessionImpl(this); + } + + public void sessionProtocolDeactivated(FailoverCDOSessionImpl session) + { + Pair<String, String> info = queryRepositoryInfoFromMonitor(); + IConnector connector = getConnector(info.getElement1()); + String repositoryName = null; + + superSetConnector(connector); + superSetRepositoryName(repositoryName); + initProtocol(session); + + // TODO Re-register all remote sessions + // TODO Re-register all views + } + + protected Pair<String, String> queryRepositoryInfoFromMonitor() + { + SignalProtocol<Object> protocol = new SignalProtocol<Object>("failover-client"); + + try + { + protocol.open(getConnector(monitorConnectorDescription)); + return new RequestWithConfirmation<Pair<String, String>>(protocol, (short)1) + { + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + out.writeString(repositoryGroup); + } + + @Override + protected Pair<String, String> confirming(ExtendedDataInputStream in) throws Exception + { + String connectorDescription = in.readString(); + String repositoryName = in.readString(); + return new Pair<String, String>(connectorDescription, repositoryName); + } + }.send(); + } + catch (Exception ex) + { + throw WrappedException.wrap(ex); + } + finally + { + protocol.close(); + } + } + + protected IConnector getConnector(String description) + { + return (IConnector)getContainer().getElement("org.eclipse.net4j.connectors", "tcp", description); + } + + protected IManagedContainer getContainer() + { + return IPluginContainer.INSTANCE; + } +} diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml b/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml index 5a88a22341..024c171920 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml +++ b/plugins/org.eclipse.emf.cdo.server.net4j/plugin.xml @@ -22,7 +22,11 @@ <factory productGroup="org.eclipse.net4j.serverProtocols" type="failover" - class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$Protocol$Factory"/> + class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$AgentProtocol$Factory"/> + <factory + productGroup="org.eclipse.net4j.serverProtocols" + type="failover-client" + class="org.eclipse.emf.cdo.server.net4j.FailoverMonitor$ClientProtocol$Factory"/> <factory productGroup="org.eclipse.emf.cdo.server.net4j.failoverMonitors" type="net4j" diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java index 86dcba62e8..343cfc0943 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java @@ -29,7 +29,8 @@ public final class CDONet4jServerUtil { container.registerFactory(new CDOServerProtocolFactory(repositoryProvider)); container.registerFactory(new FailoverMonitor.Factory()); - container.registerFactory(new FailoverMonitor.Protocol.Factory(container)); + container.registerFactory(new FailoverMonitor.AgentProtocol.Factory(container)); + container.registerFactory(new FailoverMonitor.ClientProtocol.Factory(container)); } public static void prepareContainer(IManagedContainer container) diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java index 319ddccf9c..a25e21d6ca 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java @@ -12,7 +12,10 @@ package org.eclipse.emf.cdo.server.net4j; import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM; +import org.eclipse.net4j.signal.IndicationWithResponse; import org.eclipse.net4j.signal.Request; +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.signal.SignalReactor; import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; import org.eclipse.net4j.util.collection.Pair; import org.eclipse.net4j.util.container.Container; @@ -43,9 +46,9 @@ public class FailoverMonitor extends Container<Pair<String, String>> private String group; - private Map<Protocol, Pair<String, String>> agents = new HashMap<Protocol, Pair<String, String>>(); + private Map<AgentProtocol, Pair<String, String>> agents = new HashMap<AgentProtocol, Pair<String, String>>(); - private Protocol masterAgent; + private AgentProtocol masterAgent; public FailoverMonitor() { @@ -71,12 +74,12 @@ public class FailoverMonitor extends Container<Pair<String, String>> } } - public Map<Protocol, Pair<String, String>> getAgents() + public Map<AgentProtocol, Pair<String, String>> getAgents() { return Collections.unmodifiableMap(agents); } - public Protocol getMasterAgent() + public AgentProtocol getMasterAgent() { synchronized (agents) { @@ -84,7 +87,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> } } - public void registerAgent(Protocol agent, String connectorDescription, String repositoryName) + public void registerAgent(AgentProtocol agent, String connectorDescription, String repositoryName) { Pair<String, String> pair = new Pair<String, String>(connectorDescription, repositoryName); synchronized (agents) @@ -101,7 +104,7 @@ public class FailoverMonitor extends Container<Pair<String, String>> fireElementAddedEvent(pair); } - public void deregisterAgent(Protocol agent) + public void deregisterAgent(AgentProtocol agent) { Pair<String, String> pair = null; synchronized (agents) @@ -135,15 +138,15 @@ public class FailoverMonitor extends Container<Pair<String, String>> checkState(group, "group"); } - protected Protocol electNewMaster(Map<Protocol, Pair<String, String>> agents) + protected AgentProtocol electNewMaster(Map<AgentProtocol, Pair<String, String>> agents) { return agents.keySet().iterator().next(); } - private void publishNewMaster(Protocol masterAgent) + private void publishNewMaster(AgentProtocol masterAgent) { final Pair<String, String> masterInfos = agents.get(masterAgent); - for (Protocol agent : agents.keySet()) + for (AgentProtocol agent : agents.keySet()) { final boolean master = agent == masterAgent; @@ -205,13 +208,38 @@ public class FailoverMonitor extends Container<Pair<String, String>> /** * @author Eike Stepper */ - public static class Protocol extends HeartBeatProtocol.Server + public static abstract class AbstractServerProtocolFactory extends ServerProtocolFactory implements + FailoverMonitor.Provider + { + private IManagedContainer container; + + protected AbstractServerProtocolFactory(String type) + { + this(type, IPluginContainer.INSTANCE); + } + + protected AbstractServerProtocolFactory(String type, IManagedContainer container) + { + super(type); + this.container = container; + } + + public FailoverMonitor getFailoverMonitor(String group) + { + return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group); + } + } + + /** + * @author Eike Stepper + */ + public static class AgentProtocol extends HeartBeatProtocol.Server { private FailoverMonitor.Provider failoverMonitorProvider; private FailoverMonitor failoverMonitor; - public Protocol(Provider failOverMonitorProvider) + public AgentProtocol(Provider failOverMonitorProvider) { super(PROTOCOL_NAME); failoverMonitorProvider = failOverMonitorProvider; @@ -244,29 +272,122 @@ public class FailoverMonitor extends Container<Pair<String, String>> /** * @author Eike Stepper */ - public static class Factory extends ServerProtocolFactory implements FailoverMonitor.Provider + public static class Factory extends AbstractServerProtocolFactory { - private IManagedContainer container; - public Factory(IManagedContainer container) { - super(PROTOCOL_NAME); - this.container = container; + super(PROTOCOL_NAME, container); } public Factory() { - this(IPluginContainer.INSTANCE); + super(PROTOCOL_NAME); } - public Object create(String description) throws ProductCreationException + public AgentProtocol create(String description) throws ProductCreationException { - return new FailoverMonitor.Protocol(this); + return new AgentProtocol(this); + } + } + } + + /** + * @author Eike Stepper + */ + public static class ClientProtocol extends SignalProtocol<Object> + { + public static final String PROTOCOL_NAME = "failover-client"; //$NON-NLS-1$ + + public static final short SIGNAL_QUERY_REPOSITORY_INFO = 1; + + private FailoverMonitor.Provider failoverMonitorProvider; + + private FailoverMonitor failoverMonitor; + + public ClientProtocol(Provider failOverMonitorProvider) + { + super("failover-client"); + failoverMonitorProvider = failOverMonitorProvider; + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + switch (signalID) + { + case SIGNAL_QUERY_REPOSITORY_INFO: + return new IndicationWithResponse(this, SIGNAL_QUERY_REPOSITORY_INFO) + { + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + String group = in.readString(); + failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group); + if (failoverMonitor == null) + { + throw new IllegalStateException("No monitor available for fail-over group " + group); + } + } + + @Override + protected void responding(ExtendedDataOutputStream out) throws Exception + { + AgentProtocol masterAgent = getMasterInfos(); + Pair<String, String> masterInfos = failoverMonitor.getAgents().get(masterAgent); + + out.writeString(masterInfos.getElement1()); + out.writeString(masterInfos.getElement2()); + + for (int i = 0; i < 100; i++) + { + Thread.sleep(100L); + if (!getProtocol().isActive()) + { + return; + } + } + + getProtocol().deactivate(); + } + + protected AgentProtocol getMasterInfos() throws InterruptedException + { + for (;;) + { + AgentProtocol masterAgent = failoverMonitor.getMasterAgent(); + if (masterAgent != null) + { + return masterAgent; + } + + Thread.sleep(100L); + } + } + }; + + default: + return super.createSignalReactor(signalID); + } + } + + /** + * @author Eike Stepper + */ + public static class Factory extends AbstractServerProtocolFactory + { + public Factory(IManagedContainer container) + { + super(PROTOCOL_NAME, container); + } + + public Factory() + { + super(PROTOCOL_NAME); } - public FailoverMonitor getFailoverMonitor(String group) + public ClientProtocol create(String description) throws ProductCreationException { - return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group); + return new ClientProtocol(this); } } } |