diff options
author | Eike Stepper | 2010-12-27 13:02:38 +0000 |
---|---|---|
committer | Eike Stepper | 2010-12-27 13:02:38 +0000 |
commit | b6335a6e13b3642e151d599d003e8737f463a774 (patch) | |
tree | 46fc157aa08ee29e9db1cfa544303a3e61b4cf4b | |
parent | 75c6426da059bd0005b2d5abe00c54a71c12d88c (diff) | |
download | cdo-b6335a6e13b3642e151d599d003e8737f463a774.tar.gz cdo-b6335a6e13b3642e151d599d003e8737f463a774.tar.xz cdo-b6335a6e13b3642e151d599d003e8737f463a774.zip |
[331775] Raw replication failure is not cleaned up properly
https://bugs.eclipse.org/bugs/show_bug.cgi?id=331775
5 files changed, 73 insertions, 47 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java index dcc9052a5e..26f4d8e950 100644 --- a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java +++ b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/DBStoreAccessor.java @@ -1084,29 +1084,26 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce int commitWork = 4; monitor.begin(commitWork + size + commitWork); + Collection<InternalCDOPackageUnit> packageUnits = new HashSet<InternalCDOPackageUnit>(); + try { DBUtil.deserializeTable(in, connection, CDODBSchema.BRANCHES, monitor.fork()); DBUtil.deserializeTable(in, connection, CDODBSchema.COMMIT_INFOS, monitor.fork()); DBUtil.deserializeTable(in, connection, CDODBSchema.EXTERNAL_REFS, monitor.fork()); - rawImportPackageUnits(in, fromCommitTime, toCommitTime, monitor.fork()); - + rawImportPackageUnits(in, fromCommitTime, toCommitTime, packageUnits, monitor.fork()); mappingStrategy.rawImport(this, in, fromCommitTime, toCommitTime, monitor.fork(size)); - - Async async = monitor.forkAsync(commitWork); - - try - { - connection.commit(); - } - catch (SQLException ex) - { - throw new DBException(ex); - } - finally - { - async.stop(); - } + rawCommit(commitWork, monitor); + } + catch (RuntimeException ex) + { + rawRollback(packageUnits); + throw ex; + } + catch (IOException ex) + { + rawRollback(packageUnits); + throw ex; } finally { @@ -1114,8 +1111,23 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce } } - protected void rawImportPackageUnits(CDODataInput in, long fromCommitTime, long toCommitTime, OMMonitor monitor) - throws IOException + private void rawRollback(Collection<InternalCDOPackageUnit> packageUnits) + { + try + { + connection.rollback(); + } + catch (SQLException ex) + { + OM.LOG.error(ex); + } + + getStore().getMappingStrategy().removeMapping(getConnection(), + packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()])); + } + + protected void rawImportPackageUnits(CDODataInput in, long fromCommitTime, long toCommitTime, + Collection<InternalCDOPackageUnit> packageUnits, OMMonitor monitor) throws IOException { monitor.begin(2); @@ -1123,8 +1135,8 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce { DBStore store = getStore(); IMetaDataManager metaDataManager = store.getMetaDataManager(); - Collection<InternalCDOPackageUnit> packageUnits = metaDataManager.rawImport(getConnection(), in, fromCommitTime, - toCommitTime, monitor.fork()); + + packageUnits.addAll(metaDataManager.rawImport(connection, in, fromCommitTime, toCommitTime, monitor.fork())); InternalRepository repository = store.getRepository(); InternalCDOPackageRegistry packageRegistry = repository.getPackageRegistry(false); @@ -1135,8 +1147,20 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce } IMappingStrategy mappingStrategy = store.getMappingStrategy(); - mappingStrategy.createMapping(connection, packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()]), - monitor.fork()); + + // Using another connection because CREATE TABLE (which is called in createMapping) on H2 databases does a commit. + Connection connection2 = null; + try + { + connection2 = store.getConnection(); + + mappingStrategy.createMapping(connection2, + packageUnits.toArray(new InternalCDOPackageUnit[packageUnits.size()]), monitor.fork()); + } + finally + { + DBUtil.close(connection2); + } } finally { @@ -1182,7 +1206,7 @@ public class DBStoreAccessor extends LongIDStoreAccessor implements IDBStoreAcce mapping.detachObject(this, id, version, branch, CDOBranchPoint.UNSPECIFIED_DATE, monitor.fork()); } - public void rawCommit(OMMonitor monitor) + public void rawCommit(double commitWork, OMMonitor monitor) { Async async = monitor.forkAsync(); diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java index bd2c36d2eb..5c3cdf4a4f 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/mem/MEMStoreAccessor.java @@ -390,7 +390,7 @@ public class MEMStoreAccessor extends LongIDStoreAccessor implements IStoreAcces getStore().rawDelete(id, version, branch); } - public void rawCommit(OMMonitor monitor) + public void rawCommit(double commitWork, OMMonitor monitor) { // Do nothing } diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java index a43b69037d..b653413151 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/CDOServerImporter.java @@ -275,7 +275,7 @@ public abstract class CDOServerImporter public void flush() { - accessor.rawCommit(monitor); + accessor.rawCommit(1, monitor); } private void collectPackageInfos() diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java index 107ab269ea..2498320e61 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/server/IStoreAccessor.java @@ -537,8 +537,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * operation. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * * @param packageUnits * the package units to be stored in the backend represented by this {@link IStoreAccessor.Raw raw store @@ -547,7 +547,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and * <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly * to prevent timeouts from expiring in the caller. - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawStore(InternalCDOPackageUnit[] packageUnits, OMMonitor monitor); @@ -558,8 +558,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * stamps}, which is not desired in the context of a replication operation. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * * @param revision * the revision to be stored in the backend represented by this {@link IStoreAccessor.Raw raw store @@ -568,7 +568,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and * <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly * to prevent timeouts from expiring in the caller. - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawStore(InternalCDORevision revision, OMMonitor monitor); @@ -577,8 +577,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * * @param id * the {@link CDOBlob#getID() ID} of the blob to be stored in the backend represented by this @@ -589,7 +589,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * @param inputStream * the {@link CDOBlob#getContents() contents} of the blob to be stored in the backend represented by this * {@link IStoreAccessor.Raw raw store accessor}. - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawStore(byte[] id, long size, InputStream inputStream) throws IOException; @@ -598,8 +598,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * * @param id * the {@link CDOClob#getID() ID} of the clob to be stored in the backend represented by this @@ -610,7 +610,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * @param reader * the {@link CDOClob#getContents() contents} of the clob to be stored in the backend represented by this * {@link IStoreAccessor.Raw raw store accessor}. - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawStore(byte[] id, long size, Reader reader) throws IOException; @@ -619,8 +619,8 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * store accessor} without going through a regular {@link IStoreAccessor#commit(OMMonitor) commit}. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * * @param branch * the {@link CDOCommitInfo#getBranch() branch} of the commit info to be stored in the backend represented @@ -637,7 +637,7 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * @param comment * the {@link CDOCommitInfo#getComment() comment} of the commit info to be stored in the backend * represented by this {@link IStoreAccessor.Raw raw store accessor}. - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawStore(CDOBranch branch, long timeStamp, long previousTimeStamp, String userID, String comment, OMMonitor monitor); @@ -648,16 +648,18 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * {@link IStoreAccessor#commit(OMMonitor) commit}. * <p> * <b>Implementation note:</b> The implementor of this method may rely on the fact that multiple subsequent calls to - * this method are followed by a single final call to the {@link #rawCommit(OMMonitor) rawCommit()} method where the - * accumulated backend changes can be committed atomically. + * this method are followed by a single final call to the {@link #rawCommit(double, OMMonitor) rawCommit()} method + * where the accumulated backend changes can be committed atomically. * - * @see #rawCommit(OMMonitor) + * @see #rawCommit(double, OMMonitor) */ public void rawDelete(CDOID id, int version, CDOBranch branch, EClass eClass, OMMonitor monitor); /** * Atomically commits the accumulated backend changes resulting from previous calls to the rawStore() methods. * + * @param commitWork + * the amount of work to use up from the monitor while executing the commit. * @param monitor * a progress monitor that <b>may be</b> used to report proper progress of this operation to the caller and * <b>may be</b> used to react to cancelation requests of the caller and <b>must be</b> touched regularly @@ -668,6 +670,6 @@ public interface IStoreAccessor extends IQueryHandlerProvider, BranchLoader, Com * @see #rawStore(byte[], long, Reader) * @see #rawStore(CDOBranch, long, long, String, String, OMMonitor) */ - public void rawCommit(OMMonitor monitor); + public void rawCommit(double commitWork, OMMonitor monitor); } } diff --git a/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java b/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java index 961450af24..f0e390e7d1 100644 --- a/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java +++ b/plugins/org.eclipse.emf.cdo.workspace/src/org/eclipse/emf/cdo/internal/workspace/CDOWorkspaceImpl.java @@ -198,7 +198,7 @@ public class CDOWorkspaceImpl implements InternalCDOWorkspace LifecycleUtil.deactivate(remoteSession); } - accessor.rawCommit(monitor); + accessor.rawCommit(1, monitor); } finally { @@ -429,7 +429,7 @@ public class CDOWorkspaceImpl implements InternalCDOWorkspace accessor.rawStore(revision, monitor); } - accessor.rawCommit(monitor); + accessor.rawCommit(1, monitor); } protected InternalCDORevision adjustRevision(IStoreAccessor.Raw accessor, CDOID id, CDOIDMapper idMapper) |