diff options
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.net4j')
15 files changed, 842 insertions, 189 deletions
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java index c013a3aa00..73b806172c 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionConfigurationImpl.java @@ -17,6 +17,7 @@ import org.eclipse.emf.cdo.session.CDORepositoryInfo; import org.eclipse.emf.internal.cdo.session.CDOSessionConfigurationImpl; import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.SignalProtocol; import org.eclipse.net4j.util.CheckUtil; import org.eclipse.net4j.util.io.IStreamWrapper; @@ -38,6 +39,8 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp private IStreamWrapper streamWrapper; + private long signalTimeout = SignalProtocol.DEFAULT_TIMEOUT; + public CDONet4jSessionConfigurationImpl() { } @@ -49,12 +52,6 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp public void setRepositoryName(String repositoryName) { - checkNotOpen(); - uncheckedSetRepositoryName(repositoryName); - } - - protected void uncheckedSetRepositoryName(String repositoryName) - { this.repositoryName = repositoryName; } @@ -85,6 +82,16 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp this.streamWrapper = streamWrapper; } + public long getSignalTimeout() + { + return signalTimeout; + } + + public void setSignalTimeout(long signalTimeout) + { + this.signalTimeout = signalTimeout; + } + @Override public org.eclipse.emf.cdo.net4j.CDOSession openSession() { @@ -110,6 +117,7 @@ public class CDONet4jSessionConfigurationImpl extends CDOSessionConfigurationImp sessionImpl.setStreamWrapper(streamWrapper); sessionImpl.setConnector(connector); sessionImpl.setRepositoryName(repositoryName); + sessionImpl.setSignalTimeout(signalTimeout); } /** diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionImpl.java index 4d27cc1c5b..5bc307a3d4 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDONet4jSessionImpl.java @@ -34,9 +34,11 @@ import org.eclipse.emf.cdo.spi.common.model.InternalCDOPackageUnit; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionManager; import org.eclipse.emf.internal.cdo.session.CDOSessionImpl; +import org.eclipse.emf.internal.cdo.session.DelegatingSessionProtocol; import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.signal.ISignalProtocol; +import org.eclipse.net4j.signal.SignalProtocol; import org.eclipse.net4j.util.io.IStreamWrapper; import org.eclipse.emf.ecore.EcorePackage; @@ -52,27 +54,61 @@ public class CDONet4jSessionImpl extends CDOSessionImpl implements org.eclipse.e private IConnector connector; - protected String repositoryName; // TODO (CD) Eliminate? (Duplicates name in repoInfo field) + private String repositoryName; + + private long signalTimeout = SignalProtocol.DEFAULT_TIMEOUT; public CDONet4jSessionImpl() { } + public IStreamWrapper getStreamWrapper() + { + return streamWrapper; + } + public void setStreamWrapper(IStreamWrapper streamWrapper) { this.streamWrapper = streamWrapper; } + public IConnector getConnector() + { + return connector; + } + public void setConnector(IConnector connector) { this.connector = connector; } + public String getRepositoryName() + { + return repositoryName; + } + public void setRepositoryName(String repositoryName) { this.repositoryName = repositoryName; } + public long getSignalTimeout() + { + return signalTimeout; + } + + public void setSignalTimeout(long signalTimeout) + { + this.signalTimeout = signalTimeout; + + // Deal with the possibility that the sessionProtocol has already been created. + CDOClientProtocol clientProtocol = getClientProtocol(); + if (clientProtocol != null) + { + clientProtocol.setTimeout(this.signalTimeout); + } + } + @Override public OptionsImpl options() { @@ -86,10 +122,11 @@ public class CDONet4jSessionImpl extends CDOSessionImpl implements org.eclipse.e } @Override - protected void activateSession() throws Exception + protected void doActivate() throws Exception { - super.activateSession(); - OpenSessionResult result = initProtocol(); + OpenSessionResult result = openSession(); + + super.doActivate(); InternalCDOPackageRegistry packageRegistry = getPackageRegistry(); if (packageRegistry == null) @@ -153,18 +190,45 @@ public class CDONet4jSessionImpl extends CDOSessionImpl implements org.eclipse.e } } - protected OpenSessionResult initProtocol() + private CDOClientProtocol createProtocol() { CDOClientProtocol protocol = new CDOClientProtocol(); + protocol.setTimeout(signalTimeout); protocol.setInfraStructure(this); if (streamWrapper != null) { protocol.setStreamWrapper(streamWrapper); } + protocol.open(connector); + return protocol; + } + + /** + * Gets the CDOClientProtocol instance, which may be wrapped inside a DelegatingSessionProtocol + */ + private CDOClientProtocol getClientProtocol() + { + CDOSessionProtocol protocol = getSessionProtocol(); + CDOClientProtocol clientProtocol; + if (protocol instanceof DelegatingSessionProtocol) + { + clientProtocol = (CDOClientProtocol)((DelegatingSessionProtocol)protocol).getDelegate(); + } + else + { + clientProtocol = (CDOClientProtocol)protocol; + } + return clientProtocol; + } + + protected OpenSessionResult openSession() + { + CDOClientProtocol protocol = createProtocol(); setSessionProtocol(protocol); - protocol.open(connector); + hookSessionProtocol(); + // TODO (CD) The next call is on the CDOClientProtocol; shouldn't it be on the DelegatingSessionProtocol instead? OpenSessionResult result = protocol.openSession(repositoryName, options().isPassiveUpdateEnabled(), options() .getPassiveUpdateMode()); setSessionID(result.getSessionID()); @@ -175,15 +239,12 @@ public class CDONet4jSessionImpl extends CDOSessionImpl implements org.eclipse.e } @Override - protected void deactivateSession() throws Exception + protected void doDeactivate() throws Exception { + super.doDeactivate(); + getCommitInfoManager().deactivate(); getRevisionManager().deactivate(); - - // branchManager.deactivate(); - // packageRegistry.deactivate(); - - super.deactivateSession(); } /** diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDOSessionRecoveryEventImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDOSessionRecoveryEventImpl.java new file mode 100644 index 0000000000..aeffe7edfb --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/CDOSessionRecoveryEventImpl.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.emf.cdo.net4j.CDOSessionRecoveryEvent; +import org.eclipse.emf.cdo.session.CDOSession; + +/** + * @author Caspar De Groot + */ +public class CDOSessionRecoveryEventImpl implements CDOSessionRecoveryEvent +{ + private CDOSession source; + + private Type type; + + public CDOSessionRecoveryEventImpl(CDOSession source, Type type) + { + this.type = type; + this.source = source; + } + + public CDOSession getSource() + { + return source; + } + + public Type getType() + { + return type; + } +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java index b604ceaeb9..16d926340e 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionConfigurationImpl.java @@ -22,21 +22,20 @@ import org.eclipse.emf.spi.cdo.InternalCDOSession; * @author Eike Stepper * @since 4.0 */ -public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigurationImpl implements +public class FailoverCDOSessionConfigurationImpl extends RecoveringCDOSessionConfigurationImpl implements FailoverCDOSessionConfiguration { private String monitorConnectorDescription; private String repositoryGroup; - private IManagedContainer container; - public FailoverCDOSessionConfigurationImpl(String monitorConnectorDescription, String repositoryGroup, IManagedContainer container) { + super(container); + this.monitorConnectorDescription = monitorConnectorDescription; this.repositoryGroup = repositoryGroup; - this.container = container; } public String getMonitorConnectorDescription() @@ -81,6 +80,5 @@ public class FailoverCDOSessionConfigurationImpl extends CDONet4jSessionConfigur FailoverCDOSessionImpl sessionImpl = (FailoverCDOSessionImpl)session; sessionImpl.setMonitorConnectionDescription(monitorConnectorDescription); sessionImpl.setRepositoryGroup(repositoryGroup); - sessionImpl.setContainer(container); } } diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionImpl.java index f0e803339d..34231a5fb0 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/FailoverCDOSessionImpl.java @@ -7,57 +7,31 @@ * * Contributors: * Eike Stepper - initial API and implementation + * Caspar De Groot - maintenance */ package org.eclipse.emf.cdo.internal.net4j; -import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; -import org.eclipse.emf.cdo.net4j.CDOSessionFailoverEvent; -import org.eclipse.emf.cdo.session.CDOSession; -import org.eclipse.emf.cdo.spi.common.branch.CDOBranchUtil; -import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; -import org.eclipse.emf.cdo.spi.common.commit.InternalCDOCommitInfoManager; -import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionManager; -import org.eclipse.emf.cdo.transaction.CDOTransaction; - -import org.eclipse.net4j.Net4jUtil; import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.signal.RequestWithConfirmation; import org.eclipse.net4j.signal.SignalProtocol; -import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; import org.eclipse.net4j.util.ObjectUtil; import org.eclipse.net4j.util.WrappedException; -import org.eclipse.net4j.util.container.IManagedContainer; import org.eclipse.net4j.util.io.ExtendedDataInputStream; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; -import org.eclipse.emf.spi.cdo.CDOSessionProtocol; -import org.eclipse.emf.spi.cdo.InternalCDOView; - -import java.util.ArrayList; -import java.util.List; - /** * @author Eike Stepper */ -public class FailoverCDOSessionImpl extends CDONet4jSessionImpl +public class FailoverCDOSessionImpl extends RecoveringCDOSessionImpl { - private IManagedContainer container; - private String monitorConnectorDescription; private String repositoryGroup; - private String repositoryConnectorDescription; - public FailoverCDOSessionImpl() { } - public void setContainer(IManagedContainer container) - { - this.container = container; - } - public void setMonitorConnectionDescription(String monitorConnectorDescription) { this.monitorConnectorDescription = monitorConnectorDescription; @@ -69,116 +43,26 @@ public class FailoverCDOSessionImpl extends CDONet4jSessionImpl } @Override - protected void sessionProtocolDeactivated() - { - fireFailoverEvent(CDOSessionFailoverEvent.Type.STARTED); - - unhookSessionProtocol(); - List<AfterFailoverRunnable> runnables = failover(); - CDOSessionProtocol sessionProtocol = hookSessionProtocol(); - - for (AfterFailoverRunnable runnable : runnables) - { - runnable.run(sessionProtocol); - } - - fireFailoverEvent(CDOSessionFailoverEvent.Type.FINISHED); - } - - private void fireFailoverEvent(final CDOSessionFailoverEvent.Type type) - { - fireEvent(new CDOSessionFailoverEvent() - { - public CDOSession getSource() - { - return FailoverCDOSessionImpl.this; - } - - public Type getType() - { - return type; - } - }); - } - - public List<AfterFailoverRunnable> failover() - { - try - { - List<AfterFailoverRunnable> runnables = new ArrayList<AfterFailoverRunnable>(); - for (InternalCDOView view : getViews()) - { - runnables.add(new OpenViewRunnable(view)); - } - - updateConnectorAndRepositoryName(); - initProtocol(); - - // The revisionManager, branchManager, and commitInfoManager, hold their own - // references to the sessionProtocol. We need to update those: - - InternalCDORevisionManager revisionManager = getRevisionManager(); - revisionManager.deactivate(); - revisionManager.setRevisionLoader(getSessionProtocol()); - revisionManager.activate(); - - InternalCDOBranchManager branchManager = getBranchManager(); - branchManager.deactivate(); - branchManager.setBranchLoader(getSessionProtocol()); - branchManager.activate(); - - InternalCDOCommitInfoManager commitInfoManager = getCommitInfoManager(); - commitInfoManager.deactivate(); - commitInfoManager.setCommitInfoLoader(getSessionProtocol()); - commitInfoManager.activate(); - - return runnables; - } - catch (RuntimeException ex) - { - deactivate(); - throw ex; - } - catch (Error ex) - { - deactivate(); - throw ex; - } - } - - @Override - protected void activateSession() throws Exception - { - updateConnectorAndRepositoryName(); - super.activateSession(); - } - - private void updateConnectorAndRepositoryName() + protected void updateConnectorAndRepositoryName() { - // System.out.println("Querying fail-over monitor..."); queryRepositoryInfoFromMonitor(); - - // System.out.println("Connecting to " + repositoryConnectorDescription + "/" + repositoryName + "..."); - IConnector connector = getConnector(repositoryConnectorDescription); - new HeartBeatProtocol(connector, container).start(1000L, 5000L); - + IConnector connector = createTCPConnector(getUseHeartBeat()); setConnector(connector); - setRepositoryName(repositoryName); } protected void queryRepositoryInfoFromMonitor() { - IConnector connector = getConnector(monitorConnectorDescription); + IConnector connector = getTCPConnector(monitorConnectorDescription); SignalProtocol<Object> protocol = new SignalProtocol<Object>("failover-client"); protocol.open(connector); try { - String oldRepositoryConnectorDescription = repositoryConnectorDescription; - String oldRepositoryName = repositoryName; + String oldRepositoryConnectorDescription = getRepositoryConnectorDescription(); + String oldRepositoryName = getRepositoryName(); - while (ObjectUtil.equals(repositoryConnectorDescription, oldRepositoryConnectorDescription) - && ObjectUtil.equals(repositoryName, oldRepositoryName)) + while (ObjectUtil.equals(getRepositoryConnectorDescription(), oldRepositoryConnectorDescription) + && ObjectUtil.equals(getRepositoryName(), oldRepositoryName)) { new RequestWithConfirmation<Boolean>(protocol, (short)1, "QueryRepositoryInfo") { @@ -191,8 +75,8 @@ public class FailoverCDOSessionImpl extends CDONet4jSessionImpl @Override protected Boolean confirming(ExtendedDataInputStream in) throws Exception { - repositoryConnectorDescription = in.readString(); - repositoryName = in.readString(); + setRepositoryConnectorDescription(in.readString()); + setRepositoryName(in.readString()); return true; } }.send(); @@ -211,41 +95,4 @@ public class FailoverCDOSessionImpl extends CDONet4jSessionImpl } } } - - protected IConnector getConnector(String description) - { - return Net4jUtil.getConnector(container, "tcp", description); - } - - /** - * @author Eike Stepper - */ - public static interface AfterFailoverRunnable - { - public void run(CDOSessionProtocol sessionProtocol); - } - - /** - * @author Eike Stepper - */ - private final class OpenViewRunnable implements AfterFailoverRunnable - { - private int viewID; - - private CDOBranchPoint branchPoint; - - private boolean transaction; - - public OpenViewRunnable(InternalCDOView view) - { - viewID = view.getViewID(); - branchPoint = CDOBranchUtil.copyBranchPoint(view); - transaction = view instanceof CDOTransaction; - } - - public void run(CDOSessionProtocol sessionProtocol) - { - sessionProtocol.openView(viewID, branchPoint, !transaction); - } - } } diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionConfigurationImpl.java new file mode 100644 index 0000000000..b71f5bfab1 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionConfigurationImpl.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.emf.cdo.net4j.ReconnectingCDOSessionConfiguration; +import org.eclipse.emf.cdo.session.CDOSession.ExceptionHandler; + +import org.eclipse.net4j.util.container.IManagedContainer; + +import org.eclipse.emf.spi.cdo.InternalCDOSession; + +/** + * @author Caspar De Groot + */ +public class ReconnectingCDOSessionConfigurationImpl extends RecoveringCDOSessionConfigurationImpl implements + ReconnectingCDOSessionConfiguration +{ + private String hostAndPort; + + private long reconnectInterval = 0; + + private int maxReconnectAttempts = Integer.MAX_VALUE; + + public ReconnectingCDOSessionConfigurationImpl(String hostAndPort, String repositoryName, IManagedContainer container) + { + super(container); + + this.hostAndPort = hostAndPort; + setRepositoryName(repositoryName); + } + + public long getReconnectInterval() + { + return reconnectInterval; + } + + public void setReconnectInterval(long reconnectInterval) + { + this.reconnectInterval = reconnectInterval; + } + + public int getMaxReconnectAttempts() + { + return maxReconnectAttempts; + } + + public void setMaxReconnectAttempts(int maxReconnectAttempts) + { + this.maxReconnectAttempts = maxReconnectAttempts; + } + + @Override + public void setExceptionHandler(ExceptionHandler handler) + { + throw new UnsupportedOperationException(); + } + + @Override + public InternalCDOSession createSession() + { + ReconnectingCDOSessionImpl session = new ReconnectingCDOSessionImpl(); + + // A ReconnectingCDOSessionImpl has its own exceptionHandler; but the configuration mechanism + // expects the configuration object (i.e. *this*) to hold a reference to the desired handler. + // We therefore fetch the handler from the session and plug it into *this*, so that the + // config mechanism can proceed normally. (It will "set" the same handler again.) + // + super.setExceptionHandler(session.getExceptionHandler()); + return session; + } + + @Override + protected void configureSession(InternalCDOSession session) + { + super.configureSession(session); + + ReconnectingCDOSessionImpl sessionImpl = (ReconnectingCDOSessionImpl)session; + sessionImpl.setRepositoryConnectorDescription(hostAndPort); + sessionImpl.setReconnectInterval(reconnectInterval); + sessionImpl.setMaxReconnectAttempts(maxReconnectAttempts); + } +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionImpl.java new file mode 100644 index 0000000000..1f29b9e3b7 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/ReconnectingCDOSessionImpl.java @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.net4j.connector.ConnectorException; +import org.eclipse.net4j.connector.IConnector; + +/** + * @author Caspar De Groot + */ +public class ReconnectingCDOSessionImpl extends RecoveringCDOSessionImpl +{ + private long reconnectInterval = 0; + + private int maxReconnectAttempts = Integer.MAX_VALUE; + + public ReconnectingCDOSessionImpl() + { + } + + public long getReconnectInterval() + { + return reconnectInterval; + } + + public void setReconnectInterval(long reconnectInterval) + { + this.reconnectInterval = reconnectInterval; + } + + public int getMaxReconnectAttempts() + { + return maxReconnectAttempts; + } + + public void setMaxReconnectAttempts(int maxReconnectAttempts) + { + this.maxReconnectAttempts = maxReconnectAttempts; + } + + @Override + public void setConnector(IConnector connector) + { + // Do nothing (ignore an externally configured connector) + // Note: we cannot throw UnsupportedOperationException because the + // SessionConfig object will call this. + } + + @Override + public void setRepositoryConnectorDescription(String description) + { + if (getRepositoryConnectorDescription() != null) + { + throw new IllegalStateException("Don't call setRepositoryConnectorDescription more than once"); + } + + super.setRepositoryConnectorDescription(description); + } + + @Override + protected void updateConnectorAndRepositoryName() + { + removeTCPConnector(); + + IConnector newConnector = null; + int failedAttempts = 0; + long startOfLastAttempt = 0; + + while (newConnector == null && failedAttempts < maxReconnectAttempts) + { + try + { + if (startOfLastAttempt > 0) + { + delayAsNeeded(startOfLastAttempt); + } + + startOfLastAttempt = System.currentTimeMillis(); + newConnector = createTCPConnector(getUseHeartBeat()); + } + catch (ConnectorException ex) + { + failedAttempts++; + } + } + + if (newConnector == null) + { + throw new RuntimeException("Recovery failed"); // TODO (CD) Create custom exception type? + } + + super.setConnector(newConnector); + } + + private void delayAsNeeded(long startOfLastAttempt) + { + long timeToWait = requiredDelay(startOfLastAttempt); + while (timeToWait > 0) + { + try + { + Thread.sleep(timeToWait); + timeToWait = 0; + } + catch (InterruptedException ex) + { + timeToWait = requiredDelay(startOfLastAttempt); + } + } + } + + private long requiredDelay(long startOfLastAttempt) + { + long now = System.currentTimeMillis(); + long timeSinceLastAttempt = now - startOfLastAttempt; + long timeToWait = reconnectInterval - timeSinceLastAttempt; + return timeToWait; + } +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionConfigurationImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionConfigurationImpl.java new file mode 100644 index 0000000000..fbda8ef635 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionConfigurationImpl.java @@ -0,0 +1,102 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.emf.cdo.net4j.RecoveringCDOSessionConfiguration; + +import org.eclipse.net4j.util.container.IManagedContainer; + +import org.eclipse.emf.spi.cdo.InternalCDOSession; + +/** + * @author Caspar De Groot + */ +public abstract class RecoveringCDOSessionConfigurationImpl extends CDONet4jSessionConfigurationImpl implements + RecoveringCDOSessionConfiguration +{ + private IManagedContainer container; + + private boolean heartBeatEnabled; + + private long heartBeatPeriod; + + private long heartBeatTimeout; + + private long connectorTimeout; + + public RecoveringCDOSessionConfigurationImpl(IManagedContainer container) + { + this.container = container; + } + + protected IManagedContainer getContainer() + { + return container; + } + + public long getConnectorTimeout() + { + return connectorTimeout; + } + + public void setConnectorTimeout(long timeout) + { + connectorTimeout = timeout; + } + + public boolean isHeartBeatEnabled() + { + return heartBeatEnabled; + } + + public void setHeartBeatEnabled(boolean enabled) + { + heartBeatEnabled = enabled; + } + + public long getHeartBeatTimeout() + { + return heartBeatTimeout; + } + + public void setHeartBeatTimeout(long timeout) + { + heartBeatTimeout = timeout; + } + + public long getHeartBeatPeriod() + { + return heartBeatPeriod; + } + + public void setHeartBeatPeriod(long period) + { + heartBeatPeriod = period; + } + + @Override + protected void configureSession(InternalCDOSession session) + { + super.configureSession(session); + + if (heartBeatEnabled && (heartBeatPeriod == 0 || heartBeatTimeout == 0)) + { + throw new IllegalStateException("Cannot use a heartbeat with zero value set for period or timeout."); + } + + RecoveringCDOSessionImpl sessionImpl = (RecoveringCDOSessionImpl)session; + sessionImpl.setContainer(getContainer()); + sessionImpl.setUseHeartBeat(heartBeatEnabled); + sessionImpl.setHeartBeatPeriod(heartBeatPeriod); + sessionImpl.setHeartBeatTimeout(heartBeatTimeout); + sessionImpl.setConnectorTimeout(connectorTimeout); + } +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionImpl.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionImpl.java new file mode 100644 index 0000000000..589c8b01a8 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/RecoveringCDOSessionImpl.java @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.internal.net4j; + +import org.eclipse.emf.cdo.common.branch.CDOBranchPoint; +import org.eclipse.emf.cdo.net4j.CDOSessionRecoveryEvent; +import org.eclipse.emf.cdo.session.CDOSession; +import org.eclipse.emf.cdo.session.CDOSessionEvent; +import org.eclipse.emf.cdo.spi.common.branch.CDOBranchUtil; +import org.eclipse.emf.cdo.spi.common.branch.InternalCDOBranchManager; +import org.eclipse.emf.cdo.spi.common.commit.InternalCDOCommitInfoManager; +import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionManager; +import org.eclipse.emf.cdo.transaction.CDOTransaction; + +import org.eclipse.net4j.Net4jUtil; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.heartbeat.HeartBeatProtocol; +import org.eclipse.net4j.util.container.IContainerDelta; +import org.eclipse.net4j.util.container.IContainerEvent; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.event.IEvent; +import org.eclipse.net4j.util.event.IListener; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; + +import org.eclipse.emf.spi.cdo.CDOSessionProtocol; +import org.eclipse.emf.spi.cdo.InternalCDOView; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Caspar De Groot + */ +public abstract class RecoveringCDOSessionImpl extends CDONet4jSessionImpl +{ + private IManagedContainer container; + + private String repositoryConnectorDescription; + + private boolean useHeartBeat; + + private long heartBeatPeriod = 1000L; + + private long heartBeatTimeout = 5000L; + + private long connectorTimeout = 10000L; + + public RecoveringCDOSessionImpl() + { + setExceptionHandler(new RecoveringExceptionHandler()); + } + + public long getConnectorTimeout() + { + return connectorTimeout; + } + + public void setConnectorTimeout(long connectorTimeout) + { + this.connectorTimeout = connectorTimeout; + } + + public void setContainer(IManagedContainer container) + { + this.container = container; + } + + public IManagedContainer getContainer() + { + return container; + } + + public void setUseHeartBeat(boolean useHeartBeat) + { + this.useHeartBeat = useHeartBeat; + } + + public boolean getUseHeartBeat() + { + return useHeartBeat; + } + + public void setHeartBeatTimeout(long timeout) + { + heartBeatTimeout = timeout; + } + + public long getHeartBeatTimeout() + { + return heartBeatTimeout; + } + + public void setHeartBeatPeriod(long period) + { + heartBeatPeriod = period; + } + + public long getHeartBeatPeriod() + { + return heartBeatPeriod; + } + + @Override + protected void sessionProtocolDeactivated() + { + recover(); + } + + protected void recover() + { + fireEvent(createRecoveryStartedEvent()); + + CDOSessionProtocol oldSessionProtocol = getSessionProtocol(); + unhookSessionProtocol(); + List<AfterRecoveryRunnable> runnables = recoverSession(); + + // Check if the the sessionProtocol was replaced. (This may not be the case + // if the protocol is wrapped inside a DelegatingSessionProtocol.) + // + CDOSessionProtocol newSessionProtocol = getSessionProtocol(); + if (newSessionProtocol != oldSessionProtocol) + { + handleProtocolChange(oldSessionProtocol, newSessionProtocol); + } + + for (AfterRecoveryRunnable runnable : runnables) + { + runnable.run(newSessionProtocol); + } + + fireEvent(createRecoveryFinishedEvent()); + } + + protected void handleProtocolChange(CDOSessionProtocol oldProtocol, CDOSessionProtocol newProtocol) + { + // The revisionManager, branchManager, and commitInfoManager, hold their own + // references to the sessionProtocol. We need to update those: + + InternalCDORevisionManager revisionManager = getRevisionManager(); + revisionManager.deactivate(); + revisionManager.setRevisionLoader(newProtocol); + revisionManager.activate(); + + InternalCDOBranchManager branchManager = getBranchManager(); + branchManager.deactivate(); + branchManager.setBranchLoader(newProtocol); + branchManager.activate(); + + InternalCDOCommitInfoManager commitInfoManager = getCommitInfoManager(); + commitInfoManager.deactivate(); + commitInfoManager.setCommitInfoLoader(newProtocol); + commitInfoManager.activate(); + } + + protected CDOSessionEvent createRecoveryStartedEvent() + { + return new CDOSessionRecoveryEventImpl(this, CDOSessionRecoveryEvent.Type.STARTED); + } + + protected CDOSessionEvent createRecoveryFinishedEvent() + { + return new CDOSessionRecoveryEventImpl(this, CDOSessionRecoveryEvent.Type.FINISHED); + } + + protected IConnector createTCPConnector(boolean heartBeat) + { + IConnector connector = getTCPConnector(repositoryConnectorDescription); + if (heartBeat) + { + new HeartBeatProtocol(connector, container).start(heartBeatPeriod, heartBeatTimeout); + } + + connector.addListener(new AutoCloser()); + return connector; + } + + protected IConnector getTCPConnector(String description) + { + return Net4jUtil.getConnector(getContainer(), "tcp", description, connectorTimeout); + } + + protected List<AfterRecoveryRunnable> recoverSession() + { + try + { + List<AfterRecoveryRunnable> runnables = new ArrayList<AfterRecoveryRunnable>(); + for (InternalCDOView view : getViews()) + { + runnables.add(new OpenViewRunnable(view)); + } + + updateConnectorAndRepositoryName(); + openSession(); + + return runnables; + } + catch (RuntimeException ex) + { + deactivate(); + throw ex; + } + catch (Error ex) + { + deactivate(); + throw ex; + } + } + + protected IConnector removeTCPConnector() + { + return (IConnector)container.removeElement("org.eclipse.net4j.connectors", "tcp", repositoryConnectorDescription); + } + + protected void setRepositoryConnectorDescription(String description) + { + repositoryConnectorDescription = description; + } + + protected String getRepositoryConnectorDescription() + { + return repositoryConnectorDescription; + } + + @Override + protected void doActivate() throws Exception + { + updateConnectorAndRepositoryName(); + super.doActivate(); + } + + protected abstract void updateConnectorAndRepositoryName(); + + /** + * @author Eike Stepper + */ + public static interface AfterRecoveryRunnable + { + public void run(CDOSessionProtocol sessionProtocol); + } + + private class RecoveringExceptionHandler implements ExceptionHandler + { + public void handleException(CDOSession session, int attempt, Exception exception) throws Exception + { + recover(); + } + } + + /** + * @author Eike Stepper + */ + public final class OpenViewRunnable implements AfterRecoveryRunnable + { + private int viewID; + + private CDOBranchPoint branchPoint; + + private boolean transaction; + + public OpenViewRunnable(InternalCDOView view) + { + viewID = view.getViewID(); + branchPoint = CDOBranchUtil.copyBranchPoint(view); + transaction = view instanceof CDOTransaction; + } + + public void run(CDOSessionProtocol sessionProtocol) + { + sessionProtocol.openView(viewID, branchPoint, !transaction); + } + } + + private static class AutoCloser implements IListener + { + public void notifyEvent(IEvent event) + { + if (event instanceof IContainerEvent<?>) + { + IContainerEvent<?> containerEvent = (IContainerEvent<?>)event; + if (containerEvent.getDelta().getKind() == IContainerDelta.Kind.REMOVED) + { + IConnector connector = (IConnector)event.getSource(); + if (connector.getChannels().size() == 0) + { + LifecycleUtil.deactivate(connector); + } + } + } + } + } +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java index 271ea81a43..6ced663eb6 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDONet4jUtil.java @@ -16,6 +16,7 @@ import org.eclipse.emf.cdo.eresource.CDOResourceFactory; import org.eclipse.emf.cdo.internal.net4j.CDONet4jSessionConfigurationImpl; import org.eclipse.emf.cdo.internal.net4j.FailoverCDOSessionConfigurationImpl; import org.eclipse.emf.cdo.internal.net4j.Net4jSessionFactory; +import org.eclipse.emf.cdo.internal.net4j.ReconnectingCDOSessionConfigurationImpl; import org.eclipse.emf.cdo.internal.net4j.protocol.CDOClientProtocolFactory; import org.eclipse.emf.cdo.util.CDOUtil; import org.eclipse.emf.cdo.view.CDOViewProvider; @@ -80,6 +81,15 @@ public final class CDONet4jUtil /** * @since 4.0 */ + public static ReconnectingCDOSessionConfiguration createReconnectingSessionConfiguration(String hostAndPort, + String repoName, IManagedContainer container) + { + return new ReconnectingCDOSessionConfigurationImpl(hostAndPort, repoName, container); + } + + /** + * @since 4.0 + */ public static FailoverCDOSessionConfiguration createFailoverSessionConfiguration(String monitorConnectorDescription, String repositoryGroup) { diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionConfiguration.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionConfiguration.java index d2ec0f9f09..f202a65c04 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionConfiguration.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionConfiguration.java @@ -37,6 +37,16 @@ public interface CDOSessionConfiguration extends org.eclipse.emf.cdo.session.CDO public void setStreamWrapper(IStreamWrapper streamWrapper); /** + * @since 4.0 + */ + public long getSignalTimeout(); + + /** + * @since 4.0 + */ + public void setSignalTimeout(long timeout); + + /** * @see CDOSession#getPackageRegistry() */ public CDOPackageRegistry getPackageRegistry(); diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionFailoverEvent.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionRecoveryEvent.java index 8da6229d58..96679b7532 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionFailoverEvent.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/CDOSessionRecoveryEvent.java @@ -7,6 +7,7 @@ * * Contributors: * Eike Stepper - initial API and implementation + * Caspar De Groot - maintenance */ package org.eclipse.emf.cdo.net4j; @@ -16,7 +17,7 @@ import org.eclipse.emf.cdo.session.CDOSessionEvent; * @author Eike Stepper * @since 4.0 */ -public interface CDOSessionFailoverEvent extends CDOSessionEvent +public interface CDOSessionRecoveryEvent extends CDOSessionEvent { public Type getType(); diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/FailoverCDOSessionConfiguration.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/FailoverCDOSessionConfiguration.java index 1833630fc6..2ae5889967 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/FailoverCDOSessionConfiguration.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/FailoverCDOSessionConfiguration.java @@ -14,7 +14,7 @@ package org.eclipse.emf.cdo.net4j; * @author Eike Stepper * @since 4.0 */ -public interface FailoverCDOSessionConfiguration extends CDOSessionConfiguration +public interface FailoverCDOSessionConfiguration extends RecoveringCDOSessionConfiguration { public String getMonitorConnectorDescription(); diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/ReconnectingCDOSessionConfiguration.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/ReconnectingCDOSessionConfiguration.java new file mode 100644 index 0000000000..eb2ecd9c2c --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/ReconnectingCDOSessionConfiguration.java @@ -0,0 +1,26 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.net4j; + +/** + * @author Caspar De Groot + * @since 4.0 + */ +public interface ReconnectingCDOSessionConfiguration extends RecoveringCDOSessionConfiguration +{ + public long getReconnectInterval(); + + public void setReconnectInterval(long interval); + + public int getMaxReconnectAttempts(); + + public void setMaxReconnectAttempts(int attempts); +} diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/RecoveringCDOSessionConfiguration.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/RecoveringCDOSessionConfiguration.java new file mode 100644 index 0000000000..675b18e4b6 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/net4j/RecoveringCDOSessionConfiguration.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2004 - 2010 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Caspar De Groot - initial API and implementation + */ +package org.eclipse.emf.cdo.net4j; + +/** + * @author Caspar De Groot + * @since 4.0 + */ +public interface RecoveringCDOSessionConfiguration extends CDOSessionConfiguration +{ + public boolean isHeartBeatEnabled(); + + public void setHeartBeatEnabled(boolean enabled); + + public long getHeartBeatPeriod(); + + public void setHeartBeatPeriod(long period); + + public long getHeartBeatTimeout(); + + public void setHeartBeatTimeout(long timeout); + + public long getConnectorTimeout(); + + public void setConnectorTimeout(long timeout); +} |