Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java')
-rw-r--r--plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java167
1 files changed, 167 insertions, 0 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java
index 1283af5e69..69acfea09e 100644
--- a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java
+++ b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java
@@ -27,18 +27,22 @@ import org.eclipse.emf.cdo.common.revision.CDORevisionManager;
import org.eclipse.emf.cdo.eresource.EresourcePackage;
import org.eclipse.emf.cdo.server.IRepository;
import org.eclipse.emf.cdo.server.IStoreAccessor.QueryXRefsContext;
+import org.eclipse.emf.cdo.server.StoreThreadLocal;
import org.eclipse.emf.cdo.server.db.IDBStoreAccessor;
import org.eclipse.emf.cdo.server.db.IIDHandler;
import org.eclipse.emf.cdo.server.db.mapping.IClassMapping;
import org.eclipse.emf.cdo.server.db.mapping.IListMapping;
import org.eclipse.emf.cdo.server.db.mapping.IListMapping3;
+import org.eclipse.emf.cdo.server.db.mapping.IListMappingUnitSupport;
import org.eclipse.emf.cdo.server.db.mapping.IMappingStrategy;
import org.eclipse.emf.cdo.server.db.mapping.ITypeMapping;
+import org.eclipse.emf.cdo.server.internal.db.DBStore;
import org.eclipse.emf.cdo.server.internal.db.bundle.OM;
import org.eclipse.emf.cdo.server.internal.db.mapping.AbstractMappingStrategy;
import org.eclipse.emf.cdo.spi.common.commit.CDOChangeSetSegment;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDOList;
import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision;
+import org.eclipse.emf.cdo.spi.common.revision.StubCDORevision;
import org.eclipse.net4j.db.DBException;
import org.eclipse.net4j.db.DBType;
@@ -52,6 +56,10 @@ import org.eclipse.net4j.db.ddl.IDBIndex;
import org.eclipse.net4j.db.ddl.IDBSchema;
import org.eclipse.net4j.db.ddl.IDBTable;
import org.eclipse.net4j.spi.db.ddl.InternalDBIndex;
+import org.eclipse.net4j.util.WrappedException;
+import org.eclipse.net4j.util.collection.MoveableList;
+import org.eclipse.net4j.util.concurrent.RunnableWithName;
+import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException;
import org.eclipse.net4j.util.lifecycle.IDeactivateable;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.net4j.util.om.monitor.OMMonitor.Async;
@@ -76,6 +84,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* @author Eike Stepper
@@ -1016,4 +1028,159 @@ public abstract class AbstractHorizontalClassMapping implements IClassMapping, I
}
}
}
+
+ /**
+ * @author Eike Stepper
+ */
+ protected abstract class AsnychronousListIO<T extends IListMapping> extends RunnableWithName
+ {
+ protected final IDBStoreAccessor accessor;
+
+ protected final DBStore store;
+
+ protected final IIDHandler idHandler;
+
+ protected final IListMapping[] listMappings;
+
+ private final BlockingQueue<InternalCDORevision> queue = new LinkedBlockingQueue<InternalCDORevision>();
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private Throwable exception;
+
+ public AsnychronousListIO(IDBStoreAccessor accessor)
+ {
+ this.accessor = accessor;
+ store = (DBStore)accessor.getStore();
+ idHandler = store.getIDHandler();
+
+ List<IListMapping> tmp = getListMappings();
+ listMappings = new IListMappingUnitSupport[tmp.size()];
+
+ int i = 0;
+ for (IListMapping listMapping : tmp)
+ {
+ listMappings[i++] = listMapping;
+ }
+ }
+
+ @Override
+ public String getName()
+ {
+ return getClass().getName();
+ }
+
+ public void schedule(InternalCDORevision revision)
+ {
+ queue.offer(revision);
+ }
+
+ public void await(OMMonitor monitor) throws SQLException
+ {
+ // Schedule an end marker revision.
+ schedule(new StubCDORevision(getEClass()));
+
+ Async async = monitor != null ? monitor.forkAsync() : null;
+
+ try
+ {
+ while (!latch.await(100, TimeUnit.MILLISECONDS))
+ {
+ if (monitor != null)
+ {
+ monitor.checkCanceled();
+ }
+ }
+ }
+ catch (InterruptedException ex)
+ {
+ throw new TimeoutRuntimeException();
+ }
+ finally
+ {
+ if (async != null)
+ {
+ async.stop();
+ }
+
+ closeStatements(monitor);
+ }
+
+ if (exception instanceof RuntimeException)
+ {
+ throw (RuntimeException)exception;
+ }
+
+ if (exception instanceof Error)
+ {
+ throw (Error)exception;
+ }
+
+ if (exception instanceof SQLException)
+ {
+ throw (SQLException)exception;
+ }
+
+ if (exception instanceof Exception)
+ {
+ throw WrappedException.wrap((Exception)exception);
+ }
+ }
+
+ @Override
+ protected void doRun()
+ {
+ StoreThreadLocal.setAccessor(accessor);
+
+ try
+ {
+ while (store.isActive())
+ {
+ InternalCDORevision revision = queue.poll(1, TimeUnit.SECONDS);
+ if (revision == null)
+ {
+ continue;
+ }
+
+ if (revision instanceof StubCDORevision)
+ {
+ return;
+ }
+
+ runWithRevision(revision);
+ }
+ }
+ catch (Throwable ex)
+ {
+ exception = ex;
+ }
+ finally
+ {
+ latch.countDown();
+ StoreThreadLocal.remove();
+ }
+ }
+
+ protected void runWithRevision(InternalCDORevision revision) throws SQLException
+ {
+ for (int i = 0; i < listMappings.length; i++)
+ {
+ @SuppressWarnings("unchecked")
+ T listMapping = (T)listMappings[i];
+
+ EStructuralFeature feature = listMapping.getFeature();
+ MoveableList<Object> list = revision.getList(feature);
+
+ if (list.size() != 0)
+ {
+ runWithList(revision, i, listMapping, list);
+ }
+ }
+ }
+
+ protected abstract void runWithList(InternalCDORevision revision, int i, T listMapping, MoveableList<Object> list)
+ throws SQLException;
+
+ protected abstract void closeStatements(OMMonitor monitor) throws SQLException;
+ }
}

Back to the top