diff options
13 files changed, 541 insertions, 60 deletions
diff --git a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java index d1ac37326d..7b7fc0565c 100644 --- a/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java +++ b/plugins/org.eclipse.emf.cdo.common/src/org/eclipse/emf/cdo/common/protocol/CDOProtocolConstants.java @@ -32,8 +32,9 @@ public interface CDOProtocolConstants * @since 4.2 * @noreference This field is not intended to be referenced by clients. */ - public static final int PROTOCOL_VERSION = 36; // CDOID.NIL + public static final int PROTOCOL_VERSION = 37; // SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS + // public static final int PROTOCOL_VERSION = 36; // CDOID.NIL // public static final int PROTOCOL_VERSION = 35; // DiffieHellman.Server.Challenge.getSecretAlgorithmKeyLen() // public static final int PROTOCOL_VERSION = 34; // CDOSessionProtocol.loadMergeData2() // public static final int PROTOCOL_VERSION = 33; // CDOCommitInfo.getMergeSource() diff --git a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java index 8ef48e3cd6..7042dec127 100644 --- a/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java +++ b/plugins/org.eclipse.emf.cdo.net4j/src/org/eclipse/emf/cdo/internal/net4j/protocol/CDOClientProtocol.java @@ -61,6 +61,7 @@ import org.eclipse.net4j.util.collection.Pair; import org.eclipse.net4j.util.concurrent.IRWLockManager.LockType; import org.eclipse.net4j.util.io.StringCompressor; import org.eclipse.net4j.util.io.StringIO; +import org.eclipse.net4j.util.om.OMPlatform; import org.eclipse.net4j.util.om.monitor.Monitor; import org.eclipse.net4j.util.om.monitor.OMMonitor; import org.eclipse.net4j.util.om.trace.PerfTracer; @@ -89,7 +90,10 @@ public class CDOClientProtocol extends AuthenticatingSignalProtocol<CDOSessionIm { private static final PerfTracer REVISION_LOADING = new PerfTracer(OM.PERF_REVISION_LOADING, CDOClientProtocol.class); - private StringIO packageURICompressor = StringCompressor.BYPASS ? StringIO.DIRECT : new StringCompressor(true); + private static final boolean COMPRESS_PACKAGE_URIS = OMPlatform.INSTANCE.isProperty("org.eclipse.emf.cdo.protocol.compressPackageURIs", + !StringCompressor.BYPASS); + + private StringIO packageURICompressor = COMPRESS_PACKAGE_URIS ? new StringCompressor(true) : StringIO.DIRECT; public CDOClientProtocol() { @@ -542,6 +546,17 @@ public class CDOClientProtocol extends AuthenticatingSignalProtocol<CDOSessionIm } @Override + protected StringCompressor getStringCompressor() + { + if (COMPRESS_PACKAGE_URIS) + { + return (StringCompressor)packageURICompressor; + } + + return super.getStringCompressor(); + } + + @Override protected SignalReactor createSignalReactor(short signalID) { switch (signalID) diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java index 37fa9d40a9..761cd0c69f 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/CDOServerProtocol.java @@ -34,6 +34,7 @@ import org.eclipse.net4j.signal.security.AuthenticationRequest; import org.eclipse.net4j.util.io.StringCompressor; import org.eclipse.net4j.util.io.StringIO; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.OMPlatform; import org.eclipse.net4j.util.om.monitor.Monitor; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.net4j.util.security.CredentialsUpdateOperation; @@ -49,12 +50,15 @@ public class CDOServerProtocol extends SignalProtocol<InternalSession> implement private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_PROTOCOL, CDOServerProtocol.class); + private static final boolean COMPRESS_PACKAGE_URIS = OMPlatform.INSTANCE.isProperty("org.eclipse.emf.cdo.protocol.compressPackageURIs", + !StringCompressor.BYPASS); + + private StringIO packageURICompressor = COMPRESS_PACKAGE_URIS ? new StringCompressor(false) : StringIO.DIRECT; + private long negotiationTimeout = DEFAULT_NEGOTIATION_TIMEOUT; private IRepositoryProvider repositoryProvider; - private StringIO packageURICompressor = StringCompressor.BYPASS ? StringIO.DIRECT : new StringCompressor(false); - public CDOServerProtocol(IRepositoryProvider repositoryProvider) { super(PROTOCOL_NAME); @@ -225,6 +229,17 @@ public class CDOServerProtocol extends SignalProtocol<InternalSession> implement } @Override + protected StringCompressor getStringCompressor() + { + if (COMPRESS_PACKAGE_URIS) + { + return (StringCompressor)packageURICompressor; + } + + return super.getStringCompressor(); + } + + @Override protected SignalReactor createSignalReactor(short signalID) { switch (signalID) diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java index d9c31363d6..6c2929cfb7 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_340709_Test.java @@ -27,7 +27,7 @@ import java.util.concurrent.CountDownLatch; */ public class Bugzilla_340709_Test extends AbstractCDOTest { - CountDownLatch latch = new CountDownLatch(1); + private transient CountDownLatch latch = new CountDownLatch(1); public void test() throws Exception { diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java index a6c2d45e31..b68826183b 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_441136_Test.java @@ -24,6 +24,8 @@ import org.eclipse.emf.cdo.tests.AbstractCDOTest; import org.eclipse.emf.cdo.tests.model1.Company; import org.eclipse.emf.cdo.transaction.CDOTransaction; +import org.eclipse.net4j.signal.AcknowledgeCompressedStringsIndication; +import org.eclipse.net4j.signal.AcknowledgeCompressedStringsRequest; import org.eclipse.net4j.signal.ISignalProtocol; import org.eclipse.net4j.signal.IndicationWithMonitoring; import org.eclipse.net4j.signal.MonitorProgressIndication; @@ -72,30 +74,35 @@ public class Bugzilla_441136_Test extends AbstractCDOTest Company company = getModel1Factory().createCompany(); resource.getContents().add(company); transaction.commit(useMonitor ? new NullProgressMonitor() : null); - String assertMessage = " differents kinds of requests should have been sent, QueryRequest, QueryCancel, LoadRevisionsRequest and CommitTransactionRequest"; + + signalCounter.removeCountFor(AcknowledgeCompressedStringsRequest.class); + signalCounter.removeCountFor(AcknowledgeCompressedStringsIndication.class); + int nbExpectedCalls; - if (!useMonitor) + String assertMessage = " differents kinds of requests should have been sent, QueryRequest, QueryCancel, LoadRevisionsRequest and CommitTransactionRequest"; + + if (useMonitor) { + nbExpectedCalls = 5; + assertMessage += " and MonitorProgressIndications should have been received"; + // QueryRequest, QueryCancel are used to get the resourcePath - nbExpectedCalls = 4; assertEquals(nbExpectedCalls + assertMessage, nbExpectedCalls, signalCounter.getCountForSignalTypes()); assertNotSame(0, signalCounter.getCountFor(QueryRequest.class)); assertNotSame(0, signalCounter.getCountFor(QueryCancelRequest.class)); assertNotSame(0, signalCounter.getCountFor(LoadRevisionsRequest.class)); assertNotSame(0, signalCounter.getCountFor(CommitTransactionRequest.class)); + assertNotSame(0, signalCounter.getCountFor(MonitorProgressIndication.class)); } else { - nbExpectedCalls = 5; - assertMessage += " and MonitorProgressIndications should have been received"; - // QueryRequest, QueryCancel are used to get the resourcePath + nbExpectedCalls = 4; assertEquals(nbExpectedCalls + assertMessage, nbExpectedCalls, signalCounter.getCountForSignalTypes()); assertNotSame(0, signalCounter.getCountFor(QueryRequest.class)); assertNotSame(0, signalCounter.getCountFor(QueryCancelRequest.class)); assertNotSame(0, signalCounter.getCountFor(LoadRevisionsRequest.class)); assertNotSame(0, signalCounter.getCountFor(CommitTransactionRequest.class)); - assertNotSame(0, signalCounter.getCountFor(MonitorProgressIndication.class)); } protocol.removeListener(signalCounter); diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java new file mode 100644 index 0000000000..2842217c54 --- /dev/null +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/bugzilla/Bugzilla_517225_Test.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2018 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.emf.cdo.tests.bugzilla; + +import org.eclipse.emf.cdo.eresource.CDOResource; +import org.eclipse.emf.cdo.internal.net4j.protocol.CDOClientProtocol; +import org.eclipse.emf.cdo.net4j.CDONet4jSession; +import org.eclipse.emf.cdo.session.CDOSession; +import org.eclipse.emf.cdo.tests.AbstractCDOTest; +import org.eclipse.emf.cdo.tests.model1.Category; +import org.eclipse.emf.cdo.tests.model1.Company; +import org.eclipse.emf.cdo.tests.model1.Product1; +import org.eclipse.emf.cdo.transaction.CDOTransaction; + +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.util.ReflectUtil; +import org.eclipse.net4j.util.io.StringCompressor; + +import org.eclipse.emf.common.util.EList; +import org.eclipse.emf.ecore.EObject; +import org.eclipse.emf.spi.cdo.CDOSessionProtocol; + +import java.util.Collection; +import java.util.Iterator; + +/** + * Bug 517225: StringCompressor can create huge memory leak + * + * @author Eike Stepper + */ +public class Bugzilla_517225_Test extends AbstractCDOTest +{ + private static final int LEVELS = 3; + + private static final int CATEGORIES = 3; + + private static final int PRODUCTS = 15; + + @SuppressWarnings("unchecked") + public void testStringCompressorLeak() throws Exception + { + // Initialize model. + { + CDOSession session = openSession(); + CDOTransaction transaction = session.openTransaction(); + CDOResource resource = transaction.createResource(getResourcePath("res")); + + Category category = getModel1Factory().createCategory(); + category.setName("ROOT"); + + Company company = getModel1Factory().createCompany(); + company.getCategories().add(category); + resource.getContents().add(company); + + createModel(category, LEVELS); + transaction.commit(); + } + + CDONet4jSession session = (CDONet4jSession)openSession(); + CDOClientProtocol protocol = (CDOClientProtocol)(CDOSessionProtocol)session.options().getNet4jProtocol(); + StringCompressor compressor = (StringCompressor)protocol.getPackageURICompressor(); + + Collection<Integer> pendingAcknowledgements = (Collection<Integer>)ReflectUtil + .getValue(ReflectUtil.getField(StringCompressor.class, "pendingAcknowledgements"), compressor); + + CDOTransaction transaction = session.openTransaction(); + CDOResource resource = transaction.getResource(getResourcePath("res")); + for (Iterator<EObject> it = resource.eAllContents(); it.hasNext();) + { + it.next(); + } + + // Should be 9 with a HashSet instead of 728 with an ArrayList. + assertTrue("pendingAcknowledgements: " + pendingAcknowledgements, pendingAcknowledgements.size() < 10); + + sleep(SignalProtocol.COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT + 1000); + session.openView(); + + pendingAcknowledgements = (Collection<Integer>)ReflectUtil.getValue(ReflectUtil.getField(StringCompressor.class, "pendingAcknowledgements"), compressor); + assertEquals("pendingAcknowledgements: " + pendingAcknowledgements, 0, pendingAcknowledgements.size()); + } + + private void createModel(Category parent, int levels) + { + EList<Category> categories = parent.getCategories(); + for (int i = 0; i < CATEGORIES; i++) + { + Category category = getModel1Factory().createCategory(); + category.setName("Category" + levels + "-" + i); + categories.add(category); + } + + EList<Product1> products = parent.getProducts(); + for (int i = 0; i < PRODUCTS; i++) + { + Product1 product = getModel1Factory().createProduct1(); + product.setName("Product" + levels + "-" + i); + products.add(product); + } + + if (levels > 0) + { + for (Category category : categories) + { + createModel(category, levels - 1); + } + } + } +} diff --git a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java index e44cfe3bcc..cac5414e89 100644 --- a/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java +++ b/plugins/org.eclipse.net4j.util/src/org/eclipse/net4j/util/io/StringCompressor.java @@ -11,12 +11,13 @@ package org.eclipse.net4j.util.io; import org.eclipse.net4j.util.CheckUtil; +import org.eclipse.net4j.util.om.OMPlatform; import java.io.IOException; import java.text.MessageFormat; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; /** @@ -28,7 +29,7 @@ public class StringCompressor implements StringIO /** * @since 3.0 */ - public static boolean BYPASS = false; + public static boolean BYPASS = OMPlatform.INSTANCE.isProperty("org.eclipse.net4j.util.io.StringCompressor.BYPASS"); private static final int NULL_ID = 0; @@ -56,7 +57,9 @@ public class StringCompressor implements StringIO private Map<Integer, String> idToString = new HashMap<Integer, String>(); - private List<Integer> pendingAcknowledgements = new ArrayList<Integer>(); + private Collection<Integer> pendingAcknowledgements = createAcknowledgementCollection(); + + private long lastAcknowledgementCheck; /** * Creates a StringCompressor instance. @@ -88,8 +91,9 @@ public class StringCompressor implements StringIO } ID id; - List<Integer> acknowledgements = null; + Collection<Integer> acknowledgements = null; boolean stringFollows = false; + synchronized (this) { id = stringToID.get(string); @@ -107,10 +111,11 @@ public class StringCompressor implements StringIO stringFollows = true; } + lastAcknowledgementCheck = System.currentTimeMillis(); if (!pendingAcknowledgements.isEmpty()) { acknowledgements = pendingAcknowledgements; - pendingAcknowledgements = new ArrayList<Integer>(); + pendingAcknowledgements = createAcknowledgementCollection(); } } @@ -125,16 +130,7 @@ public class StringCompressor implements StringIO writeString(out, string); } - if (acknowledgements != null) - { - for (int ack : acknowledgements) - { - writeByte(out, ACK_FOLLOWS); - writeInt(out, ack); - } - } - - writeByte(out, NOTHING_FOLLOWS); + writeAcknowledgements(out, acknowledgements); } else { @@ -156,7 +152,7 @@ public class StringCompressor implements StringIO } String string = null; - List<Integer> acks = null; + Collection<Integer> acks = null; if (id == INFO_FOLLOWS) { id = readInt(in); @@ -178,7 +174,7 @@ public class StringCompressor implements StringIO case ACK_FOLLOWS: if (acks == null) { - acks = new ArrayList<Integer>(); + acks = createAcknowledgementCollection(); } acks.add(readInt(in)); @@ -192,7 +188,8 @@ public class StringCompressor implements StringIO synchronized (this) { - acknowledge(acks); + processAcknowledgements(acks); + if (string != null) { stringToID.put(string, new ID(id)); @@ -212,31 +209,92 @@ public class StringCompressor implements StringIO return string; } - @Override - public String toString() + /** + * @since 3.8 + */ + public Collection<Integer> getPendingAcknowledgements(long timeout) { - return MessageFormat.format("StringCompressor[client={0}]", client); //$NON-NLS-1$ + Collection<Integer> acknowledgements = null; + + synchronized (this) + { + long now = System.currentTimeMillis(); + if (lastAcknowledgementCheck + timeout < now) + { + lastAcknowledgementCheck = now; + if (!pendingAcknowledgements.isEmpty()) + { + acknowledgements = pendingAcknowledgements; + pendingAcknowledgements = createAcknowledgementCollection(); + } + } + } + + return acknowledgements; + } + + /** + * @since 3.8 + */ + public void writeAcknowledgements(ExtendedDataOutput out, Collection<Integer> acknowledgements) throws IOException + { + if (acknowledgements != null) + { + for (int ack : acknowledgements) + { + writeByte(out, ACK_FOLLOWS); + writeInt(out, ack); + } + } + + writeByte(out, NOTHING_FOLLOWS); + } + + /** + * @since 3.8 + */ + public Collection<Integer> readAcknowledgements(ExtendedDataInput in) throws IOException + { + Collection<Integer> acknowledgements = createAcknowledgementCollection(); + while (in.readByte() == ACK_FOLLOWS) + { + acknowledgements.add(in.readInt()); + } + + return acknowledgements; } - private void acknowledge(List<Integer> acks) + /** + * @since 3.8 + */ + public void processAcknowledgements(Collection<Integer> acknowledgements) { - if (acks != null) + if (acknowledgements != null) { - for (int value : acks) + synchronized (this) { - String string = idToString.get(value); - if (string != null) + for (int value : acknowledgements) { - ID id = stringToID.get(string); - if (id != null) + String string = idToString.get(value); + if (string != null) { - id.setAcknowledged(); + ID id = stringToID.get(string); + if (id != null) + { + id.setAcknowledged(); + } } } } } } + @Override + public String toString() + { + return MessageFormat.format("StringCompressor[client={0}]", client); //$NON-NLS-1$ + } + private void writeByte(ExtendedDataOutput out, byte value) throws IOException { if (DEBUG) @@ -375,6 +433,11 @@ public class StringCompressor implements StringIO IOUtil.OUT().println(msg); } + private static Collection<Integer> createAcknowledgementCollection() + { + return new HashSet<Integer>(); + } + /** * @author Eike Stepper */ diff --git a/plugins/org.eclipse.net4j/.settings/.api_filters b/plugins/org.eclipse.net4j/.settings/.api_filters index e9b9fc76f6..14ee494c23 100644 --- a/plugins/org.eclipse.net4j/.settings/.api_filters +++ b/plugins/org.eclipse.net4j/.settings/.api_filters @@ -1,6 +1,44 @@ <?xml version="1.0" encoding="UTF-8" standalone="no"?> <component id="org.eclipse.net4j" version="2"> + <resource path="src/org/eclipse/internal/net4j/TransportConfig.java" type="org.eclipse.internal.net4j.TransportConfig"> + <filter id="574619656"> + <message_arguments> + <message_argument value="ITransportConfig"/> + <message_argument value="TransportConfig"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/internal/net4j/buffer/Buffer.java" type="org.eclipse.internal.net4j.buffer.Buffer"> + <filter id="574619656"> + <message_arguments> + <message_argument value="InternalBuffer"/> + <message_argument value="Buffer"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/internal/net4j/buffer/BufferProvider.java" type="org.eclipse.internal.net4j.buffer.BufferProvider$BufferProviderEvent$Provided"> + <filter id="574619656"> + <message_arguments> + <message_argument value="BufferProvidedEvent"/> + <message_argument value="Provided"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/internal/net4j/buffer/BufferProvider.java" type="org.eclipse.internal.net4j.buffer.BufferProvider$BufferProviderEvent$Retaining"> + <filter id="574619656"> + <message_arguments> + <message_argument value="BufferRetainingEvent"/> + <message_argument value="Retaining"/> + </message_arguments> + </filter> + </resource> <resource path="src/org/eclipse/net4j/connector/IConnector.java" type="org.eclipse.net4j.connector.IConnector"> + <filter id="571473929"> + <message_arguments> + <message_argument value="IChannelMultiplexer"/> + <message_argument value="IConnector"/> + </message_arguments> + </filter> <filter id="576778288"> <message_arguments> <message_argument value="IChannelMultiplexer"/> @@ -8,6 +46,14 @@ </message_arguments> </filter> </resource> + <resource path="src/org/eclipse/net4j/connector/IConnectorStateEvent.java" type="org.eclipse.net4j.connector.IConnectorStateEvent"> + <filter id="571473929"> + <message_arguments> + <message_argument value="IConnectorEvent"/> + <message_argument value="IConnectorStateEvent"/> + </message_arguments> + </filter> + </resource> <resource path="src/org/eclipse/net4j/connector/IServerConnector.java" type="org.eclipse.net4j.connector.IServerConnector"> <filter id="576720909"> <message_arguments> @@ -16,6 +62,89 @@ </message_arguments> </filter> </resource> + <resource path="src/org/eclipse/net4j/signal/SignalProtocol.java" type="org.eclipse.net4j.signal.SignalProtocol"> + <filter id="336658481"> + <message_arguments> + <message_argument value="org.eclipse.net4j.signal.SignalProtocol"/> + <message_argument value="COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT"/> + </message_arguments> + </filter> + <filter id="336658481"> + <message_arguments> + <message_argument value="org.eclipse.net4j.signal.SignalProtocol"/> + <message_argument value="SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Acceptor.java" type="org.eclipse.spi.net4j.Acceptor"> + <filter id="574668824"> + <message_arguments> + <message_argument value="InternalAcceptor"/> + <message_argument value="Acceptor"/> + <message_argument value="IAcceptor"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel"> + <filter id="574668824"> + <message_arguments> + <message_argument value="InternalChannel"/> + <message_argument value="Channel"/> + <message_argument value="IChannel"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel$SendQueue"> + <filter id="338792546"> + <message_arguments> + <message_argument value="org.eclipse.spi.net4j.Channel.SendQueue"/> + <message_argument value="add(IBuffer)"/> + </message_arguments> + </filter> + <filter id="338792546"> + <message_arguments> + <message_argument value="org.eclipse.spi.net4j.Channel.SendQueue"/> + <message_argument value="remove()"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Channel.java" type="org.eclipse.spi.net4j.Channel$SendQueueEventImpl"> + <filter id="574619656"> + <message_arguments> + <message_argument value="SendQueueEvent"/> + <message_argument value="SendQueueEventImpl"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/ChannelMultiplexer.java"> + <filter id="0"/> + </resource> + <resource path="src/org/eclipse/spi/net4j/ChannelMultiplexer.java" type="org.eclipse.spi.net4j.ChannelMultiplexer"> + <filter id="574668824"> + <message_arguments> + <message_argument value="InternalChannelMultiplexer"/> + <message_argument value="ChannelMultiplexer"/> + <message_argument value="IChannelMultiplexer"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Connector.java" type="org.eclipse.spi.net4j.Connector"> + <filter id="574668824"> + <message_arguments> + <message_argument value="InternalConnector"/> + <message_argument value="Connector"/> + <message_argument value="IConnector"/> + </message_arguments> + </filter> + </resource> + <resource path="src/org/eclipse/spi/net4j/Connector.java" type="org.eclipse.spi.net4j.Connector$ConnectorStateEvent"> + <filter id="574619656"> + <message_arguments> + <message_argument value="IConnectorStateEvent"/> + <message_argument value="ConnectorStateEvent"/> + </message_arguments> + </filter> + </resource> <resource path="src/org/eclipse/spi/net4j/InternalAcceptor.java" type="org.eclipse.spi.net4j.InternalAcceptor"> <filter id="576720909"> <message_arguments> @@ -25,6 +154,12 @@ </filter> </resource> <resource path="src/org/eclipse/spi/net4j/InternalBuffer.java" type="org.eclipse.spi.net4j.InternalBuffer"> + <filter id="571473929"> + <message_arguments> + <message_argument value="IBuffer"/> + <message_argument value="InternalBuffer"/> + </message_arguments> + </filter> <filter id="576778288"> <message_arguments> <message_argument value="IBuffer"/> @@ -33,6 +168,12 @@ </filter> </resource> <resource path="src/org/eclipse/spi/net4j/InternalChannel.java" type="org.eclipse.spi.net4j.InternalChannel"> + <filter id="571473929"> + <message_arguments> + <message_argument value="IChannel"/> + <message_argument value="InternalChannel"/> + </message_arguments> + </filter> <filter id="576778288"> <message_arguments> <message_argument value="IChannel"/> @@ -53,6 +194,12 @@ <message_argument value="CONTEXT_MULTIPLEXER"/> </message_arguments> </filter> + <filter id="571473929"> + <message_arguments> + <message_argument value="IChannelMultiplexer"/> + <message_argument value="InternalChannelMultiplexer"/> + </message_arguments> + </filter> <filter id="576778288"> <message_arguments> <message_argument value="IChannelMultiplexer"/> diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF index b1b079dc00..161fe04d62 100644 --- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j;singleton:=true -Bundle-Version: 4.6.100.qualifier +Bundle-Version: 4.7.0.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -11,7 +11,7 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5 Bundle-ClassPath: . Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional, org.eclipse.net4j.util;bundle-version="[3.0.0,4.0.0)";visibility:=reexport -Export-Package: org.eclipse.internal.net4j;version="4.6.100"; +Export-Package: org.eclipse.internal.net4j;version="4.7.0"; x-friends:="org.eclipse.net4j.http.server, org.eclipse.net4j.jvm, org.eclipse.net4j.tcp, @@ -19,26 +19,27 @@ Export-Package: org.eclipse.internal.net4j;version="4.6.100"; org.eclipse.net4j.http.common, org.eclipse.net4j.http.tests, org.eclipse.net4j.tests", - org.eclipse.internal.net4j.buffer;version="4.6.100"; + org.eclipse.internal.net4j.buffer;version="4.7.0"; x-friends:="org.eclipse.net4j.http.server, org.eclipse.net4j.jvm, org.eclipse.net4j.tcp, org.eclipse.net4j.http, org.eclipse.net4j.http.common, org.eclipse.net4j.http.tests, - org.eclipse.net4j.tests", - org.eclipse.internal.net4j.bundle;version="4.6.100";x-internal:=true, - org.eclipse.net4j;version="4.6.100", - org.eclipse.net4j.acceptor;version="4.6.100", - org.eclipse.net4j.buffer;version="4.6.100", - org.eclipse.net4j.channel;version="4.6.100", - org.eclipse.net4j.connector;version="4.6.100", - org.eclipse.net4j.protocol;version="4.6.100", - org.eclipse.net4j.signal;version="4.6.100", - org.eclipse.net4j.signal.confirmation;version="4.6.100", - org.eclipse.net4j.signal.heartbeat;version="4.6.100", - org.eclipse.net4j.signal.security;version="4.6.100", - org.eclipse.net4j.signal.wrapping;version="4.6.100", - org.eclipse.spi.net4j;version="4.6.100" + org.eclipse.net4j.tests, + org.eclipse.net4j.trace", + org.eclipse.internal.net4j.bundle;version="4.7.0";x-internal:=true, + org.eclipse.net4j;version="4.7.0", + org.eclipse.net4j.acceptor;version="4.7.0", + org.eclipse.net4j.buffer;version="4.7.0", + org.eclipse.net4j.channel;version="4.7.0", + org.eclipse.net4j.connector;version="4.7.0", + org.eclipse.net4j.protocol;version="4.7.0", + org.eclipse.net4j.signal;version="4.7.0", + org.eclipse.net4j.signal.confirmation;version="4.7.0", + org.eclipse.net4j.signal.heartbeat;version="4.7.0", + org.eclipse.net4j.signal.security;version="4.7.0", + org.eclipse.net4j.signal.wrapping;version="4.7.0", + org.eclipse.spi.net4j;version="4.7.0" Eclipse-BuddyPolicy: registered Automatic-Module-Name: org.eclipse.net4j diff --git a/plugins/org.eclipse.net4j/pom.xml b/plugins/org.eclipse.net4j/pom.xml index c0e6f91acd..de77e1fb5a 100644 --- a/plugins/org.eclipse.net4j/pom.xml +++ b/plugins/org.eclipse.net4j/pom.xml @@ -25,7 +25,7 @@ <groupId>org.eclipse.emf.cdo</groupId> <artifactId>org.eclipse.net4j</artifactId> - <version>4.6.100-SNAPSHOT</version> + <version>4.7.0-SNAPSHOT</version> <packaging>eclipse-plugin</packaging> </project> diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java new file mode 100644 index 0000000000..c073089352 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsIndication.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2004-2018 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.StringCompressor; + +import java.util.Collection; + +/** + * @author Eike Stepper + * @since 4.7 + */ +public class AcknowledgeCompressedStringsIndication extends Indication +{ + public AcknowledgeCompressedStringsIndication(SignalProtocol<?> protocol) + { + super(protocol, SignalProtocol.SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS); + } + + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + StringCompressor compressor = getProtocol().getStringCompressor(); + + Collection<Integer> acknowledgements = compressor.readAcknowledgements(in); + compressor.processAcknowledgements(acknowledgements); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java new file mode 100644 index 0000000000..3cef165cbc --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/AcknowledgeCompressedStringsRequest.java @@ -0,0 +1,28 @@ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.io.StringCompressor; + +import java.util.Collection; + +/** + * @author Eike Stepper + * @since 4.7 + */ +public class AcknowledgeCompressedStringsRequest extends Request +{ + private final Collection<Integer> acknowledgements; + + public AcknowledgeCompressedStringsRequest(SignalProtocol<?> protocol, Collection<Integer> acknowledgements) + { + super(protocol, SignalProtocol.SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS); + this.acknowledgements = acknowledgements; + } + + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + StringCompressor compressor = getProtocol().getStringCompressor(); + compressor.writeAcknowledgements(out, acknowledgements); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index 575d70bbc6..4c9a1a298d 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -24,7 +24,9 @@ import org.eclipse.net4j.util.event.IListener; import org.eclipse.net4j.util.io.IORuntimeException; import org.eclipse.net4j.util.io.IStreamWrapper; import org.eclipse.net4j.util.io.StreamWrapperChain; +import org.eclipse.net4j.util.io.StringCompressor; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.OMPlatform; import org.eclipse.net4j.util.om.log.OMLogger; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -37,6 +39,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.text.MessageFormat; +import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -52,6 +55,12 @@ import java.util.Map; public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements ISignalProtocol<INFRA_STRUCTURE> { /** + * @since 4.7 + */ + public static final long COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT = OMPlatform.INSTANCE + .getProperty("org.eclipse.net4j.signal.COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT", 5000L); + + /** * @since 2.0 */ public static final short SIGNAL_REMOTE_EXCEPTION = -1; @@ -71,6 +80,11 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i */ public static final short SIGNAL_SET_TIMEOUT = -4; + /** + * @since 4.7 + */ + public static final short SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS = -5; + private static final int MIN_CORRELATION_ID = 1; private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE; @@ -359,7 +373,12 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i case SIGNAL_SET_TIMEOUT: return new SetTimeoutIndication(this); + case SIGNAL_ACKNOWLEDGE_COMPRESSED_STRINGS: + return new AcknowledgeCompressedStringsIndication(this); + default: + checkStringCompressorAcknowledgements(); + SignalReactor signal = createSignalReactor(signalID); if (signal == null) { @@ -389,6 +408,14 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i return true; } + /** + * @since 4.7 + */ + protected StringCompressor getStringCompressor() + { + return null; + } + synchronized int getNextCorrelationID() { int correlationID = nextCorrelationID; @@ -463,6 +490,8 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i fireSignalScheduledEvent(signalActor); signalActor.runSync(); + + checkStringCompressorAcknowledgements(); } void stopSignal(Signal signal, Exception exception) @@ -545,6 +574,27 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i return timeoutSent; } + private void checkStringCompressorAcknowledgements() + { + StringCompressor compressor = getStringCompressor(); + if (compressor != null) + { + Collection<Integer> acknowledgements = compressor.getPendingAcknowledgements(COMPRESSED_STRINGS_ACKNOWLEDGE_TIMEOUT); + + if (acknowledgements != null) + { + try + { + new AcknowledgeCompressedStringsRequest(this, acknowledgements).sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + } + } + } + private void fireSignalScheduledEvent(Signal signal) { IListener[] listeners = getListeners(); |