From 0c1bf98c2c15a4538d4b4bd636db71008fb5f8dc Mon Sep 17 00:00:00 2001 From: Eike Stepper Date: Thu, 30 Aug 2007 10:54:22 +0000 Subject: [201265] Handling large Collection by having them in chunk https://bugs.eclipse.org/bugs/show_bug.cgi?id=201265 --- .../emf/cdo/internal/server/RevisionManager.java | 4 +-- .../server/protocol/LoadRevisionIndication.java | 31 ++++++++++++++++--- .../org/eclipse/emf/internal/cdo/CDOAuditImpl.java | 5 +-- .../emf/internal/cdo/CDORevisionManagerImpl.java | 8 ++--- .../eclipse/emf/internal/cdo/CDOSessionImpl.java | 18 +++++++++++ .../org/eclipse/emf/internal/cdo/CDOViewImpl.java | 2 +- .../cdo/protocol/CommitTransactionRequest.java | 2 +- .../internal/cdo/protocol/LoadRevisionRequest.java | 36 ++++++++++++++++++---- .../src/org/eclipse/internal/net4j/Channel.java | 9 ++++-- .../src/org/eclipse/net4j/signal/Signal.java | 5 +++ .../eclipse/net4j/stream/BufferInputStream.java | 2 +- 11 files changed, 97 insertions(+), 25 deletions(-) diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/RevisionManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/RevisionManager.java index 9c1508beb9..2b32a802d9 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/RevisionManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/RevisionManager.java @@ -60,7 +60,7 @@ public class RevisionManager extends CDORevisionResolverImpl implements IRevisio } @Override - protected CDORevisionImpl loadRevision(CDOID id) + protected CDORevisionImpl loadRevision(CDOID id, int referenceChunk) { IStoreReader storeReader = StoreUtil.getReader(); CDORevisionImpl revision = (CDORevisionImpl)storeReader.readRevision(id); @@ -69,7 +69,7 @@ public class RevisionManager extends CDORevisionResolverImpl implements IRevisio } @Override - protected CDORevisionImpl loadRevision(CDOID id, long timeStamp) + protected CDORevisionImpl loadRevision(CDOID id, int referenceChunk, long timeStamp) { IStoreReader storeReader = StoreUtil.getReader(); CDORevisionImpl revision = (CDORevisionImpl)storeReader.readRevision(id, timeStamp); diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/LoadRevisionIndication.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/LoadRevisionIndication.java index 913b95ef20..2e12d362c4 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/LoadRevisionIndication.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/protocol/LoadRevisionIndication.java @@ -31,6 +31,8 @@ public class LoadRevisionIndication extends CDOReadIndication private CDORevisionImpl revision; + private int referenceChunk; + public LoadRevisionIndication() { super(CDOProtocolConstants.SIGNAL_LOAD_REVISION); @@ -40,9 +42,28 @@ public class LoadRevisionIndication extends CDOReadIndication protected void indicating(ExtendedDataInputStream in) throws IOException { CDOID id = CDOIDImpl.read(in); - if (PROTOCOL.isEnabled()) + if (id.isTemporary()) { - PROTOCOL.format("Read ID: {0}", id); + id = CDOIDImpl.create(-id.getValue()); + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Read ID: {0}", id); + } + + referenceChunk = in.readInt(); + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Read referenceChunk: {0}", referenceChunk); + } + } + else + { + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Read ID: {0}", id); + } + + referenceChunk = CDORevisionImpl.COMPLETE_REFERENCES; } boolean historical = in.readBoolean(); @@ -54,17 +75,17 @@ public class LoadRevisionIndication extends CDOReadIndication PROTOCOL.format("Read timeStamp: {0}", timeStamp); } - revision = getRevisionManager().getRevision(id, timeStamp); + revision = getRevisionManager().getRevision(id, referenceChunk, timeStamp); } else { - revision = getRevisionManager().getRevision(id); + revision = getRevisionManager().getRevision(id, referenceChunk); } } @Override protected void responding(ExtendedDataOutputStream out) throws IOException { - revision.write(out, getSession()); + revision.write(out, getSession(), referenceChunk); } } diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOAuditImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOAuditImpl.java index 5dd9de535f..95c27de93d 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOAuditImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOAuditImpl.java @@ -44,8 +44,9 @@ public class CDOAuditImpl extends CDOViewImpl implements CDOAudit @Override public CDORevisionImpl getRevision(CDOID id) { - CDORevisionResolver revisionManager = getSession().getRevisionManager(); - return (CDORevisionImpl)revisionManager.getRevision(id, timeStamp); + CDOSessionImpl session = getSession(); + CDORevisionResolver revisionManager = session.getRevisionManager(); + return (CDORevisionImpl)revisionManager.getRevision(id, session.getReferenceChunkSize(), timeStamp); } @Override diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDORevisionManagerImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDORevisionManagerImpl.java index ba0ae0e1f1..9405fcda4d 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDORevisionManagerImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDORevisionManagerImpl.java @@ -38,12 +38,12 @@ public class CDORevisionManagerImpl extends CDORevisionResolverImpl implements C } @Override - protected CDORevisionImpl loadRevision(CDOID id) + protected CDORevisionImpl loadRevision(CDOID id, int referenceChunk) { try { IFailOverStrategy failOverStrategy = session.getFailOverStrategy(); - LoadRevisionRequest request = new LoadRevisionRequest(session.getChannel(), id); + LoadRevisionRequest request = new LoadRevisionRequest(session.getChannel(), id, referenceChunk); return failOverStrategy.send(request); } catch (RuntimeException ex) @@ -57,12 +57,12 @@ public class CDORevisionManagerImpl extends CDORevisionResolverImpl implements C } @Override - protected CDORevisionImpl loadRevision(CDOID id, long timeStamp) + protected CDORevisionImpl loadRevision(CDOID id, int referenceChunk, long timeStamp) { try { IFailOverStrategy failOverStrategy = session.getFailOverStrategy(); - LoadRevisionRequest request = new LoadRevisionRequest(session.getChannel(), id, timeStamp); + LoadRevisionRequest request = new LoadRevisionRequest(session.getChannel(), id, referenceChunk, timeStamp); return failOverStrategy.send(request); } catch (RuntimeException ex) diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java index 0bfdabcaff..0b67e1a07e 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOSessionImpl.java @@ -18,6 +18,7 @@ import org.eclipse.emf.cdo.internal.protocol.CDOIDRangeImpl; import org.eclipse.emf.cdo.internal.protocol.model.CDOClassImpl; import org.eclipse.emf.cdo.internal.protocol.model.CDOClassRefImpl; import org.eclipse.emf.cdo.internal.protocol.model.CDOPackageImpl; +import org.eclipse.emf.cdo.internal.protocol.revision.CDORevisionImpl; import org.eclipse.emf.cdo.protocol.CDOID; import org.eclipse.emf.cdo.protocol.CDOIDRange; import org.eclipse.emf.cdo.protocol.CDOProtocolConstants; @@ -75,6 +76,8 @@ public class CDOSessionImpl extends Container implements CDOSession private int sessionID; + private int referenceChunkSize = CDORevisionImpl.COMPLETE_REFERENCES; + private IFailOverStrategy failOverStrategy; private IListener failOverStrategyListener = new IListener() @@ -134,6 +137,21 @@ public class CDOSessionImpl extends Container implements CDOSession return sessionID; } + public int getReferenceChunkSize() + { + return referenceChunkSize; + } + + public void setReferenceChunkSize(int referenceChunkSize) + { + if (referenceChunkSize < 0) + { + referenceChunkSize = CDORevisionImpl.COMPLETE_REFERENCES; + } + + this.referenceChunkSize = referenceChunkSize; + } + public IFailOverStrategy getFailOverStrategy() { return failOverStrategy == null ? IFailOverStrategy.NOOP : failOverStrategy; diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java index a62d76ba25..90a16188c9 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/CDOViewImpl.java @@ -203,7 +203,7 @@ public class CDOViewImpl extends org.eclipse.net4j.internal.util.event.Notifier public CDORevisionImpl getRevision(CDOID id) { CDORevisionResolver revisionManager = session.getRevisionManager(); - return (CDORevisionImpl)revisionManager.getRevision(id); + return (CDORevisionImpl)revisionManager.getRevision(id, session.getReferenceChunkSize()); } public InternalCDOObject getObject(CDOID id) diff --git a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/protocol/CommitTransactionRequest.java b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/protocol/CommitTransactionRequest.java index aa2ed8aef9..2f005c9a17 100644 --- a/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/protocol/CommitTransactionRequest.java +++ b/plugins/org.eclipse.emf.cdo/src/org/eclipse/emf/internal/cdo/protocol/CommitTransactionRequest.java @@ -113,7 +113,7 @@ public class CommitTransactionRequest extends CDOClientRequest private Long timeStamp; - public LoadRevisionRequest(IChannel channel, CDOID id) + private int referenceChunk; + + public LoadRevisionRequest(IChannel channel, CDOID id, int referenceChunk) { super(channel, CDOProtocolConstants.SIGNAL_LOAD_REVISION); this.id = id; + this.referenceChunk = referenceChunk; } - public LoadRevisionRequest(IChannel channel, CDOID id, long timeStamp) + public LoadRevisionRequest(IChannel channel, CDOID id, int referenceChunk, long timeStamp) { - this(channel, id); + this(channel, id, referenceChunk); this.timeStamp = timeStamp; } @Override protected void requesting(ExtendedDataOutputStream out) throws IOException { - if (PROTOCOL.isEnabled()) + if (referenceChunk == CDORevisionImpl.COMPLETE_REFERENCES) { - PROTOCOL.format("Writing ID: {0}", id); + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Writing ID: {0}", id); + } + + CDOIDImpl.write(out, id); + } + else + { + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Writing ID: {0}", id); + } + + CDOID negID = CDOIDImpl.create(-id.getValue()); + CDOIDImpl.write(out, negID); + + if (PROTOCOL.isEnabled()) + { + PROTOCOL.format("Writing referenceChunk: {0}", referenceChunk); + } + + out.writeInt(referenceChunk); } - CDOIDImpl.write(out, id); if (timeStamp != null) { if (PROTOCOL.isEnabled()) diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java index 21c13ea3db..e5b75cf1ab 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/Channel.java @@ -17,7 +17,7 @@ import org.eclipse.net4j.IBufferProvider; import org.eclipse.net4j.IChannel; import org.eclipse.net4j.IChannelID; import org.eclipse.net4j.IConnector; -import org.eclipse.net4j.internal.util.concurrent.AsynchronousWorkSerializer; +import org.eclipse.net4j.internal.util.concurrent.QueueWorkerWorkSerializer; import org.eclipse.net4j.internal.util.concurrent.SynchronousWorkSerializer; import org.eclipse.net4j.internal.util.lifecycle.Lifecycle; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; @@ -221,9 +221,12 @@ public class Channel extends Lifecycle implements IChannel, IBufferProvider } else { - receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); + // receiveSerializer = new AsynchronousWorkSerializer(receiveExecutor); + + // CompletionWorkSerializer throws "One command already pending" // receiveSerializer = new CompletionWorkSerializer(); - // receiveSerializer = new QueueWorkerWorkSerializer(); + + receiveSerializer = new QueueWorkerWorkSerializer(); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 4d508ea3a7..8938993828 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -131,6 +131,11 @@ public abstract class Signal implements Runnable } catch (EOFException ex) { + if (TRACER.isEnabled()) + { + TRACER.trace(ex); + } + throw new TimeoutException("Timeout"); } catch (Exception ex) diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/stream/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/stream/BufferInputStream.java index d4ec1a5c31..9dc57c082c 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/stream/BufferInputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/stream/BufferInputStream.java @@ -129,7 +129,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler { if (buffers == null) { - // Stream has been closed + // Stream has been closed - shutting sown return false; } -- cgit v1.2.3