Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2019-02-01 14:40:11 -0500
committerEike Stepper2019-02-01 14:40:11 -0500
commit7217380110c2353c2a6df073614f3a046135952f (patch)
tree4862bb91ffc69669cf3a8f96e59094cbce278e69
parentddbf0ebab3920ef12645400a1b929d8accd97831 (diff)
downloadcdo-7217380110c2353c2a6df073614f3a046135952f.tar.gz
cdo-7217380110c2353c2a6df073614f3a046135952f.tar.xz
cdo-7217380110c2353c2a6df073614f3a046135952f.zip
[544050] Provide commit template methods in CDOTransaction
https://bugs.eclipse.org/bugs/show_bug.cgi?id=544050
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java52
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java25
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOTransaction.java54
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOUserTransaction.java12
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOTransactionImpl.java112
-rw-r--r--plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOXATransactionImpl.java10
6 files changed, 203 insertions, 62 deletions
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
index 4dd978c44c..7b38555824 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/TransactionTest.java
@@ -32,7 +32,6 @@ import org.eclipse.emf.cdo.transaction.CDOTransactionConflictEvent;
import org.eclipse.emf.cdo.transaction.CDOTransactionHandler2;
import org.eclipse.emf.cdo.util.CDOUtil;
import org.eclipse.emf.cdo.util.CommitException;
-import org.eclipse.emf.cdo.util.ConcurrentAccessException;
import org.eclipse.emf.cdo.view.CDOView;
import org.eclipse.net4j.signal.SignalCounter;
@@ -52,7 +51,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* See bug 213782, bug 201366
@@ -239,7 +237,7 @@ public class TransactionTest extends AbstractCDOTest
public void testCommitManyTransactionsMultiThread() throws Exception
{
final int RUNS = 1;
- final int THREADS = 1000;
+ final int THREADS = 100;
final int TIMEOUT = 10; // Minutes.
final boolean pessimistic = true;
@@ -258,7 +256,6 @@ public class TransactionTest extends AbstractCDOTest
{
System.out.println("RUN " + run);
- AtomicInteger concurrentAccessExceptions = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(THREADS);
List<Thread> threadList = new ArrayList<Thread>();
@@ -270,7 +267,7 @@ public class TransactionTest extends AbstractCDOTest
final Company company = transaction.getObject(initialCompany);
final Customer newCustomer = Model1Factory.eINSTANCE.createCustomer();
- threadList.add(new Committer(transaction, concurrentAccessExceptions, latch, new Callable<Boolean>()
+ threadList.add(new Committer(transaction, latch, new Callable<Boolean>()
{
public Boolean call() throws Exception
{
@@ -295,7 +292,6 @@ public class TransactionTest extends AbstractCDOTest
fail("Timeout after " + TIMEOUT + " seconds");
}
- System.out.println("ConcurrentAccessExceptions: " + concurrentAccessExceptions.get());
signalCounter.dump(IOUtil.OUT(), true);
}
}
@@ -307,53 +303,16 @@ public class TransactionTest extends AbstractCDOTest
{
private static final ThreadGroup THREAD_GROUP = new ThreadGroup("COMMITTERS");
- private static final int ATTEMPTS = 200;
-
private final CDOTransaction transaction;
- private final AtomicInteger concurrentAccessExceptions;
-
private final CountDownLatch latch;
private final Callable<Boolean> operation;
- private Callable<Boolean> callable = new Callable<Boolean>()
- {
- private int attempt;
-
- public Boolean call() throws Exception
- {
- ++attempt;
- operation.call();
-
- try
- {
- transaction.commit();
- }
- catch (ConcurrentAccessException ex)
- {
- concurrentAccessExceptions.incrementAndGet();
-
- if (attempt < ATTEMPTS)
- {
- transaction.rollback();
- return true;
- }
- }
- catch (Exception ex)
- {
- throw ex;
- }
-
- return false;
- }
- };
-
- public Committer(CDOTransaction transaction, AtomicInteger concurrentAccessExceptions, CountDownLatch latch, Callable<Boolean> operation)
+ public Committer(CDOTransaction transaction, CountDownLatch latch, Callable<Boolean> operation)
{
super(THREAD_GROUP, "Committer-" + transaction.getViewID());
this.transaction = transaction;
- this.concurrentAccessExceptions = concurrentAccessExceptions;
this.latch = latch;
this.operation = operation;
}
@@ -363,10 +322,7 @@ public class TransactionTest extends AbstractCDOTest
{
try
{
- while (transaction.syncExec(callable))
- {
- // Do nothing.
- }
+ transaction.commit(operation, 200, null);
}
catch (Exception ex)
{
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
index dd75e242b0..bad6f65128 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOPushTransaction.java
@@ -32,6 +32,7 @@ import org.eclipse.emf.cdo.eresource.CDOResourceNode;
import org.eclipse.emf.cdo.eresource.CDOTextResource;
import org.eclipse.emf.cdo.session.CDOSession;
import org.eclipse.emf.cdo.util.CommitException;
+import org.eclipse.emf.cdo.util.ConcurrentAccessException;
import org.eclipse.emf.cdo.view.CDOObjectHandler;
import org.eclipse.emf.cdo.view.CDOQuery;
import org.eclipse.emf.cdo.view.CDORegistrationHandler;
@@ -43,6 +44,7 @@ import org.eclipse.emf.cdo.view.CDOViewSet;
import org.eclipse.emf.internal.cdo.bundle.OM;
import org.eclipse.net4j.util.AdapterUtil;
+import org.eclipse.net4j.util.Predicate;
import org.eclipse.net4j.util.collection.CloseableIterator;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.event.IListener;
@@ -247,7 +249,7 @@ public class CDOPushTransaction extends Notifier implements CDOTransaction
return commit(null);
}
- public CDOCommitInfo commit(IProgressMonitor progressMonitor) throws CommitException
+ public CDOCommitInfo commit(IProgressMonitor monitor) throws CommitException
{
OutputStream out = null;
@@ -268,6 +270,27 @@ public class CDOPushTransaction extends Notifier implements CDOTransaction
}
}
+ public <T> CommitResult<T> commit(Callable<T> callable, Predicate<Long> retry, IProgressMonitor monitor)
+ throws ConcurrentAccessException, CommitException, Exception
+ {
+ return delegate.commit(callable, retry, monitor);
+ }
+
+ public <T> CommitResult<T> commit(Callable<T> callable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException, Exception
+ {
+ return delegate.commit(callable, attempts, monitor);
+ }
+
+ public CDOCommitInfo commit(Runnable runnable, Predicate<Long> retry, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException
+ {
+ return delegate.commit(runnable, retry, monitor);
+ }
+
+ public CDOCommitInfo commit(Runnable runnable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException
+ {
+ return delegate.commit(runnable, attempts, monitor);
+ }
+
public void rollback()
{
throw new UnsupportedOperationException("Rollback not supported for push transactions");
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOTransaction.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOTransaction.java
index 28cbbfcc05..84db68facd 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOTransaction.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOTransaction.java
@@ -31,20 +31,26 @@ import org.eclipse.emf.cdo.eresource.CDOResource;
import org.eclipse.emf.cdo.eresource.CDOResourceFolder;
import org.eclipse.emf.cdo.eresource.CDOTextResource;
import org.eclipse.emf.cdo.session.CDOSession;
+import org.eclipse.emf.cdo.util.CommitException;
+import org.eclipse.emf.cdo.util.ConcurrentAccessException;
import org.eclipse.emf.cdo.view.CDOQuery;
import org.eclipse.emf.cdo.view.CDOView;
+import org.eclipse.net4j.util.Predicate;
import org.eclipse.net4j.util.options.IOptionsEvent;
import org.eclipse.emf.common.util.URI;
import org.eclipse.emf.ecore.EObject;
import org.eclipse.emf.ecore.resource.ResourceSet;
+import org.eclipse.core.runtime.IProgressMonitor;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* A read-write view to the <em>current</em> (i.e. latest) state of the object graph in the repository.
@@ -255,6 +261,27 @@ public interface CDOTransaction extends CDOView, CDOCommonTransaction, CDOUserTr
*/
public CDOQuery createQuery(String language, String queryString, Object context, boolean considerDirtyState);
+ /**
+ * @since 4.7
+ */
+ public <T> CommitResult<T> commit(Callable<T> callable, Predicate<Long> retry, IProgressMonitor monitor)
+ throws ConcurrentAccessException, CommitException, Exception;
+
+ /**
+ * @since 4.7
+ */
+ public <T> CommitResult<T> commit(Callable<T> callable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException, Exception;
+
+ /**
+ * @since 4.7
+ */
+ public CDOCommitInfo commit(Runnable runnable, Predicate<Long> retry, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException;
+
+ /**
+ * @since 4.7
+ */
+ public CDOCommitInfo commit(Runnable runnable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException;
+
public Options options();
/**
@@ -528,4 +555,31 @@ public interface CDOTransaction extends CDOView, CDOCommonTransaction, CDOUserTr
{
}
}
+
+ /**
+ * @author Eike Stepper
+ * @since 4.7
+ */
+ public static final class CommitResult<T>
+ {
+ private final T result;
+
+ private final CDOCommitInfo info;
+
+ public CommitResult(T result, CDOCommitInfo info)
+ {
+ this.result = result;
+ this.info = info;
+ }
+
+ public T getResult()
+ {
+ return result;
+ }
+
+ public CDOCommitInfo getInfo()
+ {
+ return info;
+ }
+ }
}
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOUserTransaction.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOUserTransaction.java
index 4d7b8e9cb0..1e7cf7f3d6 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOUserTransaction.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/transaction/CDOUserTransaction.java
@@ -45,21 +45,21 @@ public interface CDOUserTransaction
* write robust transactions:
* <pre>
CDOTransaction transaction = null;
-
+
try
{
transaction = session.openTransaction();
-
+
for (;;)
{
transaction.getViewLock().lock();
-
+
try
{
CDOResource resource = transaction.getResource("/stock/resource1");
-
+
// Modify the model here...
-
+
transaction.commit();
break;
}
@@ -91,7 +91,7 @@ public interface CDOUserTransaction
*
* @since 3.0
*/
- public CDOCommitInfo commit(IProgressMonitor progressMonitor) throws ConcurrentAccessException, CommitException;
+ public CDOCommitInfo commit(IProgressMonitor monitor) throws ConcurrentAccessException, CommitException;
public void rollback();
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOTransactionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOTransactionImpl.java
index 5311d463c4..9532476c2a 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOTransactionImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOTransactionImpl.java
@@ -115,6 +115,7 @@ import org.eclipse.emf.cdo.transaction.CDOUserSavepoint;
import org.eclipse.emf.cdo.util.CDOURIUtil;
import org.eclipse.emf.cdo.util.CDOUtil;
import org.eclipse.emf.cdo.util.CommitException;
+import org.eclipse.emf.cdo.util.ConcurrentAccessException;
import org.eclipse.emf.cdo.util.DanglingIntegrityException;
import org.eclipse.emf.cdo.util.DanglingReferenceException;
import org.eclipse.emf.cdo.util.LocalCommitConflictException;
@@ -137,6 +138,7 @@ import org.eclipse.emf.internal.cdo.view.CDOViewImpl;
import org.eclipse.net4j.util.CheckUtil;
import org.eclipse.net4j.util.ObjectUtil;
+import org.eclipse.net4j.util.Predicate;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.collection.AbstractCloseableIterator;
import org.eclipse.net4j.util.collection.ByteArrayWrapper;
@@ -186,6 +188,7 @@ import org.eclipse.emf.spi.cdo.InternalCDOViewSet;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.OperationCanceledException;
+import org.eclipse.core.runtime.SubMonitor;
import java.io.IOException;
import java.io.InputStream;
@@ -1555,7 +1558,7 @@ public class CDOTransactionImpl extends CDOViewImpl implements InternalCDOTransa
/**
* @since 2.0
*/
- public CDOCommitInfo commit(IProgressMonitor progressMonitor) throws CommitException
+ public CDOCommitInfo commit(IProgressMonitor monitor) throws CommitException
{
CDOConflictResolver[] conflictResolvers = options().getConflictResolvers();
if (conflictResolvers.length != 0)
@@ -1582,7 +1585,7 @@ public class CDOTransactionImpl extends CDOViewImpl implements InternalCDOTransa
}
}
- CDOCommitInfo info = commitSynced(progressMonitor);
+ CDOCommitInfo info = commitSynced(monitor);
if (info != null)
{
long timeStamp = info.getTimeStamp();
@@ -1693,6 +1696,90 @@ public class CDOTransactionImpl extends CDOViewImpl implements InternalCDOTransa
}
}
+ public <T> CommitResult<T> commit(final Callable<T> callable, Predicate<Long> retry, IProgressMonitor monitor)
+ throws ConcurrentAccessException, CommitException, Exception
+ {
+ final Object[] result = { null };
+ final Exception[] exception = { null };
+
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ result[0] = callable.call();
+ }
+ catch (Exception ex)
+ {
+ exception[0] = ex;
+ }
+ }
+ };
+
+ CDOCommitInfo info = commit(runnable, retry, monitor);
+ if (exception[0] != null)
+ {
+ throw exception[0];
+ }
+
+ if (info == null)
+ {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ T r = (T)result[0];
+ return new CommitResult<T>(r, info);
+ }
+
+ public <T> CommitResult<T> commit(Callable<T> callable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException, Exception
+ {
+ return commit(callable, new CountedRetryPredicate(attempts), monitor);
+ }
+
+ public CDOCommitInfo commit(Runnable runnable, Predicate<Long> retry, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException
+ {
+ long start = System.currentTimeMillis();
+ SubMonitor subMonitor = SubMonitor.convert(monitor);
+
+ for (;;)
+ {
+ subMonitor.setWorkRemaining(100);
+
+ synchronized (getViewMonitor())
+ {
+ lockView();
+
+ runnable.run();
+
+ try
+ {
+ return commit(subMonitor.split(1));
+ }
+ catch (ConcurrentAccessException ex)
+ {
+ if (retry.apply(System.currentTimeMillis() - start))
+ {
+ rollback();
+ continue;
+ }
+
+ throw ex;
+ }
+ finally
+ {
+ unlockView();
+ }
+ }
+ }
+ }
+
+ public CDOCommitInfo commit(Runnable runnable, int attempts, IProgressMonitor monitor) throws ConcurrentAccessException, CommitException
+ {
+ return commit(runnable, new CountedRetryPredicate(attempts), monitor);
+ }
+
public CommitToken getCommitToken()
{
return commitToken;
@@ -3118,6 +3205,7 @@ public class CDOTransactionImpl extends CDOViewImpl implements InternalCDOTransa
return new AbstractCloseableIterator<CDOResourceNode>()
{
+
private Iterator<Object> listIterator = finalList == null ? null : finalList.iterator();
@Override
@@ -4768,6 +4856,26 @@ public class CDOTransactionImpl extends CDOViewImpl implements InternalCDOTransa
/**
* @author Eike Stepper
+ */
+ private static final class CountedRetryPredicate implements Predicate<Long>
+ {
+ private final int attempts;
+
+ private int attempt;
+
+ private CountedRetryPredicate(int attempts)
+ {
+ this.attempts = attempts;
+ }
+
+ public boolean apply(Long startMillis)
+ {
+ return ++attempt < attempts;
+ }
+ }
+
+ /**
+ * @author Eike Stepper
* @since 2.0
*/
protected final class OptionsImpl extends CDOViewImpl.OptionsImpl implements CDOTransaction.Options
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOXATransactionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOXATransactionImpl.java
index dbc3cb9965..84355c0150 100644
--- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOXATransactionImpl.java
+++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/transaction/CDOXATransactionImpl.java
@@ -298,9 +298,9 @@ public class CDOXATransactionImpl implements InternalCDOXATransaction
return commit(null);
}
- public CDOCommitInfo commit(IProgressMonitor progressMonitor) throws CommitException
+ public CDOCommitInfo commit(IProgressMonitor monitor) throws CommitException
{
- commitPrepare(progressMonitor);
+ commitPrepare(monitor);
int phase = 0;
try
@@ -308,17 +308,17 @@ public class CDOXATransactionImpl implements InternalCDOXATransaction
// We need to complete 3 phases
while (phase <= 2)
{
- commitPhase(progressMonitor);
+ commitPhase(monitor);
++phase;
}
}
catch (Exception ex)
{
- commitException(progressMonitor, phase, ex);
+ commitException(monitor, phase, ex);
}
finally
{
- commitFinally(progressMonitor);
+ commitFinally(monitor);
}
return null;

Back to the top