Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2018-05-24 02:54:10 -0400
committerEike Stepper2018-05-24 02:54:10 -0400
commitbc842e8edda8edaaf40c281760e7791af16075d4 (patch)
tree1cb3c329151a00ed03fc852bf32c61bb955f8f79
parent0664b0471e5b12aca1965fabd0b698c57ea42d70 (diff)
downloadcdo-bc842e8edda8edaaf40c281760e7791af16075d4.tar.gz
cdo-bc842e8edda8edaaf40c281760e7791af16075d4.tar.xz
cdo-bc842e8edda8edaaf40c281760e7791af16075d4.zip
[517225] StringCompressor can create huge memory leak
https://bugs.eclipse.org/bugs/show_bug.cgi?id=517225
-rw-r--r--plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java3
-rw-r--r--plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java17
-rw-r--r--plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java19
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java2
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java21
-rw-r--r--plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java117
-rw-r--r--plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java123
-rw-r--r--plugins/org.eclipse.net4j/.settings/.api_filters147
-rw-r--r--plugins/org.eclipse.net4j/META-INF/MANIFEST.MF35
-rw-r--r--plugins/org.eclipse.net4j/pom.xml2
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java37
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java28
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java50
13 files changed, 541 insertions, 60 deletions
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java
index d1ac37326d..7b7fc0565c 100644
--- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java
+++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java
@@ -32,8 +32,9 @@ public interface CDOProtocolConstants
* @since 4.2
* @noreference This field is not intended to be referenced by clients.
*/
- public static final int PROTOCOL_VERSION = 36; // CDOID.NIL
+ public static final int PROTOCOL_VERSION = 37; // SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS
+ // public static final int PROTOCOL_VERSION = 36; // CDOID.NIL
// public static final int PROTOCOL_VERSION = 35; // DiffieHellman.Server.Challenge.getSecretAlgorithmKeyLen()
// public static final int PROTOCOL_VERSION = 34; // CDOSessionProtocol.loadMergeData2()
// public static final int PROTOCOL_VERSION = 33; // CDOCommitInfo.getMergeSource()
diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java
index 8ef48e3cd6..7042dec127 100644
--- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java
+++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java
@@ -61,6 +61,7 @@ import org.eclipse.net4j.util.collection.Pair;
import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType;
import org.eclipse.net4j.util.io.StringCompressor;
import org.eclipse.net4j.util.io.StringIO;
+import org.eclipse.net4j.util.om.OMPlatform;
import org.eclipse.net4j.util.om.monitor.Monitor;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.net4j.util.om.trace.PerfTracer;
@@ -89,7 +90,10 @@ public class CDOClientProtocol extends AuthenticatingSignalProtocol<CDOSessionIm
{
private static final PerfTracer REVISION_LOADING = new PerfTracer(OM.PERF_REVISION_LOADING, CDOClientProtocol.class);
- private StringIO packageURICompressor = StringCompressor.BYPASS ? StringIO.DIRECT : new StringCompressor(true);
+ private static final boolean COMPRESS_PACKAGE_URIS = OMPlatform.INSTANCE.isProperty("org.eclipse.emf.cdo.protocol.compressPackageURIs",
+ !StringCompressor.BYPASS);
+
+ private StringIO packageURICompressor = COMPRESS_PACKAGE_URIS ? new StringCompressor(true) : StringIO.DIRECT;
public CDOClientProtocol()
{
@@ -542,6 +546,17 @@ public class CDOClientProtocol extends AuthenticatingSignalProtocol<CDOSessionIm
}
@Override
+ protected StringCompressor getStringCompressor()
+ {
+ if (COMPRESS_PACKAGE_URIS)
+ {
+ return (StringCompressor)packageURICompressor;
+ }
+
+ return super.getStringCompressor();
+ }
+
+ @Override
protected SignalReactor createSignalReactor(short signalID)
{
switch (signalID)
diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java
index 37fa9d40a9..761cd0c69f 100644
--- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java
+++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java
@@ -34,6 +34,7 @@ import org.eclipse.net4j.signal.security.AuthenticationRequest;
import org.eclipse.net4j.util.io.StringCompressor;
import org.eclipse.net4j.util.io.StringIO;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.OMPlatform;
import org.eclipse.net4j.util.om.monitor.Monitor;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.security.CredentialsUpdateOperation;
@@ -49,12 +50,15 @@ public class CDOServerProtocol extends SignalProtocol<InternalSession> implement
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, CDOServerProtocol.class);
+ private static final boolean COMPRESS_PACKAGE_URIS = OMPlatform.INSTANCE.isProperty("org.eclipse.emf.cdo.protocol.compressPackageURIs",
+ !StringCompressor.BYPASS);
+
+ private StringIO packageURICompressor = COMPRESS_PACKAGE_URIS ? new StringCompressor(false) : StringIO.DIRECT;
+
private long negotiationTimeout = DEFAULT_NEGOTIATION_TIMEOUT;
private IRepositoryProvider repositoryProvider;
- private StringIO packageURICompressor = StringCompressor.BYPASS ? StringIO.DIRECT : new StringCompressor(false);
-
public CDOServerProtocol(IRepositoryProvider repositoryProvider)
{
super(PROTOCOL_NAME);
@@ -225,6 +229,17 @@ public class CDOServerProtocol extends SignalProtocol<InternalSession> implement
}
@Override
+ protected StringCompressor getStringCompressor()
+ {
+ if (COMPRESS_PACKAGE_URIS)
+ {
+ return (StringCompressor)packageURICompressor;
+ }
+
+ return super.getStringCompressor();
+ }
+
+ @Override
protected SignalReactor createSignalReactor(short signalID)
{
switch (signalID)
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java
index d9c31363d6..6c2929cfb7 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java
@@ -27,7 +27,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class Bugzilla_340709_Test extends AbstractCDOTest
{
- CountDownLatch latch = new CountDownLatch(1);
+ private transient CountDownLatch latch = new CountDownLatch(1);
public void test() throws Exception
{
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java
index a6c2d45e31..b68826183b 100644
--- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java
@@ -24,6 +24,8 @@ import org.eclipse.emf.cdo.tests.AbstractCDOTest;
import org.eclipse.emf.cdo.tests.model1.Company;
import org.eclipse.emf.cdo.transaction.CDOTransaction;
+import org.eclipse.net4j.signal.AcknowledgeCompressedStringsIndication;
+import org.eclipse.net4j.signal.AcknowledgeCompressedStringsRequest;
import org.eclipse.net4j.signal.ISignalProtocol;
import org.eclipse.net4j.signal.IndicationWithMonitoring;
import org.eclipse.net4j.signal.MonitorProgressIndication;
@@ -72,30 +74,35 @@ public class Bugzilla_441136_Test extends AbstractCDOTest
Company company = getModel1Factory().createCompany();
resource.getContents().add(company);
transaction.commit(useMonitor ? new NullProgressMonitor() : null);
- String assertMessage = " differents kinds of requests should have been sent, QueryRequest, QueryCancel, LoadRevisionsRequest and CommitTransactionRequest";
+
+ signalCounter.removeCountFor(AcknowledgeCompressedStringsRequest.class);
+ signalCounter.removeCountFor(AcknowledgeCompressedStringsIndication.class);
+
int nbExpectedCalls;
- if (!useMonitor)
+ String assertMessage = " differents kinds of requests should have been sent, QueryRequest, QueryCancel, LoadRevisionsRequest and CommitTransactionRequest";
+
+ if (useMonitor)
{
+ nbExpectedCalls = 5;
+ assertMessage += " and MonitorProgressIndications should have been received";
+
// QueryRequest, QueryCancel are used to get the resourcePath
- nbExpectedCalls = 4;
assertEquals(nbExpectedCalls + assertMessage, nbExpectedCalls, signalCounter.getCountForSignalTypes());
assertNotSame(0, signalCounter.getCountFor(QueryRequest.class));
assertNotSame(0, signalCounter.getCountFor(QueryCancelRequest.class));
assertNotSame(0, signalCounter.getCountFor(LoadRevisionsRequest.class));
assertNotSame(0, signalCounter.getCountFor(CommitTransactionRequest.class));
+ assertNotSame(0, signalCounter.getCountFor(MonitorProgressIndication.class));
}
else
{
- nbExpectedCalls = 5;
- assertMessage += " and MonitorProgressIndications should have been received";
-
// QueryRequest, QueryCancel are used to get the resourcePath
+ nbExpectedCalls = 4;
assertEquals(nbExpectedCalls + assertMessage, nbExpectedCalls, signalCounter.getCountForSignalTypes());
assertNotSame(0, signalCounter.getCountFor(QueryRequest.class));
assertNotSame(0, signalCounter.getCountFor(QueryCancelRequest.class));
assertNotSame(0, signalCounter.getCountFor(LoadRevisionsRequest.class));
assertNotSame(0, signalCounter.getCountFor(CommitTransactionRequest.class));
- assertNotSame(0, signalCounter.getCountFor(MonitorProgressIndication.class));
}
protocol.removeListener(signalCounter);
diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java
new file mode 100644
index 0000000000..2842217c54
--- /dev/null
+++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2018 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.emf.cdo.tests.bugzilla;
+
+import org.eclipse.emf.cdo.eresource.CDOResource;
+import org.eclipse.emf.cdo.internal.net4j.protocol.CDOClientProtocol;
+import org.eclipse.emf.cdo.net4j.CDONet4jSession;
+import org.eclipse.emf.cdo.session.CDOSession;
+import org.eclipse.emf.cdo.tests.AbstractCDOTest;
+import org.eclipse.emf.cdo.tests.model1.Category;
+import org.eclipse.emf.cdo.tests.model1.Company;
+import org.eclipse.emf.cdo.tests.model1.Product1;
+import org.eclipse.emf.cdo.transaction.CDOTransaction;
+
+import org.eclipse.net4j.signal.SignalProtocol;
+import org.eclipse.net4j.util.ReflectUtil;
+import org.eclipse.net4j.util.io.StringCompressor;
+
+import org.eclipse.emf.common.util.EList;
+import org.eclipse.emf.ecore.EObject;
+import org.eclipse.emf.spi.cdo.CDOSessionProtocol;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Bug 517225: StringCompressor can create huge memory leak
+ *
+ * @author Eike Stepper
+ */
+public class Bugzilla_517225_Test extends AbstractCDOTest
+{
+ private static final int LEVELS = 3;
+
+ private static final int CATEGORIES = 3;
+
+ private static final int PRODUCTS = 15;
+
+ @SuppressWarnings("unchecked")
+ public void testStringCompressorLeak() throws Exception
+ {
+ // Initialize model.
+ {
+ CDOSession session = openSession();
+ CDOTransaction transaction = session.openTransaction();
+ CDOResource resource = transaction.createResource(getResourcePath("res"));
+
+ Category category = getModel1Factory().createCategory();
+ category.setName("ROOT");
+
+ Company company = getModel1Factory().createCompany();
+ company.getCategories().add(category);
+ resource.getContents().add(company);
+
+ createModel(category, LEVELS);
+ transaction.commit();
+ }
+
+ CDONet4jSession session = (CDONet4jSession)openSession();
+ CDOClientProtocol protocol = (CDOClientProtocol)(CDOSessionProtocol)session.options().getNet4jProtocol();
+ StringCompressor compressor = (StringCompressor)protocol.getPackageURICompressor();
+
+ Collection<Integer> pendingAcknowledgements = (Collection<Integer>)ReflectUtil
+ .getValue(ReflectUtil.getField(StringCompressor.class, "pendingAcknowledgements"), compressor);
+
+ CDOTransaction transaction = session.openTransaction();
+ CDOResource resource = transaction.getResource(getResourcePath("res"));
+ for (Iterator<EObject> it = resource.eAllContents(); it.hasNext();)
+ {
+ it.next();
+ }
+
+ // Should be 9 with a HashSet instead of 728 with an ArrayList.
+ assertTrue("pendingAcknowledgements: " + pendingAcknowledgements, pendingAcknowledgements.size() < 10);
+
+ sleep(SignalProtocol.COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT + 1000);
+ session.openView();
+
+ pendingAcknowledgements = (Collection<Integer>)ReflectUtil.getValue(ReflectUtil.getField(StringCompressor.class, "pendingAcknowledgements"), compressor);
+ assertEquals("pendingAcknowledgements: " + pendingAcknowledgements, 0, pendingAcknowledgements.size());
+ }
+
+ private void createModel(Category parent, int levels)
+ {
+ EList<Category> categories = parent.getCategories();
+ for (int i = 0; i < CATEGORIES; i++)
+ {
+ Category category = getModel1Factory().createCategory();
+ category.setName("Category" + levels + "-" + i);
+ categories.add(category);
+ }
+
+ EList<Product1> products = parent.getProducts();
+ for (int i = 0; i < PRODUCTS; i++)
+ {
+ Product1 product = getModel1Factory().createProduct1();
+ product.setName("Product" + levels + "-" + i);
+ products.add(product);
+ }
+
+ if (levels > 0)
+ {
+ for (Category category : categories)
+ {
+ createModel(category, levels - 1);
+ }
+ }
+ }
+}
diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java
index e44cfe3bcc..cac5414e89 100644
--- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java
+++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java
@@ -11,12 +11,13 @@
package org.eclipse.net4j.util.io;
import org.eclipse.net4j.util.CheckUtil;
+import org.eclipse.net4j.util.om.OMPlatform;
import java.io.IOException;
import java.text.MessageFormat;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
/**
@@ -28,7 +29,7 @@ public class StringCompressor implements StringIO
/**
* @since 3.0
*/
- public static boolean BYPASS = false;
+ public static boolean BYPASS = OMPlatform.INSTANCE.isProperty("org.eclipse.net4j.util.io.StringCompressor.BYPASS");
private static final int NULL_ID = 0;
@@ -56,7 +57,9 @@ public class StringCompressor implements StringIO
private Map<Integer, String> idToString = new HashMap<Integer, String>();
- private List<Integer> pendingAcknowledgements = new ArrayList<Integer>();
+ private Collection<Integer> pendingAcknowledgements = createAcknowledgementCollection();
+
+ private long lastAcknowledgementCheck;
/**
* Creates a StringCompressor instance.
@@ -88,8 +91,9 @@ public class StringCompressor implements StringIO
}
ID id;
- List<Integer> acknowledgements = null;
+ Collection<Integer> acknowledgements = null;
boolean stringFollows = false;
+
synchronized (this)
{
id = stringToID.get(string);
@@ -107,10 +111,11 @@ public class StringCompressor implements StringIO
stringFollows = true;
}
+ lastAcknowledgementCheck = System.currentTimeMillis();
if (!pendingAcknowledgements.isEmpty())
{
acknowledgements = pendingAcknowledgements;
- pendingAcknowledgements = new ArrayList<Integer>();
+ pendingAcknowledgements = createAcknowledgementCollection();
}
}
@@ -125,16 +130,7 @@ public class StringCompressor implements StringIO
writeString(out, string);
}
- if (acknowledgements != null)
- {
- for (int ack : acknowledgements)
- {
- writeByte(out, ACK_FOLLOWS);
- writeInt(out, ack);
- }
- }
-
- writeByte(out, NOTHING_FOLLOWS);
+ writeAcknowledgements(out, acknowledgements);
}
else
{
@@ -156,7 +152,7 @@ public class StringCompressor implements StringIO
}
String string = null;
- List<Integer> acks = null;
+ Collection<Integer> acks = null;
if (id == INFO_FOLLOWS)
{
id = readInt(in);
@@ -178,7 +174,7 @@ public class StringCompressor implements StringIO
case ACK_FOLLOWS:
if (acks == null)
{
- acks = new ArrayList<Integer>();
+ acks = createAcknowledgementCollection();
}
acks.add(readInt(in));
@@ -192,7 +188,8 @@ public class StringCompressor implements StringIO
synchronized (this)
{
- acknowledge(acks);
+ processAcknowledgements(acks);
+
if (string != null)
{
stringToID.put(string, new ID(id));
@@ -212,31 +209,92 @@ public class StringCompressor implements StringIO
return string;
}
- @Override
- public String toString()
+ /**
+ * @since 3.8
+ */
+ public Collection<Integer> getPendingAcknowledgements(long timeout)
{
- return MessageFormat.format("StringCompressor[client={0}]", client); //$NON-NLS-1$
+ Collection<Integer> acknowledgements = null;
+
+ synchronized (this)
+ {
+ long now = System.currentTimeMillis();
+ if (lastAcknowledgementCheck + timeout < now)
+ {
+ lastAcknowledgementCheck = now;
+ if (!pendingAcknowledgements.isEmpty())
+ {
+ acknowledgements = pendingAcknowledgements;
+ pendingAcknowledgements = createAcknowledgementCollection();
+ }
+ }
+ }
+
+ return acknowledgements;
+ }
+
+ /**
+ * @since 3.8
+ */
+ public void writeAcknowledgements(ExtendedDataOutput out, Collection<Integer> acknowledgements) throws IOException
+ {
+ if (acknowledgements != null)
+ {
+ for (int ack : acknowledgements)
+ {
+ writeByte(out, ACK_FOLLOWS);
+ writeInt(out, ack);
+ }
+ }
+
+ writeByte(out, NOTHING_FOLLOWS);
+ }
+
+ /**
+ * @since 3.8
+ */
+ public Collection<Integer> readAcknowledgements(ExtendedDataInput in) throws IOException
+ {
+ Collection<Integer> acknowledgements = createAcknowledgementCollection();
+ while (in.readByte() == ACK_FOLLOWS)
+ {
+ acknowledgements.add(in.readInt());
+ }
+
+ return acknowledgements;
}
- private void acknowledge(List<Integer> acks)
+ /**
+ * @since 3.8
+ */
+ public void processAcknowledgements(Collection<Integer> acknowledgements)
{
- if (acks != null)
+ if (acknowledgements != null)
{
- for (int value : acks)
+ synchronized (this)
{
- String string = idToString.get(value);
- if (string != null)
+ for (int value : acknowledgements)
{
- ID id = stringToID.get(string);
- if (id != null)
+ String string = idToString.get(value);
+ if (string != null)
{
- id.setAcknowledged();
+ ID id = stringToID.get(string);
+ if (id != null)
+ {
+ id.setAcknowledged();
+ }
}
}
}
}
}
+ @Override
+ public String toString()
+ {
+ return MessageFormat.format("StringCompressor[client={0}]", client); //$NON-NLS-1$
+ }
+
private void writeByte(ExtendedDataOutput out, byte value) throws IOException
{
if (DEBUG)
@@ -375,6 +433,11 @@ public class StringCompressor implements StringIO
IOUtil.OUT().println(msg);
}
+ private static Collection<Integer> createAcknowledgementCollection()
+ {
+ return new HashSet<Integer>();
+ }
+
/**
* @author Eike Stepper
*/
diff --git a/plugins/org.eclipse.net4j/.settings/.api_filters b/plugins/org.eclipse.net4j/.settings/.api_filters
index e9b9fc76f6..14ee494c23 100644
--- a/plugins/org.eclipse.net4j/.settings/.api_filters
+++ b/plugins/org.eclipse.net4j/.settings/.api_filters
@@ -1,6 +1,44 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<component id="org.eclipse.net4j" version="2">
+ <resource path="src/org/eclipse/internal/net4j/TransportConfig.java" type="org.eclipse.internal.net4j.TransportConfig">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="ITransportConfig"/>
+ <message_argument value="TransportConfig"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/internal/net4j/buffer/Buffer.java" type="org.eclipse.internal.net4j.buffer.Buffer">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="InternalBuffer"/>
+ <message_argument value="Buffer"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/internal/net4j/buffer/BufferProvider.java" type="org.eclipse.internal.net4j.buffer.BufferProvider$BufferProviderEvent$Provided">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="BufferProvidedEvent"/>
+ <message_argument value="Provided"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/internal/net4j/buffer/BufferProvider.java" type="org.eclipse.internal.net4j.buffer.BufferProvider$BufferProviderEvent$Retaining">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="BufferRetainingEvent"/>
+ <message_argument value="Retaining"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/net4j/connector/IConnector.java" type="org.eclipse.net4j.connector.IConnector">
+ <filter id="571473929">
+ <message_arguments>
+ <message_argument value="IChannelMultiplexer"/>
+ <message_argument value="IConnector"/>
+ </message_arguments>
+ </filter>
<filter id="576778288">
<message_arguments>
<message_argument value="IChannelMultiplexer"/>
@@ -8,6 +46,14 @@
</message_arguments>
</filter>
</resource>
+ <resource path="src/org/eclipse/net4j/connector/IConnectorStateEvent.java" type="org.eclipse.net4j.connector.IConnectorStateEvent">
+ <filter id="571473929">
+ <message_arguments>
+ <message_argument value="IConnectorEvent"/>
+ <message_argument value="IConnectorStateEvent"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/net4j/connector/IServerConnector.java" type="org.eclipse.net4j.connector.IServerConnector">
<filter id="576720909">
<message_arguments>
@@ -16,6 +62,89 @@
</message_arguments>
</filter>
</resource>
+ <resource path="src/org/eclipse/net4j/signal/SignalProtocol.java" type="org.eclipse.net4j.signal.SignalProtocol">
+ <filter id="336658481">
+ <message_arguments>
+ <message_argument value="org.eclipse.net4j.signal.SignalProtocol"/>
+ <message_argument value="COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT"/>
+ </message_arguments>
+ </filter>
+ <filter id="336658481">
+ <message_arguments>
+ <message_argument value="org.eclipse.net4j.signal.SignalProtocol"/>
+ <message_argument value="SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Acceptor.java" type="org.eclipse.spi.net4j.Acceptor">
+ <filter id="574668824">
+ <message_arguments>
+ <message_argument value="InternalAcceptor"/>
+ <message_argument value="Acceptor"/>
+ <message_argument value="IAcceptor"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel">
+ <filter id="574668824">
+ <message_arguments>
+ <message_argument value="InternalChannel"/>
+ <message_argument value="Channel"/>
+ <message_argument value="IChannel"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel$SendQueue">
+ <filter id="338792546">
+ <message_arguments>
+ <message_argument value="org.eclipse.spi.net4j.Channel.SendQueue"/>
+ <message_argument value="add(IBuffer)"/>
+ </message_arguments>
+ </filter>
+ <filter id="338792546">
+ <message_arguments>
+ <message_argument value="org.eclipse.spi.net4j.Channel.SendQueue"/>
+ <message_argument value="remove()"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel$SendQueueEventImpl">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="SendQueueEvent"/>
+ <message_argument value="SendQueueEventImpl"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/ChannelMultiplexer.java">
+ <filter id="0"/>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/ChannelMultiplexer.java" type="org.eclipse.spi.net4j.ChannelMultiplexer">
+ <filter id="574668824">
+ <message_arguments>
+ <message_argument value="InternalChannelMultiplexer"/>
+ <message_argument value="ChannelMultiplexer"/>
+ <message_argument value="IChannelMultiplexer"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Connector.java" type="org.eclipse.spi.net4j.Connector">
+ <filter id="574668824">
+ <message_arguments>
+ <message_argument value="InternalConnector"/>
+ <message_argument value="Connector"/>
+ <message_argument value="IConnector"/>
+ </message_arguments>
+ </filter>
+ </resource>
+ <resource path="src/org/eclipse/spi/net4j/Connector.java" type="org.eclipse.spi.net4j.Connector$ConnectorStateEvent">
+ <filter id="574619656">
+ <message_arguments>
+ <message_argument value="IConnectorStateEvent"/>
+ <message_argument value="ConnectorStateEvent"/>
+ </message_arguments>
+ </filter>
+ </resource>
<resource path="src/org/eclipse/spi/net4j/InternalAcceptor.java" type="org.eclipse.spi.net4j.InternalAcceptor">
<filter id="576720909">
<message_arguments>
@@ -25,6 +154,12 @@
</filter>
</resource>
<resource path="src/org/eclipse/spi/net4j/InternalBuffer.java" type="org.eclipse.spi.net4j.InternalBuffer">
+ <filter id="571473929">
+ <message_arguments>
+ <message_argument value="IBuffer"/>
+ <message_argument value="InternalBuffer"/>
+ </message_arguments>
+ </filter>
<filter id="576778288">
<message_arguments>
<message_argument value="IBuffer"/>
@@ -33,6 +168,12 @@
</filter>
</resource>
<resource path="src/org/eclipse/spi/net4j/InternalChannel.java" type="org.eclipse.spi.net4j.InternalChannel">
+ <filter id="571473929">
+ <message_arguments>
+ <message_argument value="IChannel"/>
+ <message_argument value="InternalChannel"/>
+ </message_arguments>
+ </filter>
<filter id="576778288">
<message_arguments>
<message_argument value="IChannel"/>
@@ -53,6 +194,12 @@
<message_argument value="CONTEXT_MULTIPLEXER"/>
</message_arguments>
</filter>
+ <filter id="571473929">
+ <message_arguments>
+ <message_argument value="IChannelMultiplexer"/>
+ <message_argument value="InternalChannelMultiplexer"/>
+ </message_arguments>
+ </filter>
<filter id="576778288">
<message_arguments>
<message_argument value="IChannelMultiplexer"/>
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
index b1b079dc00..161fe04d62 100644
--- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
+++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.eclipse.net4j;singleton:=true
-Bundle-Version: 4.6.100.qualifier
+Bundle-Version: 4.7.0.qualifier
Bundle-Name: %pluginName
Bundle-Vendor: %providerName
Bundle-Localization: plugin
@@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5
Bundle-ClassPath: .
Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional,
org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport
-Export-Package: org.eclipse.internal.net4j;version="4.6.100";
+Export-Package: org.eclipse.internal.net4j;version="4.7.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
@@ -19,26 +19,27 @@ Export-Package: org.eclipse.internal.net4j;version="4.6.100";
org.eclipse.net4j.http.common,
org.eclipse.net4j.http.tests,
org.eclipse.net4j.tests",
- org.eclipse.internal.net4j.buffer;version="4.6.100";
+ org.eclipse.internal.net4j.buffer;version="4.7.0";
x-friends:="org.eclipse.net4j.http.server,
org.eclipse.net4j.jvm,
org.eclipse.net4j.tcp,
org.eclipse.net4j.http,
org.eclipse.net4j.http.common,
org.eclipse.net4j.http.tests,
- org.eclipse.net4j.tests",
- org.eclipse.internal.net4j.bundle;version="4.6.100";x-internal:=true,
- org.eclipse.net4j;version="4.6.100",
- org.eclipse.net4j.acceptor;version="4.6.100",
- org.eclipse.net4j.buffer;version="4.6.100",
- org.eclipse.net4j.channel;version="4.6.100",
- org.eclipse.net4j.connector;version="4.6.100",
- org.eclipse.net4j.protocol;version="4.6.100",
- org.eclipse.net4j.signal;version="4.6.100",
- org.eclipse.net4j.signal.confirmation;version="4.6.100",
- org.eclipse.net4j.signal.heartbeat;version="4.6.100",
- org.eclipse.net4j.signal.security;version="4.6.100",
- org.eclipse.net4j.signal.wrapping;version="4.6.100",
- org.eclipse.spi.net4j;version="4.6.100"
+ org.eclipse.net4j.tests,
+ org.eclipse.net4j.trace",
+ org.eclipse.internal.net4j.bundle;version="4.7.0";x-internal:=true,
+ org.eclipse.net4j;version="4.7.0",
+ org.eclipse.net4j.acceptor;version="4.7.0",
+ org.eclipse.net4j.buffer;version="4.7.0",
+ org.eclipse.net4j.channel;version="4.7.0",
+ org.eclipse.net4j.connector;version="4.7.0",
+ org.eclipse.net4j.protocol;version="4.7.0",
+ org.eclipse.net4j.signal;version="4.7.0",
+ org.eclipse.net4j.signal.confirmation;version="4.7.0",
+ org.eclipse.net4j.signal.heartbeat;version="4.7.0",
+ org.eclipse.net4j.signal.security;version="4.7.0",
+ org.eclipse.net4j.signal.wrapping;version="4.7.0",
+ org.eclipse.spi.net4j;version="4.7.0"
Eclipse-BuddyPolicy: registered
Automatic-Module-Name: org.eclipse.net4j
diff --git a/plugins/org.eclipse.net4j/pom.xml b/plugins/org.eclipse.net4j/pom.xml
index c0e6f91acd..de77e1fb5a 100644
--- a/plugins/org.eclipse.net4j/pom.xml
+++ b/plugins/org.eclipse.net4j/pom.xml
@@ -25,7 +25,7 @@
<groupId>org.eclipse.emf.cdo</groupId>
<artifactId>org.eclipse.net4j</artifactId>
- <version>4.6.100-SNAPSHOT</version>
+ <version>4.7.0-SNAPSHOT</version>
<packaging>eclipse-plugin</packaging>
</project>
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java
new file mode 100644
index 0000000000..c073089352
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2004-2018 Eike Stepper (Berlin, Germany) and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ */
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.StringCompressor;
+
+import java.util.Collection;
+
+/**
+ * @author Eike Stepper
+ * @since 4.7
+ */
+public class AcknowledgeCompressedStringsIndication extends Indication
+{
+ public AcknowledgeCompressedStringsIndication(SignalProtocol<?> protocol)
+ {
+ super(protocol, SignalProtocol.SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS);
+ }
+
+ @Override
+ protected void indicating(ExtendedDataInputStream in) throws Exception
+ {
+ StringCompressor compressor = getProtocol().getStringCompressor();
+
+ Collection<Integer> acknowledgements = compressor.readAcknowledgements(in);
+ compressor.processAcknowledgements(acknowledgements);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java
new file mode 100644
index 0000000000..3cef165cbc
--- /dev/null
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java
@@ -0,0 +1,28 @@
+package org.eclipse.net4j.signal;
+
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
+import org.eclipse.net4j.util.io.StringCompressor;
+
+import java.util.Collection;
+
+/**
+ * @author Eike Stepper
+ * @since 4.7
+ */
+public class AcknowledgeCompressedStringsRequest extends Request
+{
+ private final Collection<Integer> acknowledgements;
+
+ public AcknowledgeCompressedStringsRequest(SignalProtocol<?> protocol, Collection<Integer> acknowledgements)
+ {
+ super(protocol, SignalProtocol.SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS);
+ this.acknowledgements = acknowledgements;
+ }
+
+ @Override
+ protected void requesting(ExtendedDataOutputStream out) throws Exception
+ {
+ StringCompressor compressor = getProtocol().getStringCompressor();
+ compressor.writeAcknowledgements(out, acknowledgements);
+ }
+}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index 575d70bbc6..4c9a1a298d 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -24,7 +24,9 @@ import org.eclipse.net4j.util.event.IListener;
import org.eclipse.net4j.util.io.IORuntimeException;
import org.eclipse.net4j.util.io.IStreamWrapper;
import org.eclipse.net4j.util.io.StreamWrapperChain;
+import org.eclipse.net4j.util.io.StringCompressor;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
+import org.eclipse.net4j.util.om.OMPlatform;
import org.eclipse.net4j.util.om.log.OMLogger;
import org.eclipse.net4j.util.om.trace.ContextTracer;
@@ -37,6 +39,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -52,6 +55,12 @@ import java.util.Map;
public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements ISignalProtocol<INFRA_STRUCTURE>
{
/**
+ * @since 4.7
+ */
+ public static final long COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT = OMPlatform.INSTANCE
+ .getProperty("org.eclipse.net4j.signal.COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT", 5000L);
+
+ /**
* @since 2.0
*/
public static final short SIGNAL_REMOTE_EXCEPTION = -1;
@@ -71,6 +80,11 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
*/
public static final short SIGNAL_SET_TIMEOUT = -4;
+ /**
+ * @since 4.7
+ */
+ public static final short SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS = -5;
+
private static final int MIN_CORRELATION_ID = 1;
private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE;
@@ -359,7 +373,12 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
case SIGNAL_SET_TIMEOUT:
return new SetTimeoutIndication(this);
+ case SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS:
+ return new AcknowledgeCompressedStringsIndication(this);
+
default:
+ checkStringCompressorAcknowledgements();
+
SignalReactor signal = createSignalReactor(signalID);
if (signal == null)
{
@@ -389,6 +408,14 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
return true;
}
+ /**
+ * @since 4.7
+ */
+ protected StringCompressor getStringCompressor()
+ {
+ return null;
+ }
+
synchronized int getNextCorrelationID()
{
int correlationID = nextCorrelationID;
@@ -463,6 +490,8 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
fireSignalScheduledEvent(signalActor);
signalActor.runSync();
+
+ checkStringCompressorAcknowledgements();
}
void stopSignal(Signal signal, Exception exception)
@@ -545,6 +574,27 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i
return timeoutSent;
}
+ private void checkStringCompressorAcknowledgements()
+ {
+ StringCompressor compressor = getStringCompressor();
+ if (compressor != null)
+ {
+ Collection<Integer> acknowledgements = compressor.getPendingAcknowledgements(COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT);
+
+ if (acknowledgements != null)
+ {
+ try
+ {
+ new AcknowledgeCompressedStringsRequest(this, acknowledgements).sendAsync();
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ }
+ }
+ }
+ }
+
private void fireSignalScheduledEvent(Signal signal)
{
IListener[] listeners = getListeners();

Back to the top