Skip to main content

This CGIT instance is deprecated, and repositories have been moved to Gitlab or Github. See the repository descriptions for specific locations.

summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2010-12-27 13:02:38 +0000
committerEike Stepper2010-12-27 13:02:38 +0000
commitb6335a6e13b3642e151d599d003e8737f463a774 (patch)
tree46fc157aa08ee29e9db1cfa544303a3e61b4cf4b
parent75c6426da059bd0005b2d5abe00c54a71c12d88c (diff)
downloadcdo-b6335a6e13b3642e151d599d003e8737f463a774.tar.gz
cdo-b6335a6e13b3642e151d599d003e8737f463a774.tar.xz
cdo-b6335a6e13b3642e151d599d003e8737f463a774.zip
[331775] Raw replication failure is not cleaned up properly
https://bugs.eclipse.org/bugs/show_bug.cgi?id=331775
-rw-r--r--plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java72
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java2
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java2
-rw-r--r--plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java40
-rw-r--r--plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java4
5 files changed, 73 insertions, 47 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java
index dcc9052a5e..26f4d8e950 100644
--- a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java
+++ b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java
@@ -1084,29 +1084,26 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce
int commitWork = 4;
monitor.begin(commitWork + size + commitWork);
+ Collection<InternalCDOPackageUnit> packageUnits = new HashSet<InternalCDOPackageUnit>();
+
try
{
DBUtil.deserializeTable(in, connection, CDODBSchema.BRANCHES, monitor.fork());
DBUtil.deserializeTable(in, connection, CDODBSchema.COMMIT_INFOS, monitor.fork());
DBUtil.deserializeTable(in, connection, CDODBSchema.EXTERNAL_REFS, monitor.fork());
- rawImportPackageUnits(in, fromCommitTime, toCommitTime, monitor.fork());
-
+ rawImportPackageUnits(in, fromCommitTime, toCommitTime, packageUnits, monitor.fork());
mappingStrategy.rawImport(this, in, fromCommitTime, toCommitTime, monitor.fork(size));
-
- Async async = monitor.forkAsync(commitWork);
-
- try
- {
- connection.commit();
- }
- catch (SQLException ex)
- {
- throw new DBException(ex);
- }
- finally
- {
- async.stop();
- }
+ rawCommit(commitWork, monitor);
+ }
+ catch (RuntimeException ex)
+ {
+ rawRollback(packageUnits);
+ throw ex;
+ }
+ catch (IOException ex)
+ {
+ rawRollback(packageUnits);
+ throw ex;
}
finally
{
@@ -1114,8 +1111,23 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce
}
}
- protected void rawImportPackageUnits(CDODataInput in, long fromCommitTime, long toCommitTime, OMMonitor monitor)
- throws IOException
+ private void rawRollback(Collection<InternalCDOPackageUnit> packageUnits)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch (SQLException ex)
+ {
+ OM.LOG.error(ex);
+ }
+
+ getStore().getMappingStrategy().removeMapping(getConnection(),
+ packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()]));
+ }
+
+ protected void rawImportPackageUnits(CDODataInput in, long fromCommitTime, long toCommitTime,
+ Collection<InternalCDOPackageUnit> packageUnits, OMMonitor monitor) throws IOException
{
monitor.begin(2);
@@ -1123,8 +1135,8 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce
{
DBStore store = getStore();
IMetaDataManager metaDataManager = store.getMetaDataManager();
- Collection<InternalCDOPackageUnit> packageUnits = metaDataManager.rawImport(getConnection(), in, fromCommitTime,
- toCommitTime, monitor.fork());
+
+ packageUnits.addAll(metaDataManager.rawImport(connection, in, fromCommitTime, toCommitTime, monitor.fork()));
InternalRepository repository = store.getRepository();
InternalCDOPackageRegistry packageRegistry = repository.getPackageRegistry(false);
@@ -1135,8 +1147,20 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce
}
IMappingStrategy mappingStrategy = store.getMappingStrategy();
- mappingStrategy.createMapping(connection, packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()]),
- monitor.fork());
+
+ // Using another connection because CREATE TABLE (which is called in createMapping) on H2 databases does a commit.
+ Connection connection2 = null;
+ try
+ {
+ connection2 = store.getConnection();
+
+ mappingStrategy.createMapping(connection2,
+ packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()]), monitor.fork());
+ }
+ finally
+ {
+ DBUtil.close(connection2);
+ }
}
finally
{
@@ -1182,7 +1206,7 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce
mapping.detachObject(this, id, version, branch, CDOBranchPoint.UNSPECIFIED_DATE, monitor.fork());
}
- public void rawCommit(OMMonitor monitor)
+ public void rawCommit(double commitWork, OMMonitor monitor)
{
Async async = monitor.forkAsync();
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java
index bd2c36d2eb..5c3cdf4a4f 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java
@@ -390,7 +390,7 @@ public class MEMStoreAccessor extends LongIDStoreAccessor implements IStoreAcces
getStore().rawDelete(id, version, branch);
}
- public void rawCommit(OMMonitor monitor)
+ public void rawCommit(double commitWork, OMMonitor monitor)
{
// Do nothing
}
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java
index a43b69037d..b653413151 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java
@@ -275,7 +275,7 @@ public abstract class CDOServerImporter
public void flush()
{
- accessor.rawCommit(monitor);
+ accessor.rawCommit(1, monitor);
}
private void collectPackageInfos()
diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java
index 107ab269ea..2498320e61 100644
--- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java
+++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java
@@ -537,8 +537,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* operation.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
* @param packageUnits
* the package units to be stored in the backend represented by this {@link IStoreAccessor.Raw raw store
@@ -547,7 +547,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and
* <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly
* to prevent timeouts from expiring in the caller.
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawStore(InternalCDOPackageUnit[] packageUnits, OMMonitor monitor);
@@ -558,8 +558,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* stamps}, which is not desired in the context of a replication operation.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
* @param revision
* the revision to be stored in the backend represented by this {@link IStoreAccessor.Raw raw store
@@ -568,7 +568,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and
* <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly
* to prevent timeouts from expiring in the caller.
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawStore(InternalCDORevision revision, OMMonitor monitor);
@@ -577,8 +577,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
* @param id
* the {@link CDOBlob#getID() ID} of the blob to be stored in the backend represented by this
@@ -589,7 +589,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* @param inputStream
* the {@link CDOBlob#getContents() contents} of the blob to be stored in the backend represented by this
* {@link IStoreAccessor.Raw raw store accessor}.
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawStore(byte[] id, long size, InputStream inputStream) throws IOException;
@@ -598,8 +598,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
* @param id
* the {@link CDOClob#getID() ID} of the clob to be stored in the backend represented by this
@@ -610,7 +610,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* @param reader
* the {@link CDOClob#getContents() contents} of the clob to be stored in the backend represented by this
* {@link IStoreAccessor.Raw raw store accessor}.
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawStore(byte[] id, long size, Reader reader) throws IOException;
@@ -619,8 +619,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* store accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
* @param branch
* the {@link CDOCommitInfo#getBranch() branch} of the commit info to be stored in the backend represented
@@ -637,7 +637,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* @param comment
* the {@link CDOCommitInfo#getComment() comment} of the commit info to be stored in the backend
* represented by this {@link IStoreAccessor.Raw raw store accessor}.
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawStore(CDOBranch branch, long timeStamp, long previousTimeStamp, String userID, String comment,
OMMonitor monitor);
@@ -648,16 +648,18 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* {@link IStoreAccessor#commit(OMMonitor) commit}.
* <p>
* <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to
- * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the
- * accumulated backend changes can be committed atomically.
+ * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method
+ * where the accumulated backend changes can be committed atomically.
*
- * @see #rawCommit(OMMonitor)
+ * @see #rawCommit(double, OMMonitor)
*/
public void rawDelete(CDOID id, int version, CDOBranch branch, EClass eClass, OMMonitor monitor);
/**
* Atomically commits the accumulated backend changes resulting from previous calls to the rawStore() methods.
*
+ * @param commitWork
+ * the amount of work to use up from the monitor while executing the commit.
* @param monitor
* a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and
* <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly
@@ -668,6 +670,6 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com
* @see #rawStore(byte[], long, Reader)
* @see #rawStore(CDOBranch, long, long, String, String, OMMonitor)
*/
- public void rawCommit(OMMonitor monitor);
+ public void rawCommit(double commitWork, OMMonitor monitor);
}
}
diff --git a/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java b/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java
index 961450af24..f0e390e7d1 100644
--- a/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java
+++ b/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java
@@ -198,7 +198,7 @@ public class CDOWorkspaceImpl implements InternalCDOWorkspace
LifecycleUtil.deactivate(remoteSession);
}
- accessor.rawCommit(monitor);
+ accessor.rawCommit(1, monitor);
}
finally
{
@@ -429,7 +429,7 @@ public class CDOWorkspaceImpl implements InternalCDOWorkspace
accessor.rawStore(revision, monitor);
}
- accessor.rawCommit(monitor);
+ accessor.rawCommit(1, monitor);
}
protected InternalCDORevision adjustRevision(IStoreAccessor.Raw accessor, CDOID id, CDOIDMapper idMapper)

Back to the top