diff options
author | Eike Stepper | 2012-06-06 09:15:05 +0000 |
---|---|---|
committer | Eike Stepper | 2012-06-06 09:15:05 +0000 |
commit | beaf88a6d18849a3d00476ca046a111f636ace5a (patch) | |
tree | b4dbc0ddc54963632692f0d0db83877dabd73a57 /plugins/org.eclipse.emf.cdo.server.net4j/src/org | |
parent | e5bc42789a5fa73d5f2c9a2dac4f0d0c177f9b63 (diff) | |
download | cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.tar.gz cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.tar.xz cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.zip |
Update Javadocs
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server.net4j/src/org')
3 files changed, 766 insertions, 739 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java index 344e9252b6..7c92710648 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/CDONet4jServerUtil.java @@ -1,40 +1,43 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.emf.cdo.server.net4j;
-
-import org.eclipse.emf.cdo.server.IRepositoryProvider;
-import org.eclipse.emf.cdo.server.internal.net4j.protocol.CDOServerProtocolFactory;
-import org.eclipse.emf.cdo.spi.server.ContainerRepositoryProvider;
-
-import org.eclipse.net4j.util.container.IManagedContainer;
-
-/**
- * @author Eike Stepper
- */
-public final class CDONet4jServerUtil
-{
- private CDONet4jServerUtil()
- {
- }
-
- public static void prepareContainer(IManagedContainer container, IRepositoryProvider repositoryProvider)
- {
- container.registerFactory(new CDOServerProtocolFactory(repositoryProvider));
- container.registerFactory(new FailoverMonitor.Factory());
- container.registerFactory(new FailoverMonitor.AgentProtocol.Factory(container));
- container.registerFactory(new FailoverMonitor.ClientProtocol.Factory(container));
- }
-
- public static void prepareContainer(IManagedContainer container)
- {
- prepareContainer(container, new ContainerRepositoryProvider(container));
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.server.net4j; + +import org.eclipse.emf.cdo.common.CDOCommonSession; +import org.eclipse.emf.cdo.server.IRepositoryProvider; +import org.eclipse.emf.cdo.server.internal.net4j.protocol.CDOServerProtocolFactory; +import org.eclipse.emf.cdo.spi.server.ContainerRepositoryProvider; + +import org.eclipse.net4j.util.container.IManagedContainer; + +/** + * Various static methods that may help with the server-side setup to support Net4j-specific CDO {@link CDOCommonSession sessions}. + * + * @author Eike Stepper + */ +public final class CDONet4jServerUtil +{ + private CDONet4jServerUtil() + { + } + + public static void prepareContainer(IManagedContainer container, IRepositoryProvider repositoryProvider) + { + container.registerFactory(new CDOServerProtocolFactory(repositoryProvider)); + container.registerFactory(new FailoverMonitor.Factory()); + container.registerFactory(new FailoverMonitor.AgentProtocol.Factory(container)); + container.registerFactory(new FailoverMonitor.ClientProtocol.Factory(container)); + } + + public static void prepareContainer(IManagedContainer container) + { + prepareContainer(container, new ContainerRepositoryProvider(container)); + } +} diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverAgent.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverAgent.java index 23a2ea92c8..5497a2139c 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverAgent.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverAgent.java @@ -1,286 +1,290 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.emf.cdo.server.net4j;
-
-import org.eclipse.emf.cdo.common.CDOCommonRepository;
-import org.eclipse.emf.cdo.server.CDOServerUtil;
-import org.eclipse.emf.cdo.server.ISynchronizableRepository;
-import org.eclipse.emf.cdo.session.CDOSessionConfiguration;
-import org.eclipse.emf.cdo.session.CDOSessionConfigurationFactory;
-import org.eclipse.emf.cdo.spi.server.InternalFailoverParticipant;
-import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer;
-
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.signal.Indication;
-import org.eclipse.net4j.signal.SignalReactor;
-import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol;
-import org.eclipse.net4j.util.concurrent.TimerLifecycle;
-import org.eclipse.net4j.util.concurrent.TimerLifecycle.DaemonFactory;
-import org.eclipse.net4j.util.container.IManagedContainer;
-import org.eclipse.net4j.util.container.IPluginContainer;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.lifecycle.Lifecycle;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-
-import java.io.IOException;
-import java.util.Timer;
-
-/**
- * @author Eike Stepper
- * @since 4.0
- */
-public abstract class FailoverAgent extends Lifecycle implements CDOSessionConfigurationFactory
-{
- private IConnector monitorConnector;
-
- private Timer timer;
-
- private long rate;
-
- private long timeout;
-
- private String group;
-
- private String connectorDescription;
-
- private InternalFailoverParticipant repository;
-
- private Protocol protocol;
-
- private String masterConnectorDescription;
-
- private String masterRepositoryName;
-
- private InternalRepositorySynchronizer synchronizer;
-
- public FailoverAgent()
- {
- }
-
- public IConnector getMonitorConnector()
- {
- return monitorConnector;
- }
-
- public void setMonitorConnector(IConnector connector)
- {
- checkInactive();
- monitorConnector = connector;
- }
-
- public Timer getTimer()
- {
- return timer;
- }
-
- public void setTimer(Timer timer)
- {
- checkInactive();
- this.timer = timer;
- }
-
- public long getRate()
- {
- return rate;
- }
-
- public void setRate(long rate)
- {
- checkInactive();
- this.rate = rate;
- }
-
- public long getTimeout()
- {
- return timeout;
- }
-
- public void setTimeout(long timeout)
- {
- checkInactive();
- this.timeout = timeout;
- }
-
- public String getGroup()
- {
- return group;
- }
-
- public void setGroup(String group)
- {
- checkInactive();
- this.group = group;
- }
-
- public String getConnectorDescription()
- {
- return connectorDescription;
- }
-
- public void setConnectorDescription(String connectorDescription)
- {
- checkInactive();
- this.connectorDescription = connectorDescription;
- }
-
- public ISynchronizableRepository getRepository()
- {
- return repository;
- }
-
- public void setRepository(ISynchronizableRepository repository)
- {
- checkInactive();
-
- if (!(repository instanceof InternalFailoverParticipant))
- {
- throw new IllegalArgumentException("Not a failover participant: " + repository);
- }
-
- if (repository.getSynchronizer() != null)
- {
- throw new IllegalArgumentException("Synchronizer must be null: " + repository);
- }
-
- this.repository = (InternalFailoverParticipant)repository;
- }
-
- public Protocol getProtocol()
- {
- return protocol;
- }
-
- public CDOSessionConfiguration createSessionConfiguration()
- {
- return createSessionConfiguration(masterConnectorDescription, masterRepositoryName);
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(monitorConnector, "monitorConnector");
- checkState(group, "group");
- checkState(connectorDescription, "connectorDescription");
- checkState(repository, "repository");
- }
-
- @Override
- protected void doActivate() throws Exception
- {
- super.doActivate();
-
- if (timer == null)
- {
- timer = (Timer)getContainer().getElement(TimerLifecycle.PRODUCT_GROUP, DaemonFactory.TYPE, null);
- }
-
- synchronizer = (InternalRepositorySynchronizer)CDOServerUtil.createRepositorySynchronizer(this);
- repository.setSynchronizer(synchronizer);
- setMaster(); // Will be adjusted with the following SIGNAL_PUBLISH_MASTER
-
- LifecycleUtil.activate(repository);
-
- protocol = new Protocol(this);
- protocol.start(rate, timeout);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- protocol.close();
- protocol = null;
- timer = null;
- monitorConnector = null;
- super.doDeactivate();
- }
-
- protected void setMaster()
- {
- repository.setType(CDOCommonRepository.Type.MASTER);
- masterConnectorDescription = null;
- masterRepositoryName = null;
- }
-
- protected void setBackup(String connectorDescription, String repositoryName)
- {
- masterConnectorDescription = connectorDescription;
- masterRepositoryName = repositoryName;
- repository.setType(CDOCommonRepository.Type.BACKUP);
- }
-
- protected abstract CDOSessionConfiguration createSessionConfiguration(String connectorDescription,
- String repositoryName);
-
- protected IManagedContainer getContainer()
- {
- return IPluginContainer.INSTANCE;
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Protocol extends HeartBeatProtocol
- {
- private FailoverAgent agent;
-
- public Protocol(FailoverAgent agent)
- {
- super(FailoverMonitor.PROTOCOL_NAME, agent.getMonitorConnector(), agent.getTimer());
- this.agent = agent;
- }
-
- public FailoverAgent getAgent()
- {
- return agent;
- }
-
- @Override
- protected void requestingStart(ExtendedDataOutputStream out, long rate) throws IOException
- {
- out.writeString(agent.getGroup());
- out.writeString(agent.getConnectorDescription());
- out.writeString(agent.getRepository().getName());
- super.requestingStart(out, rate);
- }
-
- @Override
- protected SignalReactor createSignalReactor(short signalID)
- {
- switch (signalID)
- {
- case FailoverMonitor.SIGNAL_PUBLISH_MASTER:
- return new Indication(this, FailoverMonitor.SIGNAL_PUBLISH_MASTER)
- {
- @Override
- protected void indicating(ExtendedDataInputStream in) throws Exception
- {
- boolean master = in.readBoolean();
- if (master)
- {
- agent.setMaster();
- }
- else
- {
- String connectorDescription = in.readString();
- String repositoryName = in.readString();
- agent.setBackup(connectorDescription, repositoryName);
- }
- }
- };
-
- default:
- return super.createSignalReactor(signalID);
- }
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.server.net4j; + +import org.eclipse.emf.cdo.common.CDOCommonRepository; +import org.eclipse.emf.cdo.server.CDOServerUtil; +import org.eclipse.emf.cdo.server.ISynchronizableRepository; +import org.eclipse.emf.cdo.session.CDOSessionConfiguration; +import org.eclipse.emf.cdo.session.CDOSessionConfigurationFactory; +import org.eclipse.emf.cdo.spi.server.InternalFailoverParticipant; +import org.eclipse.emf.cdo.spi.server.InternalRepositorySynchronizer; + +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.Indication; +import org.eclipse.net4j.signal.SignalReactor; +import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; +import org.eclipse.net4j.util.concurrent.TimerLifecycle; +import org.eclipse.net4j.util.concurrent.TimerLifecycle.DaemonFactory; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.container.IPluginContainer; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.lifecycle.Lifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; + +import java.io.IOException; +import java.util.Timer; + +/** + * A repository-side agent for a {@link FailoverMonitor fail-over monitor}. + * + * @author Eike Stepper + * @since 4.0 + */ +public abstract class FailoverAgent extends Lifecycle implements CDOSessionConfigurationFactory +{ + private IConnector monitorConnector; + + private Timer timer; + + private long rate; + + private long timeout; + + private String group; + + private String connectorDescription; + + private InternalFailoverParticipant repository; + + private Protocol protocol; + + private String masterConnectorDescription; + + private String masterRepositoryName; + + private InternalRepositorySynchronizer synchronizer; + + public FailoverAgent() + { + } + + public IConnector getMonitorConnector() + { + return monitorConnector; + } + + public void setMonitorConnector(IConnector connector) + { + checkInactive(); + monitorConnector = connector; + } + + public Timer getTimer() + { + return timer; + } + + public void setTimer(Timer timer) + { + checkInactive(); + this.timer = timer; + } + + public long getRate() + { + return rate; + } + + public void setRate(long rate) + { + checkInactive(); + this.rate = rate; + } + + public long getTimeout() + { + return timeout; + } + + public void setTimeout(long timeout) + { + checkInactive(); + this.timeout = timeout; + } + + public String getGroup() + { + return group; + } + + public void setGroup(String group) + { + checkInactive(); + this.group = group; + } + + public String getConnectorDescription() + { + return connectorDescription; + } + + public void setConnectorDescription(String connectorDescription) + { + checkInactive(); + this.connectorDescription = connectorDescription; + } + + public ISynchronizableRepository getRepository() + { + return repository; + } + + public void setRepository(ISynchronizableRepository repository) + { + checkInactive(); + + if (!(repository instanceof InternalFailoverParticipant)) + { + throw new IllegalArgumentException("Not a failover participant: " + repository); + } + + if (repository.getSynchronizer() != null) + { + throw new IllegalArgumentException("Synchronizer must be null: " + repository); + } + + this.repository = (InternalFailoverParticipant)repository; + } + + public Protocol getProtocol() + { + return protocol; + } + + public CDOSessionConfiguration createSessionConfiguration() + { + return createSessionConfiguration(masterConnectorDescription, masterRepositoryName); + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(monitorConnector, "monitorConnector"); + checkState(group, "group"); + checkState(connectorDescription, "connectorDescription"); + checkState(repository, "repository"); + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + + if (timer == null) + { + timer = (Timer)getContainer().getElement(TimerLifecycle.PRODUCT_GROUP, DaemonFactory.TYPE, null); + } + + synchronizer = (InternalRepositorySynchronizer)CDOServerUtil.createRepositorySynchronizer(this); + repository.setSynchronizer(synchronizer); + setMaster(); // Will be adjusted with the following SIGNAL_PUBLISH_MASTER + + LifecycleUtil.activate(repository); + + protocol = new Protocol(this); + protocol.start(rate, timeout); + } + + @Override + protected void doDeactivate() throws Exception + { + protocol.close(); + protocol = null; + timer = null; + monitorConnector = null; + super.doDeactivate(); + } + + protected void setMaster() + { + repository.setType(CDOCommonRepository.Type.MASTER); + masterConnectorDescription = null; + masterRepositoryName = null; + } + + protected void setBackup(String connectorDescription, String repositoryName) + { + masterConnectorDescription = connectorDescription; + masterRepositoryName = repositoryName; + repository.setType(CDOCommonRepository.Type.BACKUP); + } + + protected abstract CDOSessionConfiguration createSessionConfiguration(String connectorDescription, + String repositoryName); + + protected IManagedContainer getContainer() + { + return IPluginContainer.INSTANCE; + } + + /** + * The agent-side implementation of the {@link FailoverMonitor fail-over monitor} protocol. + * + * @author Eike Stepper + */ + public static class Protocol extends HeartBeatProtocol + { + private FailoverAgent agent; + + public Protocol(FailoverAgent agent) + { + super(FailoverMonitor.PROTOCOL_NAME, agent.getMonitorConnector(), agent.getTimer()); + this.agent = agent; + } + + public FailoverAgent getAgent() + { + return agent; + } + + @Override + protected void requestingStart(ExtendedDataOutputStream out, long rate) throws IOException + { + out.writeString(agent.getGroup()); + out.writeString(agent.getConnectorDescription()); + out.writeString(agent.getRepository().getName()); + super.requestingStart(out, rate); + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + switch (signalID) + { + case FailoverMonitor.SIGNAL_PUBLISH_MASTER: + return new Indication(this, FailoverMonitor.SIGNAL_PUBLISH_MASTER) + { + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + boolean master = in.readBoolean(); + if (master) + { + agent.setMaster(); + } + else + { + String connectorDescription = in.readString(); + String repositoryName = in.readString(); + agent.setBackup(connectorDescription, repositoryName); + } + } + }; + + default: + return super.createSignalReactor(signalID); + } + } + } +} diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java index ca36a61031..52888fa028 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/net4j/FailoverMonitor.java @@ -1,413 +1,433 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.emf.cdo.server.net4j;
-
-import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM;
-import org.eclipse.emf.cdo.server.net4j.FailoverMonitor.AgentProtocol;
-
-import org.eclipse.net4j.signal.IndicationWithResponse;
-import org.eclipse.net4j.signal.Request;
-import org.eclipse.net4j.signal.SignalProtocol;
-import org.eclipse.net4j.signal.SignalReactor;
-import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol;
-import org.eclipse.net4j.util.container.Container;
-import org.eclipse.net4j.util.container.IManagedContainer;
-import org.eclipse.net4j.util.container.IPluginContainer;
-import org.eclipse.net4j.util.factory.ProductCreationException;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-
-import org.eclipse.spi.net4j.ServerProtocolFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Eike Stepper
- * @since 4.0
- */
-public class FailoverMonitor extends Container<AgentProtocol>
-{
- public static final String PRODUCT_GROUP = "org.eclipse.emf.cdo.server.net4j.failoverMonitors";
-
- public static final String PROTOCOL_NAME = "failover"; //$NON-NLS-1$
-
- public static final short SIGNAL_PUBLISH_MASTER = 3;
-
- private String group;
-
- private List<AgentProtocol> agents = new ArrayList<AgentProtocol>();
-
- private AgentProtocol masterAgent;
-
- public FailoverMonitor()
- {
- }
-
- public String getGroup()
- {
- return group;
- }
-
- public void setGroup(String group)
- {
- checkInactive();
- this.group = group;
- }
-
- public AgentProtocol[] getElements()
- {
- synchronized (agents)
- {
- return agents.toArray(new AgentProtocol[agents.size()]);
- }
- }
-
- public AgentProtocol getMasterAgent()
- {
- synchronized (agents)
- {
- return masterAgent;
- }
- }
-
- public void registerAgent(AgentProtocol agent)
- {
- AgentProtocol newMasterAgent = null;
- AgentProtocol[] newAgents = null;
-
- synchronized (agents)
- {
- agents.add(agent);
- if (agents.size() == 1)
- {
- masterAgent = agent;
- }
-
- newMasterAgent = masterAgent;
- newAgents = getElements();
- }
-
- if (newMasterAgent != null)
- {
- publishNewMaster(newMasterAgent, newAgents);
- }
-
- fireElementAddedEvent(agent);
- }
-
- public void deregisterAgent(AgentProtocol agent)
- {
- AgentProtocol newMasterAgent = null;
- AgentProtocol[] newAgents = null;
-
- synchronized (agents)
- {
- if (!agents.remove(agent))
- {
- return;
- }
-
- if (masterAgent == agent)
- {
- if (agents.isEmpty())
- {
- masterAgent = null;
- }
- else
- {
- masterAgent = electNewMaster(agents);
- }
- }
-
- newMasterAgent = masterAgent;
- newAgents = getElements();
- }
-
- if (newMasterAgent != null)
- {
- publishNewMaster(newMasterAgent, newAgents);
- }
-
- fireElementRemovedEvent(agent);
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(group, "group");
- }
-
- protected AgentProtocol electNewMaster(List<AgentProtocol> agents)
- {
- return agents.iterator().next();
- }
-
- private void publishNewMaster(final AgentProtocol masterAgent, AgentProtocol[] agents)
- {
- for (final AgentProtocol agent : agents)
- {
- try
- {
- new Request(agent, SIGNAL_PUBLISH_MASTER)
- {
- @Override
- protected void requesting(ExtendedDataOutputStream out) throws Exception
- {
- if (agent == masterAgent)
- {
- out.writeBoolean(true);
- }
- else
- {
- out.writeBoolean(false);
- out.writeString(masterAgent.getConnectorDescription());
- out.writeString(masterAgent.getRepositoryName());
- }
- }
- }.sendAsync();
- }
- catch (Exception ex)
- {
- OM.LOG.error(ex);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public interface Provider
- {
- public FailoverMonitor getFailoverMonitor(String group);
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Factory extends org.eclipse.net4j.util.factory.Factory
- {
- public static final String TYPE = "net4j";
-
- public Factory()
- {
- super(PRODUCT_GROUP, TYPE);
- }
-
- public FailoverMonitor create(String description) throws ProductCreationException
- {
- FailoverMonitor monitor = new FailoverMonitor();
- monitor.setGroup(description);
- return monitor;
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static abstract class AbstractServerProtocolFactory extends ServerProtocolFactory implements
- FailoverMonitor.Provider
- {
- private IManagedContainer container;
-
- protected AbstractServerProtocolFactory(String type)
- {
- this(type, IPluginContainer.INSTANCE);
- }
-
- protected AbstractServerProtocolFactory(String type, IManagedContainer container)
- {
- super(type);
- this.container = container;
- }
-
- public FailoverMonitor getFailoverMonitor(String group)
- {
- return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group);
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static class AgentProtocol extends HeartBeatProtocol.Server
- {
- private FailoverMonitor.Provider failoverMonitorProvider;
-
- private FailoverMonitor failoverMonitor;
-
- private String connectorDescription;
-
- private String repositoryName;
-
- public AgentProtocol(Provider failOverMonitorProvider)
- {
- super(PROTOCOL_NAME);
- failoverMonitorProvider = failOverMonitorProvider;
- }
-
- @Override
- public String toString()
- {
- return connectorDescription + "/" + repositoryName;
- }
-
- protected FailoverMonitor getFailoverMonitor()
- {
- return failoverMonitor;
- }
-
- protected String getConnectorDescription()
- {
- return connectorDescription;
- }
-
- protected String getRepositoryName()
- {
- return repositoryName;
- }
-
- @Override
- protected void indicatingStart(ExtendedDataInputStream in) throws IOException
- {
- String group = in.readString();
- connectorDescription = in.readString();
- repositoryName = in.readString();
-
- failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group);
- if (failoverMonitor == null)
- {
- throw new IllegalStateException("No monitor available for fail-over group " + group);
- }
-
- failoverMonitor.registerAgent(this);
- super.indicatingStart(in);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- failoverMonitor.deregisterAgent(this);
- super.doDeactivate();
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Factory extends AbstractServerProtocolFactory
- {
- public Factory(IManagedContainer container)
- {
- super(PROTOCOL_NAME, container);
- }
-
- public Factory()
- {
- super(PROTOCOL_NAME);
- }
-
- public AgentProtocol create(String description) throws ProductCreationException
- {
- return new AgentProtocol(this);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static class ClientProtocol extends SignalProtocol<Object>
- {
- public static final String PROTOCOL_NAME = "failover-client"; //$NON-NLS-1$
-
- public static final short SIGNAL_QUERY_REPOSITORY_INFO = 1;
-
- private FailoverMonitor.Provider failoverMonitorProvider;
-
- private FailoverMonitor failoverMonitor;
-
- public ClientProtocol(Provider failOverMonitorProvider)
- {
- super(PROTOCOL_NAME);
- failoverMonitorProvider = failOverMonitorProvider;
- }
-
- @Override
- protected SignalReactor createSignalReactor(short signalID)
- {
- switch (signalID)
- {
- case SIGNAL_QUERY_REPOSITORY_INFO:
- return new IndicationWithResponse(this, SIGNAL_QUERY_REPOSITORY_INFO, "QueryRepositoryInfo")
- {
- @Override
- protected void indicating(ExtendedDataInputStream in) throws Exception
- {
- String group = in.readString();
- failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group);
- if (failoverMonitor == null)
- {
- throw new IllegalStateException("No monitor available for fail-over group " + group);
- }
- }
-
- @Override
- protected void responding(ExtendedDataOutputStream out) throws Exception
- {
- AgentProtocol masterAgent = getMasterAgent();
- out.writeString(masterAgent.getConnectorDescription());
- out.writeString(masterAgent.getRepositoryName());
- }
-
- protected AgentProtocol getMasterAgent() throws InterruptedException
- {
- for (;;)
- {
- AgentProtocol masterAgent = failoverMonitor.getMasterAgent();
- if (masterAgent != null)
- {
- return masterAgent;
- }
-
- Thread.sleep(100L);
- }
- }
- };
-
- default:
- return super.createSignalReactor(signalID);
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Factory extends AbstractServerProtocolFactory
- {
- public Factory(IManagedContainer container)
- {
- super(PROTOCOL_NAME, container);
- }
-
- public Factory()
- {
- super(PROTOCOL_NAME);
- }
-
- public ClientProtocol create(String description) throws ProductCreationException
- {
- return new ClientProtocol(this);
- }
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.server.net4j; + +import org.eclipse.emf.cdo.common.CDOCommonRepository.Type; +import org.eclipse.emf.cdo.server.internal.net4j.bundle.OM; +import org.eclipse.emf.cdo.server.net4j.FailoverMonitor.AgentProtocol; +import org.eclipse.emf.cdo.spi.server.InternalFailoverParticipant; + +import org.eclipse.net4j.signal.IndicationWithResponse; +import org.eclipse.net4j.signal.Request; +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.signal.SignalReactor; +import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; +import org.eclipse.net4j.util.container.Container; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.container.IPluginContainer; +import org.eclipse.net4j.util.factory.ProductCreationException; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; + +import org.eclipse.spi.net4j.ServerProtocolFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A facility for monitoring a variable set of {@link InternalFailoverParticipant fail-over participant} repositories and electing, + * as well as promoting, a {@link Type#MASTER master} repository among them. + * + * @author Eike Stepper + * @since 4.0 + */ +public class FailoverMonitor extends Container<AgentProtocol> +{ + public static final String PRODUCT_GROUP = "org.eclipse.emf.cdo.server.net4j.failoverMonitors"; + + public static final String PROTOCOL_NAME = "failover"; //$NON-NLS-1$ + + public static final short SIGNAL_PUBLISH_MASTER = 3; + + private String group; + + private List<AgentProtocol> agents = new ArrayList<AgentProtocol>(); + + private AgentProtocol masterAgent; + + public FailoverMonitor() + { + } + + public String getGroup() + { + return group; + } + + public void setGroup(String group) + { + checkInactive(); + this.group = group; + } + + public AgentProtocol[] getElements() + { + synchronized (agents) + { + return agents.toArray(new AgentProtocol[agents.size()]); + } + } + + public AgentProtocol getMasterAgent() + { + synchronized (agents) + { + return masterAgent; + } + } + + public void registerAgent(AgentProtocol agent) + { + AgentProtocol newMasterAgent = null; + AgentProtocol[] newAgents = null; + + synchronized (agents) + { + agents.add(agent); + if (agents.size() == 1) + { + masterAgent = agent; + } + + newMasterAgent = masterAgent; + newAgents = getElements(); + } + + if (newMasterAgent != null) + { + publishNewMaster(newMasterAgent, newAgents); + } + + fireElementAddedEvent(agent); + } + + public void deregisterAgent(AgentProtocol agent) + { + AgentProtocol newMasterAgent = null; + AgentProtocol[] newAgents = null; + + synchronized (agents) + { + if (!agents.remove(agent)) + { + return; + } + + if (masterAgent == agent) + { + if (agents.isEmpty()) + { + masterAgent = null; + } + else + { + masterAgent = electNewMaster(agents); + } + } + + newMasterAgent = masterAgent; + newAgents = getElements(); + } + + if (newMasterAgent != null) + { + publishNewMaster(newMasterAgent, newAgents); + } + + fireElementRemovedEvent(agent); + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(group, "group"); + } + + protected AgentProtocol electNewMaster(List<AgentProtocol> agents) + { + return agents.iterator().next(); + } + + private void publishNewMaster(final AgentProtocol masterAgent, AgentProtocol[] agents) + { + for (final AgentProtocol agent : agents) + { + try + { + new Request(agent, SIGNAL_PUBLISH_MASTER) + { + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + if (agent == masterAgent) + { + out.writeBoolean(true); + } + else + { + out.writeBoolean(false); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); + } + } + }.sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + } + } + + /** + * Provides a {@link FailoverMonitor fail-over monitor} for a given named fail-over group. + * + * @author Eike Stepper + */ + public interface Provider + { + public FailoverMonitor getFailoverMonitor(String group); + } + + /** + * Creates {@link FailoverMonitor fail-over monitor} instances. + * + * @author Eike Stepper + */ + public static class Factory extends org.eclipse.net4j.util.factory.Factory + { + public static final String TYPE = "net4j"; + + public Factory() + { + super(PRODUCT_GROUP, TYPE); + } + + public FailoverMonitor create(String description) throws ProductCreationException + { + FailoverMonitor monitor = new FailoverMonitor(); + monitor.setGroup(description); + return monitor; + } + } + + /** + * An abstract base class for the {@link ServerProtocolFactory server-side protocol factories} + * required by a {@link FailoverMonitor fail-over monitor}. + * + * @author Eike Stepper + */ + public static abstract class AbstractServerProtocolFactory extends ServerProtocolFactory implements + FailoverMonitor.Provider + { + private IManagedContainer container; + + protected AbstractServerProtocolFactory(String type) + { + this(type, IPluginContainer.INSTANCE); + } + + protected AbstractServerProtocolFactory(String type, IManagedContainer container) + { + super(type); + this.container = container; + } + + public FailoverMonitor getFailoverMonitor(String group) + { + return (FailoverMonitor)container.getElement(FailoverMonitor.PRODUCT_GROUP, "net4j", group); + } + } + + /** + * The monitor-side implementation of the {@link FailoverMonitor fail-over monitor} agent protocol. + * + * @author Eike Stepper + */ + public static class AgentProtocol extends HeartBeatProtocol.Server + { + private FailoverMonitor.Provider failoverMonitorProvider; + + private FailoverMonitor failoverMonitor; + + private String connectorDescription; + + private String repositoryName; + + public AgentProtocol(Provider failOverMonitorProvider) + { + super(PROTOCOL_NAME); + failoverMonitorProvider = failOverMonitorProvider; + } + + @Override + public String toString() + { + return connectorDescription + "/" + repositoryName; + } + + protected FailoverMonitor getFailoverMonitor() + { + return failoverMonitor; + } + + protected String getConnectorDescription() + { + return connectorDescription; + } + + protected String getRepositoryName() + { + return repositoryName; + } + + @Override + protected void indicatingStart(ExtendedDataInputStream in) throws IOException + { + String group = in.readString(); + connectorDescription = in.readString(); + repositoryName = in.readString(); + + failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group); + if (failoverMonitor == null) + { + throw new IllegalStateException("No monitor available for fail-over group " + group); + } + + failoverMonitor.registerAgent(this); + super.indicatingStart(in); + } + + @Override + protected void doDeactivate() throws Exception + { + failoverMonitor.deregisterAgent(this); + super.doDeactivate(); + } + + /** + * Creates {@link AgentProtocol fail-over agent protocol} instances. + * + * @author Eike Stepper + */ + public static class Factory extends AbstractServerProtocolFactory + { + public Factory(IManagedContainer container) + { + super(PROTOCOL_NAME, container); + } + + public Factory() + { + super(PROTOCOL_NAME); + } + + public AgentProtocol create(String description) throws ProductCreationException + { + return new AgentProtocol(this); + } + } + } + + /** + * The monitor-side implementation of the {@link FailoverMonitor fail-over monitor} client protocol. + * + * @author Eike Stepper + */ + public static class ClientProtocol extends SignalProtocol<Object> + { + public static final String PROTOCOL_NAME = "failover-client"; //$NON-NLS-1$ + + public static final short SIGNAL_QUERY_REPOSITORY_INFO = 1; + + private FailoverMonitor.Provider failoverMonitorProvider; + + private FailoverMonitor failoverMonitor; + + public ClientProtocol(Provider failOverMonitorProvider) + { + super(PROTOCOL_NAME); + failoverMonitorProvider = failOverMonitorProvider; + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + switch (signalID) + { + case SIGNAL_QUERY_REPOSITORY_INFO: + return new IndicationWithResponse(this, SIGNAL_QUERY_REPOSITORY_INFO, "QueryRepositoryInfo") + { + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + String group = in.readString(); + failoverMonitor = failoverMonitorProvider.getFailoverMonitor(group); + if (failoverMonitor == null) + { + throw new IllegalStateException("No monitor available for fail-over group " + group); + } + } + + @Override + protected void responding(ExtendedDataOutputStream out) throws Exception + { + AgentProtocol masterAgent = getMasterAgent(); + out.writeString(masterAgent.getConnectorDescription()); + out.writeString(masterAgent.getRepositoryName()); + } + + protected AgentProtocol getMasterAgent() throws InterruptedException + { + for (;;) + { + AgentProtocol masterAgent = failoverMonitor.getMasterAgent(); + if (masterAgent != null) + { + return masterAgent; + } + + Thread.sleep(100L); + } + } + }; + + default: + return super.createSignalReactor(signalID); + } + } + + /** + * Creates {@link ClientProtocol fail-over client protocol} instances. + * + * @author Eike Stepper + */ + public static class Factory extends AbstractServerProtocolFactory + { + public Factory(IManagedContainer container) + { + super(PROTOCOL_NAME, container); + } + + public Factory() + { + super(PROTOCOL_NAME); + } + + public ClientProtocol create(String description) throws ProductCreationException + { + return new ClientProtocol(this); + } + } + } +} |