diff options
Diffstat (limited to 'plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java')
-rw-r--r-- | plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java index 1283af5e69..69acfea09e 100644 --- a/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java +++ b/plugins/org.eclipse.emf.cdo.server.db/src/org/eclipse/emf/cdo/server/internal/db/mapping/horizontal/AbstractHorizontalClassMapping.java @@ -27,18 +27,22 @@ import org.eclipse.emf.cdo.common.revision.CDORevisionManager; import org.eclipse.emf.cdo.eresource.EresourcePackage; import org.eclipse.emf.cdo.server.IRepository; import org.eclipse.emf.cdo.server.IStoreAccessor.QueryXRefsContext; +import org.eclipse.emf.cdo.server.StoreThreadLocal; import org.eclipse.emf.cdo.server.db.IDBStoreAccessor; import org.eclipse.emf.cdo.server.db.IIDHandler; import org.eclipse.emf.cdo.server.db.mapping.IClassMapping; import org.eclipse.emf.cdo.server.db.mapping.IListMapping; import org.eclipse.emf.cdo.server.db.mapping.IListMapping3; +import org.eclipse.emf.cdo.server.db.mapping.IListMappingUnitSupport; import org.eclipse.emf.cdo.server.db.mapping.IMappingStrategy; import org.eclipse.emf.cdo.server.db.mapping.ITypeMapping; +import org.eclipse.emf.cdo.server.internal.db.DBStore; import org.eclipse.emf.cdo.server.internal.db.bundle.OM; import org.eclipse.emf.cdo.server.internal.db.mapping.AbstractMappingStrategy; import org.eclipse.emf.cdo.spi.common.commit.CDOChangeSetSegment; import org.eclipse.emf.cdo.spi.common.revision.InternalCDOList; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevision; +import org.eclipse.emf.cdo.spi.common.revision.StubCDORevision; import org.eclipse.net4j.db.DBException; import org.eclipse.net4j.db.DBType; @@ -52,6 +56,10 @@ import org.eclipse.net4j.db.ddl.IDBIndex; import org.eclipse.net4j.db.ddl.IDBSchema; import org.eclipse.net4j.db.ddl.IDBTable; import org.eclipse.net4j.spi.db.ddl.InternalDBIndex; +import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.collection.MoveableList; +import org.eclipse.net4j.util.concurrent.RunnableWithName; +import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException; import org.eclipse.net4j.util.lifecycle.IDeactivateable; import org.eclipse.net4j.util.om.monitor.OMMonitor; import org.eclipse.net4j.util.om.monitor.OMMonitor.Async; @@ -76,6 +84,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * @author Eike Stepper @@ -1016,4 +1028,159 @@ public abstract class AbstractHorizontalClassMapping implements IClassMapping, I } } } + + /** + * @author Eike Stepper + */ + protected abstract class AsnychronousListIO<T extends IListMapping> extends RunnableWithName + { + protected final IDBStoreAccessor accessor; + + protected final DBStore store; + + protected final IIDHandler idHandler; + + protected final IListMapping[] listMappings; + + private final BlockingQueue<InternalCDORevision> queue = new LinkedBlockingQueue<InternalCDORevision>(); + + private final CountDownLatch latch = new CountDownLatch(1); + + private Throwable exception; + + public AsnychronousListIO(IDBStoreAccessor accessor) + { + this.accessor = accessor; + store = (DBStore)accessor.getStore(); + idHandler = store.getIDHandler(); + + List<IListMapping> tmp = getListMappings(); + listMappings = new IListMappingUnitSupport[tmp.size()]; + + int i = 0; + for (IListMapping listMapping : tmp) + { + listMappings[i++] = listMapping; + } + } + + @Override + public String getName() + { + return getClass().getName(); + } + + public void schedule(InternalCDORevision revision) + { + queue.offer(revision); + } + + public void await(OMMonitor monitor) throws SQLException + { + // Schedule an end marker revision. + schedule(new StubCDORevision(getEClass())); + + Async async = monitor != null ? monitor.forkAsync() : null; + + try + { + while (!latch.await(100, TimeUnit.MILLISECONDS)) + { + if (monitor != null) + { + monitor.checkCanceled(); + } + } + } + catch (InterruptedException ex) + { + throw new TimeoutRuntimeException(); + } + finally + { + if (async != null) + { + async.stop(); + } + + closeStatements(monitor); + } + + if (exception instanceof RuntimeException) + { + throw (RuntimeException)exception; + } + + if (exception instanceof Error) + { + throw (Error)exception; + } + + if (exception instanceof SQLException) + { + throw (SQLException)exception; + } + + if (exception instanceof Exception) + { + throw WrappedException.wrap((Exception)exception); + } + } + + @Override + protected void doRun() + { + StoreThreadLocal.setAccessor(accessor); + + try + { + while (store.isActive()) + { + InternalCDORevision revision = queue.poll(1, TimeUnit.SECONDS); + if (revision == null) + { + continue; + } + + if (revision instanceof StubCDORevision) + { + return; + } + + runWithRevision(revision); + } + } + catch (Throwable ex) + { + exception = ex; + } + finally + { + latch.countDown(); + StoreThreadLocal.remove(); + } + } + + protected void runWithRevision(InternalCDORevision revision) throws SQLException + { + for (int i = 0; i < listMappings.length; i++) + { + @SuppressWarnings("unchecked") + T listMapping = (T)listMappings[i]; + + EStructuralFeature feature = listMapping.getFeature(); + MoveableList<Object> list = revision.getList(feature); + + if (list.size() != 0) + { + runWithList(revision, i, listMapping, list); + } + } + } + + protected abstract void runWithList(InternalCDORevision revision, int i, T listMapping, MoveableList<Object> list) + throws SQLException; + + protected abstract void closeStatements(OMMonitor monitor) throws SQLException; + } } |