diff options
author | Eike Stepper | 2008-10-02 19:20:40 +0000 |
---|---|---|
committer | Eike Stepper | 2008-10-02 19:20:40 +0000 |
commit | 04fce9b796f105acadcf4967c85a220135e027d8 (patch) | |
tree | f780443f25d626afa52e9a6eb9a171e586bd56ba | |
parent | a606614ae5708eb985dcfbd7db3665fef8ff0f9e (diff) | |
download | cdo-04fce9b796f105acadcf4967c85a220135e027d8.tar.gz cdo-04fce9b796f105acadcf4967c85a220135e027d8.tar.xz cdo-04fce9b796f105acadcf4967c85a220135e027d8.zip |
[249536] Provide a public view lock to protect clients against remote invalidation
https://bugs.eclipse.org/bugs/show_bug.cgi?id=249536
15 files changed, 150 insertions, 54 deletions
diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/CDOView.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/CDOView.java index 2a80dd4d1e..47198e51cf 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/CDOView.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/cdo/CDOView.java @@ -24,6 +24,7 @@ import org.eclipse.emf.ecore.EObject; import org.eclipse.emf.ecore.resource.ResourceSet; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; /** * A read-only view to the <em>current</em> (i.e. latest) state of the object graph in the repository. @@ -47,6 +48,11 @@ public interface CDOView extends CDOProtocolView, INotifier public ResourceSet getResourceSet(); + /** + * @since 2.0 + */ + public ReentrantLock getLock(); + public boolean isDirty(); public boolean hasConflict(); diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java index 12533b9df1..271d0e32df 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java @@ -60,6 +60,7 @@ import org.eclipse.net4j.signal.failover.NOOPFailOverStrategy; import org.eclipse.net4j.util.ImplementationError; import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.concurrent.QueueRunner; import org.eclipse.net4j.util.container.Container; import org.eclipse.net4j.util.event.Event; import org.eclipse.net4j.util.event.EventUtil; @@ -150,6 +151,10 @@ public class CDOSessionImpl extends Container<CDOView> implements CDOSession, CD private Set<CDOViewImpl> views = new HashSet<CDOViewImpl>(); + private QueueRunner invalidationRunner; + + private Object invalidationRunnerLock = new Object(); + @ExcludeFromDump private transient Map<CDOID, InternalEObject> idToMetaInstanceMap = new HashMap<CDOID, InternalEObject>(); @@ -657,8 +662,8 @@ public class CDOSessionImpl extends Container<CDOView> implements CDOSession, CD notifyInvalidation(timestamp, dirtyOIDs, detachedObjects, null); } - private void notifyInvalidation(long timeStamp, Set<CDOIDAndVersion> dirtyOIDs, Collection<CDOID> detachedObjects, - CDOViewImpl excludedView) + private void notifyInvalidation(final long timeStamp, Set<CDOIDAndVersion> dirtyOIDs, + Collection<CDOID> detachedObjects, CDOViewImpl excludedView) { for (CDOIDAndVersion dirtyOID : dirtyOIDs) { @@ -688,27 +693,48 @@ public class CDOSessionImpl extends Container<CDOView> implements CDOSession, CD } } - dirtyOIDs = Collections.unmodifiableSet(dirtyOIDs); - detachedObjects = Collections.unmodifiableCollection(detachedObjects); + final Set<CDOIDAndVersion> finalDirtyOIDs = Collections.unmodifiableSet(dirtyOIDs); + final Collection<CDOID> finalDetachedObjects = Collections.unmodifiableCollection(detachedObjects); - for (CDOViewImpl view : getViews()) + for (final CDOViewImpl view : getViews()) { if (view != excludedView) { - try - { - view.handleInvalidation(timeStamp, dirtyOIDs, detachedObjects); - } - catch (RuntimeException ex) + QueueRunner runner = getInvalidationRunner(); + runner.addWork(new Runnable() { - OM.LOG.error(ex); - } + public void run() + { + try + { + view.handleInvalidation(timeStamp, finalDirtyOIDs, finalDetachedObjects); + } + catch (RuntimeException ex) + { + OM.LOG.error(ex); + } + } + }); } } fireInvalidationEvent(timeStamp, dirtyOIDs, detachedObjects, excludedView); } + private QueueRunner getInvalidationRunner() + { + synchronized (invalidationRunnerLock) + { + if (invalidationRunner == null) + { + invalidationRunner = new QueueRunner(); + invalidationRunner.activate(); + } + } + + return invalidationRunner; + } + private void handleChangeSubcription(Collection<CDORevisionDelta> deltas, CDOViewImpl excludedView) { if (deltas == null || deltas.size() <= 0) @@ -859,8 +885,18 @@ public class CDOSessionImpl extends Container<CDOView> implements CDOSession, CD protected void doDeactivate() throws Exception { EventUtil.removeListener(channel, channelListener); + if (invalidationRunner != null) + { + invalidationRunner.deactivate(); + invalidationRunner = null; + } + revisionManager.deactivate(); + revisionManager = null; + packageManager.deactivate(); + packageManager = null; + for (CDOViewImpl view : views.toArray(new CDOViewImpl[views.size()])) { try @@ -872,6 +908,9 @@ public class CDOSessionImpl extends Container<CDOView> implements CDOSession, CD } } + views.clear(); + views = null; + channel.close(); channel = null; super.doDeactivate(); diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java index 0158f9ad24..90dcd44874 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; /** * @author Eike Stepper @@ -101,6 +102,11 @@ public class CDOViewImpl extends org.eclipse.net4j.util.event.Notifier implement private CDOStore store = new CDOStore(this); + /** + * Since always almost two threads can be under contention for this lock we need no fairness ;-) + */ + private ReentrantLock lock = new ReentrantLock(false); + @ExcludeFromDump private transient CDOID lastLookupID; @@ -162,6 +168,14 @@ public class CDOViewImpl extends org.eclipse.net4j.util.event.Notifier implement return store; } + /** + * @since 2.0 + */ + public ReentrantLock getLock() + { + return lock; + } + public boolean isDirty() { return false; @@ -752,6 +766,8 @@ public class CDOViewImpl extends org.eclipse.net4j.util.event.Notifier implement * <b>Implementation note:</b> This implementation guarantees that exceptions from listener code don't propagate up to * the caller of this method. Runtime exceptions from the implementation of the {@link CDOStateMachine} are propagated * to the caller of this method but this should not happen in the absence of implementation errors. + * <p> + * Note that this method can block for an uncertain time on the reentrant view lock! * * @param timeStamp * The time stamp of the server transaction if this event was sent as a result of a successfully committed @@ -764,44 +780,53 @@ public class CDOViewImpl extends org.eclipse.net4j.util.event.Notifier implement public void handleInvalidation(long timeStamp, Set<CDOIDAndVersion> dirtyOIDs, Collection<CDOID> detachedObjects) { List<InternalCDOObject> dirtyObjects = invalidationNotificationEnabled ? new ArrayList<InternalCDOObject>() : null; - for (CDOIDAndVersion dirtyOID : dirtyOIDs) + lock.lock(); + + try { - InternalCDOObject dirtyObject; - synchronized (objects) + for (CDOIDAndVersion dirtyOID : dirtyOIDs) { - dirtyObject = objects.get(dirtyOID.getID()); - if (dirtyObject != null) + InternalCDOObject dirtyObject; + synchronized (objects) + { + dirtyObject = objects.get(dirtyOID.getID()); + if (dirtyObject != null) + { + CDOStateMachine.INSTANCE.invalidate(dirtyObject, timeStamp); + } + } + + if (dirtyObject != null && dirtyObjects != null && dirtyObject.eNotificationRequired()) { - CDOStateMachine.INSTANCE.invalidate(dirtyObject, timeStamp); + dirtyObjects.add(dirtyObject); } } - if (dirtyObject != null && dirtyObjects != null && dirtyObject.eNotificationRequired()) + for (CDOID id : detachedObjects) { - dirtyObjects.add(dirtyObject); + InternalCDOObject cdoObject = removeObject(id); + if (cdoObject != null) + { + CDOStateMachine.INSTANCE.invalidate(cdoObject); + if (dirtyObjects != null && cdoObject.eNotificationRequired()) + { + dirtyObjects.add(cdoObject); + } + } } - } - for (CDOID id : detachedObjects) - { - InternalCDOObject cdoObject = removeObject(id); - if (cdoObject != null) + if (dirtyObjects != null) { - CDOStateMachine.INSTANCE.invalidate(cdoObject); - if (dirtyObjects != null && cdoObject.eNotificationRequired()) + for (InternalCDOObject dirtyObject : dirtyObjects) { - dirtyObjects.add(cdoObject); + CDOInvalidationNotificationImpl notification = new CDOInvalidationNotificationImpl(dirtyObject); + dirtyObject.eNotify(notification); } } } - - if (dirtyObjects != null) + finally { - for (InternalCDOObject dirtyObject : dirtyObjects) - { - CDOInvalidationNotificationImpl notification = new CDOInvalidationNotificationImpl(dirtyObject); - dirtyObject.eNotify(notification); - } + lock.unlock(); } } diff --git a/plugins/org.eclipse.net4j.http.server/src/org/eclipse/net4j/http/internal/server/HTTPAcceptor.java b/plugins/org.eclipse.net4j.http.server/src/org/eclipse/net4j/http/internal/server/HTTPAcceptor.java index 670fcc32b7..fa7eea43dc 100644 --- a/plugins/org.eclipse.net4j.http.server/src/org/eclipse/net4j/http/internal/server/HTTPAcceptor.java +++ b/plugins/org.eclipse.net4j.http.server/src/org/eclipse/net4j/http/internal/server/HTTPAcceptor.java @@ -17,9 +17,9 @@ import org.eclipse.net4j.http.internal.server.bundle.OM; import org.eclipse.net4j.http.server.IHTTPAcceptor; import org.eclipse.net4j.http.server.INet4jTransportServlet; import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.concurrent.Worker; import org.eclipse.net4j.util.io.ExtendedDataInputStream; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; -import org.eclipse.net4j.util.lifecycle.Worker; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.net4j.util.security.IRandomizer; diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java index 8e031b0ea9..87d9b1e9b8 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java @@ -12,12 +12,12 @@ package org.eclipse.net4j.internal.http; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.http.internal.common.HTTPConnector; +import org.eclipse.net4j.util.concurrent.Worker; import org.eclipse.net4j.util.io.ExtendedDataInputStream; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.net4j.util.io.ExtendedIOAdapter; import org.eclipse.net4j.util.io.ExtendedIOHandler; import org.eclipse.net4j.util.io.IORuntimeException; -import org.eclipse.net4j.util.lifecycle.Worker; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpException; diff --git a/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java b/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java index d886ff06cd..ede845de04 100644 --- a/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java +++ b/plugins/org.eclipse.net4j.jms.server/src/org/eclipse/net4j/jms/internal/server/Server.java @@ -20,7 +20,7 @@ import org.eclipse.net4j.jms.server.IStore; import org.eclipse.net4j.jms.server.IStoreTransaction; import org.eclipse.net4j.util.HexUtil; import org.eclipse.net4j.util.concurrent.NonBlockingLongCounter; -import org.eclipse.net4j.util.lifecycle.QueueWorker; +import org.eclipse.net4j.util.concurrent.QueueWorker; import javax.jms.Destination; import javax.naming.Context; diff --git a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java index 9e4cebe9a9..227890c2b1 100644 --- a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java +++ b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/SessionImpl.java @@ -19,7 +19,7 @@ import org.eclipse.net4j.internal.jms.protocol.JMSRegisterConsumerRequest; import org.eclipse.net4j.internal.jms.protocol.JMSRollbackRequest; import org.eclipse.net4j.internal.jms.util.DestinationUtil; import org.eclipse.net4j.internal.jms.util.MessageUtil; -import org.eclipse.net4j.util.lifecycle.QueueWorker; +import org.eclipse.net4j.util.concurrent.QueueWorker; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -416,6 +416,9 @@ public class SessionImpl extends QueueWorker<MessageConsumerImpl> implements Ses return "jms-session"; } + /** + * @since 2.0 + */ @Override protected void work(WorkContext context, MessageConsumerImpl consumer) { diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java index 235e7356c2..2edb683b9a 100644 --- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java +++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPAcceptor.java @@ -14,8 +14,8 @@ import org.eclipse.net4j.internal.tcp.bundle.OM; import org.eclipse.net4j.tcp.ITCPAcceptor; import org.eclipse.net4j.tcp.ITCPPassiveSelectorListener; import org.eclipse.net4j.tcp.ITCPSelector; +import org.eclipse.net4j.util.concurrent.Worker; import org.eclipse.net4j.util.io.IOUtil; -import org.eclipse.net4j.util.lifecycle.Worker; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.acceptor.Acceptor; diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/Cache.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/Cache.java index 75e9a29978..eb3d9f9d70 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/Cache.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/Cache.java @@ -11,7 +11,7 @@ package org.eclipse.net4j.util.cache; import org.eclipse.net4j.internal.util.bundle.OM; -import org.eclipse.net4j.util.lifecycle.Worker; +import org.eclipse.net4j.util.concurrent.Worker; import org.eclipse.net4j.util.om.trace.ContextTracer; import java.lang.ref.Reference; diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/CacheMonitor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/CacheMonitor.java index 1632c9b753..6af766bdde 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/CacheMonitor.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/cache/CacheMonitor.java @@ -12,8 +12,8 @@ package org.eclipse.net4j.util.cache; import org.eclipse.net4j.internal.util.bundle.OM; import org.eclipse.net4j.util.ImplementationError; +import org.eclipse.net4j.util.concurrent.Worker; import org.eclipse.net4j.util.event.Event; -import org.eclipse.net4j.util.lifecycle.Worker; import org.eclipse.net4j.util.om.trace.ContextTracer; import java.util.HashMap; diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueRunner.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueRunner.java new file mode 100644 index 0000000000..0682d1378e --- /dev/null +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueRunner.java @@ -0,0 +1,28 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.util.concurrent; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public class QueueRunner extends QueueWorker<Runnable> +{ + public QueueRunner() + { + } + + @Override + protected void work(WorkContext context, Runnable runnable) + { + runnable.run(); + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/QueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorker.java index 35d5dc47d8..5685683b3e 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/QueueWorker.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorker.java @@ -8,7 +8,7 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.util.lifecycle; +package org.eclipse.net4j.util.concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; /** * @author Eike Stepper + * @since 2.0 */ public abstract class QueueWorker<E> extends Worker { diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java index 663dc78e36..b862aa7032 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/QueueWorkerWorkSerializer.java @@ -10,12 +10,10 @@ **************************************************************************/ package org.eclipse.net4j.util.concurrent; -import org.eclipse.net4j.util.lifecycle.QueueWorker; - /** * @author Eike Stepper */ -public class QueueWorkerWorkSerializer extends QueueWorker<Runnable> implements IWorkSerializer +public class QueueWorkerWorkSerializer extends QueueRunner implements IWorkSerializer { public QueueWorkerWorkSerializer() { @@ -26,10 +24,4 @@ public class QueueWorkerWorkSerializer extends QueueWorker<Runnable> implements { deactivate(); } - - @Override - protected void work(WorkContext context, Runnable element) - { - element.run(); - } } diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/Worker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Worker.java index d483e81aae..4d3464734e 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/lifecycle/Worker.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/concurrent/Worker.java @@ -8,10 +8,11 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.util.lifecycle; +package org.eclipse.net4j.util.concurrent; import org.eclipse.net4j.internal.util.bundle.OM; import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.lifecycle.Lifecycle; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -19,6 +20,7 @@ import java.util.concurrent.TimeoutException; /** * @author Eike Stepper + * @since 2.0 */ public abstract class Worker extends Lifecycle { diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java index 0b5de3d60e..00b1a7c04c 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/ref/ReferenceQueueWorker.java @@ -10,7 +10,7 @@ **************************************************************************/ package org.eclipse.net4j.util.ref; -import org.eclipse.net4j.util.lifecycle.Worker; +import org.eclipse.net4j.util.concurrent.Worker; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; |