diff options
21 files changed, 319 insertions, 82 deletions
diff --git a/plugins/org.eclipse.emf.cdo.explorer.ui/src/org/eclipse/emf/cdo/explorer/ui/repositories/wizards/MasterRepositoryController.java b/plugins/org.eclipse.emf.cdo.explorer.ui/src/org/eclipse/emf/cdo/explorer/ui/repositories/wizards/MasterRepositoryController.java index 08816eefbe..2784e9fd43 100644 --- a/plugins/org.eclipse.emf.cdo.explorer.ui/src/org/eclipse/emf/cdo/explorer/ui/repositories/wizards/MasterRepositoryController.java +++ b/plugins/org.eclipse.emf.cdo.explorer.ui/src/org/eclipse/emf/cdo/explorer/ui/repositories/wizards/MasterRepositoryController.java @@ -18,9 +18,12 @@ import org.eclipse.emf.cdo.common.CDOCommonRepository.IDGenerationLocation; import org.eclipse.emf.cdo.explorer.ui.ViewerUtil; import org.eclipse.emf.cdo.explorer.ui.bundle.OM; import org.eclipse.emf.cdo.net4j.CDONet4jSession; +import org.eclipse.emf.cdo.net4j.CDONet4jSessionConfiguration; import org.eclipse.emf.cdo.net4j.CDONet4jUtil; import org.eclipse.emf.cdo.session.CDORepositoryInfo; +import org.eclipse.net4j.Net4jUtil; +import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.util.container.ContainerEventAdapter; import org.eclipse.net4j.util.container.ContainerUtil; import org.eclipse.net4j.util.container.IContainer; @@ -467,8 +470,6 @@ public class MasterRepositoryController */ private final class RepositoryValidatingText extends ValidatingText { - private CDONet4jSession session; - private RepositoryValidatingText(Composite parent) { super(parent, 120); @@ -477,7 +478,6 @@ public class MasterRepositoryController @Override protected String getValidationInfo() { - session = null; if (connectorDescription == null) { return null; @@ -495,40 +495,69 @@ public class MasterRepositoryController @Override protected String validate(String repositoryName) throws Exception { - String description = "tcp://" + connectorDescription + "?repositoryName=" + repositoryName; + CDONet4jSession session = null; try { - session = CDONet4jUtil.getNet4jSession(container, description); - if (session != null && session.isClosed()) + IConnector connector = null; + + try + { + connector = Net4jUtil.getConnector(container, "tcp", connectorDescription); + } + catch (Exception ex) + { + connector = null; + } + + if (connector == null) + { + throw new Exception("Host unreachable"); + } + + try + { + CDONet4jSessionConfiguration config = CDONet4jUtil.createNet4jSessionConfiguration(); + config.setConnector(connector); + config.setRepositoryName(repositoryName); + // config.setCredentialsProvider(this); + + session = config.openNet4jSession(); + if (session != null && session.isClosed()) + { + session = null; + } + } + catch (Exception ex) { session = null; } - } - catch (Exception ex) - { - //$FALL-THROUGH$ - } - if (session == null) - { - throw new Exception("Unreachable"); - } + if (session == null) + { + throw new Exception("Repository unreachable"); + } - CDORepositoryInfo repositoryInfo = session.getRepositoryInfo(); - String message = getMode(repositoryInfo); + CDORepositoryInfo repositoryInfo = session.getRepositoryInfo(); + String message = getMode(repositoryInfo); - if (repositoryInfo.isSupportingBranches() - && repositoryInfo.getIDGenerationLocation() == IDGenerationLocation.CLIENT) - { - message += ", Replicable"; + if (repositoryInfo.isSupportingBranches() + && repositoryInfo.getIDGenerationLocation() == IDGenerationLocation.CLIENT) + { + message += ", Replicable"; + } + + if (repositoryInfo.isAuthenticating()) + { + message += ", Authenticating"; + } + + return message; } - else + finally { - message += ", Non-Replicable"; + LifecycleUtil.deactivate(session); } - - return message; } private String getMode(CDORepositoryInfo repositoryInfo) diff --git a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/OpenSessionIndication.java b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/OpenSessionIndication.java index fca30b0589..7645d24432 100644 --- a/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/OpenSessionIndication.java +++ b/plugins/org.eclipse.emf.cdo.server.net4j/src/org/eclipse/emf/cdo/server/internal/net4j/protocol/OpenSessionIndication.java @@ -59,6 +59,12 @@ public class OpenSessionIndication extends CDOServerIndicationWithMonitoring } @Override + protected boolean closeChannelAfterException() + { + return true; + } + + @Override protected InternalRepository getRepository() { return repository; diff --git a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java index 8c0a43a0f4..42957e4e44 100644 --- a/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java +++ b/plugins/org.eclipse.emf.cdo.server/src/org/eclipse/emf/cdo/internal/server/SessionManager.java @@ -423,9 +423,10 @@ public class SessionManager extends Container<ISession> implements InternalSessi throw notAuthenticated(); } - ByteArrayInputStream baos = new ByteArrayInputStream(authenticationServer.handleResponse(response)); + ByteArrayInputStream bais = new ByteArrayInputStream(authenticationServer.handleResponse(response)); + @SuppressWarnings("resource") - ExtendedDataInputStream stream = new ExtendedDataInputStream(baos); + ExtendedDataInputStream stream = new ExtendedDataInputStream(bais); String userID = stream.readString(); char[] password = stream.readString().toCharArray(); diff --git a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/SessionTest.java b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/SessionTest.java index 301a8777d2..4dc5f1e637 100644 --- a/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/SessionTest.java +++ b/plugins/org.eclipse.emf.cdo.tests/src/org/eclipse/emf/cdo/tests/SessionTest.java @@ -18,12 +18,17 @@ import org.eclipse.emf.cdo.server.ISession; import org.eclipse.emf.cdo.session.CDOSession; import org.eclipse.emf.cdo.spi.common.revision.InternalCDORevisionCache; import org.eclipse.emf.cdo.spi.server.ISessionProtocol; +import org.eclipse.emf.cdo.tests.config.ISessionConfig; import org.eclipse.emf.cdo.tests.config.impl.RepositoryConfig; import org.eclipse.emf.cdo.tests.config.impl.SessionConfig; +import org.eclipse.emf.cdo.tests.config.impl.SessionConfig.Net4j; import org.eclipse.emf.cdo.transaction.CDOTransaction; import org.eclipse.emf.cdo.util.CommitException; import org.eclipse.emf.cdo.view.CDOView; +import org.eclipse.net4j.acceptor.IAcceptor; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.signal.ISignalProtocol; import org.eclipse.net4j.signal.SignalProtocol.TimeoutChangedEvent; import org.eclipse.net4j.util.WrappedException; @@ -43,6 +48,7 @@ import org.eclipse.emf.common.notify.Notification; import org.eclipse.emf.ecore.util.EContentAdapter; import org.eclipse.emf.spi.cdo.InternalCDOSession; +import java.util.Collection; import java.util.concurrent.CountDownLatch; /** @@ -447,4 +453,29 @@ public class SessionTest extends AbstractCDOTest transaction.createResource(getResourcePath("ttt")); transaction.commit(); } + + @Requires(ISessionConfig.CAPABILITY_NET4J) + public void testRepositoryNotFound() throws Exception + { + CDOSession session = null; + + try + { + session = openSession("sdkghfaifuzicxuhvnjlksah"); + } + catch (Exception ex) + { + } + + assertEquals(null, session); + sleep(200); + + Net4j sessionConfig = (Net4j)getSessionConfig(); + IAcceptor acceptor = sessionConfig.getAcceptor(); + for (IConnector connector : acceptor.getAcceptedConnectors()) + { + Collection<IChannel> channels = connector.getChannels(); + assertEquals(0, channels.size()); + } + } } diff --git a/plugins/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java b/plugins/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java index 029a291836..bb987bfede 100644 --- a/plugins/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java +++ b/plugins/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java @@ -4,7 +4,7 @@ * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - * + * * Contributors: * Eike Stepper - initial API and implementation */ @@ -44,6 +44,12 @@ public class OpenSessionIndication extends IndicationWithResponse } @Override + protected boolean closeChannelAfterException() + { + return true; + } + + @Override protected void indicating(ExtendedDataInputStream in) throws Exception { String userID = in.readString(); diff --git a/plugins/org.eclipse.net4j.examples.installer/examples/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java b/plugins/org.eclipse.net4j.examples.installer/examples/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java index 029a291836..bb987bfede 100644 --- a/plugins/org.eclipse.net4j.examples.installer/examples/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java +++ b/plugins/org.eclipse.net4j.examples.installer/examples/org.eclipse.net4j.buddies.server/src/org/eclipse/net4j/buddies/internal/server/protocol/OpenSessionIndication.java @@ -4,7 +4,7 @@ * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - * + * * Contributors: * Eike Stepper - initial API and implementation */ @@ -44,6 +44,12 @@ public class OpenSessionIndication extends IndicationWithResponse } @Override + protected boolean closeChannelAfterException() + { + return true; + } + + @Override protected void indicating(ExtendedDataInputStream in) throws Exception { String userID = in.readString(); diff --git a/plugins/org.eclipse.net4j.http.common/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.http.common/META-INF/MANIFEST.MF index 87d415d7f9..8b61c0c8d7 100644 --- a/plugins/org.eclipse.net4j.http.common/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j.http.common/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j.http.common -Bundle-Version: 4.0.300.qualifier +Bundle-Version: 4.0.400.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -11,11 +11,11 @@ Bundle-RequiredExecutionEnvironment: J2SE-1.5 Bundle-ClassPath: . Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional, org.eclipse.net4j;bundle-version="[4.0.0,5.0.0)";visibility:=reexport -Export-Package: org.eclipse.net4j.http.common;version="4.0.300";uses:="org.eclipse.net4j.connector", - org.eclipse.net4j.http.internal.common;version="4.0.300"; +Export-Package: org.eclipse.net4j.http.common;version="4.0.400";uses:="org.eclipse.net4j.connector", + org.eclipse.net4j.http.internal.common;version="4.0.400"; x-friends:="org.eclipse.net4j.http, org.eclipse.net4j.http.server, org.eclipse.net4j.http.tests, org.eclipse.net4j.defs", - org.eclipse.net4j.http.internal.common.bundle;version="4.0.300";x-friends:="org.eclipse.net4j.http,org.eclipse.net4j.http.server,org.eclipse.net4j.http.tests", - org.eclipse.net4j.http.internal.common.messages;version="4.0.300";x-internal:=true + org.eclipse.net4j.http.internal.common.bundle;version="4.0.400";x-friends:="org.eclipse.net4j.http,org.eclipse.net4j.http.server,org.eclipse.net4j.http.tests", + org.eclipse.net4j.http.internal.common.messages;version="4.0.400";x-internal:=true diff --git a/plugins/org.eclipse.net4j.http.common/src/org/eclipse/net4j/http/internal/common/HTTPConnector.java b/plugins/org.eclipse.net4j.http.common/src/org/eclipse/net4j/http/internal/common/HTTPConnector.java index 0cb052ae8f..6af663db88 100644 --- a/plugins/org.eclipse.net4j.http.common/src/org/eclipse/net4j/http/internal/common/HTTPConnector.java +++ b/plugins/org.eclipse.net4j.http.common/src/org/eclipse/net4j/http/internal/common/HTTPConnector.java @@ -110,6 +110,11 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector } outputOperations.add(new BufferChannelOperation(httpChannel.getID(), outputOperationCount, buffer)); + + if (buffer.isCCAM()) + { + httpChannel.close(); + } } /** diff --git a/plugins/org.eclipse.net4j.jvm/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j.jvm/META-INF/MANIFEST.MF index 8589a8ab98..320b6f9ae8 100644 --- a/plugins/org.eclipse.net4j.jvm/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j.jvm/META-INF/MANIFEST.MF @@ -1,7 +1,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-SymbolicName: org.eclipse.net4j.jvm;singleton:=true -Bundle-Version: 4.1.200.qualifier +Bundle-Version: 4.1.300.qualifier Bundle-Name: %pluginName Bundle-Vendor: %providerName Bundle-Localization: plugin @@ -12,7 +12,7 @@ Bundle-ClassPath: . Require-Bundle: org.eclipse.core.runtime;bundle-version="[3.5.0,4.0.0)";resolution:=optional, org.eclipse.net4j;bundle-version="[4.0.0,5.0.0)";visibility:=reexport Import-Package: org.osgi.framework;version="[1.3.0,2.0.0)";resolution:=optional -Export-Package: org.eclipse.net4j.internal.jvm;version="4.1.200";x-friends:="org.eclipse.net4j.defs,org.eclipse.net4j.tests,org.eclipse.net4j.ui", - org.eclipse.net4j.internal.jvm.bundle;version="4.1.200";x-internal:=true, - org.eclipse.net4j.internal.jvm.messages;version="4.1.200";x-internal:=true, - org.eclipse.net4j.jvm;version="4.1.200" +Export-Package: org.eclipse.net4j.internal.jvm;version="4.1.300";x-friends:="org.eclipse.net4j.defs,org.eclipse.net4j.tests,org.eclipse.net4j.ui", + org.eclipse.net4j.internal.jvm.bundle;version="4.1.300";x-internal:=true, + org.eclipse.net4j.internal.jvm.messages;version="4.1.300";x-internal:=true, + org.eclipse.net4j.jvm;version="4.1.300" diff --git a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMChannel.java b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMChannel.java index 0665097f58..205c98e818 100644 --- a/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMChannel.java +++ b/plugins/org.eclipse.net4j.jvm/src/org/eclipse/net4j/internal/jvm/JVMChannel.java @@ -4,7 +4,7 @@ * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - * + * * Contributors: * Eike Stepper - initial API and implementation */ diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java index 3e18fb8e23..bed021ce3d 100644 --- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java +++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/TCPConnector.java @@ -308,12 +308,19 @@ public abstract class TCPConnector extends Connector implements ITCPConnector, I IBuffer buffer = channelSendQueue.peek(); if (buffer != null) { + boolean closeChannelAfterMe = buffer.isCCAM(); + if (buffer.write(socketChannel)) { writeQueue.poll(); channelSendQueue.poll(); buffer.release(); } + + if (closeChannelAfterMe) + { + channel.close(); + } } } } diff --git a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ssl/SSLBuffer.java b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ssl/SSLBuffer.java index 6d38d74587..69dc2fd746 100644 --- a/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ssl/SSLBuffer.java +++ b/plugins/org.eclipse.net4j.tcp/src/org/eclipse/net4j/internal/tcp/ssl/SSLBuffer.java @@ -90,7 +90,7 @@ public class SSLBuffer extends Buffer payloadSize = (short)-payloadSize; } - payloadSize -= EOS_OFFSET; + payloadSize -= FLAGS_OFFSET; byteBuffer.position(IBuffer.HEADER_SIZE); setState(BufferState.READING_HEADER); @@ -149,7 +149,7 @@ public class SSLBuffer extends Buffer throw new IllegalStateException("channelID == NO_CHANNEL"); //$NON-NLS-1$ } - int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET; + int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + FLAGS_OFFSET; if (isEOS()) { payloadSize = -payloadSize; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java index a9edd26855..279e3d80c3 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java @@ -37,7 +37,11 @@ import java.text.MessageFormat; */ public class Buffer implements InternalBuffer { - public static final int EOS_OFFSET = 1; + public static final int FLAGS_OFFSET = 1; + + private static final byte FLAG_EOS = 1 << 0; + + private static final byte FLAG_CCAM = 1 << 1; private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_BUFFER, Buffer.class); @@ -51,7 +55,7 @@ public class Buffer implements InternalBuffer private short channelID; - private boolean eos; + private byte flags; private BufferState state = BufferState.INITIAL; @@ -65,12 +69,36 @@ public class Buffer implements InternalBuffer public boolean isEOS() { - return eos; + return (flags & FLAG_EOS) != 0; } public void setEOS(boolean eos) { - this.eos = eos; + if (eos) + { + flags |= FLAG_EOS; + } + else + { + flags &= ~FLAG_EOS; + } + } + + public boolean isCCAM() + { + return (flags & FLAG_CCAM) != 0; + } + + public void setCCAM(boolean ccam) + { + if (ccam) + { + flags |= FLAG_CCAM; + } + else + { + flags &= ~FLAG_CCAM; + } } public IBufferProvider getBufferProvider() @@ -127,7 +155,7 @@ public class Buffer implements InternalBuffer { state = BufferState.INITIAL; channelID = NO_CHANNEL; - eos = false; + flags = 0; byteBuffer.clear(); } @@ -179,11 +207,11 @@ public class Buffer implements InternalBuffer short payloadSize = byteBuffer.getShort(); if (payloadSize < 0) { - eos = true; + setEOS(true); payloadSize = (short)-payloadSize; } - payloadSize -= EOS_OFFSET; + payloadSize -= FLAGS_OFFSET; byteBuffer.clear(); byteBuffer.limit(payloadSize); @@ -199,7 +227,7 @@ public class Buffer implements InternalBuffer if (TRACER.isEnabled()) { TRACER.trace("Read " + byteBuffer.limit() + " bytes" //$NON-NLS-1$ //$NON-NLS-2$ - + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ + + (isEOS() ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ } byteBuffer.flip(); @@ -286,7 +314,9 @@ public class Buffer implements InternalBuffer throw new IllegalStateException("channelID == NO_CHANNEL"); //$NON-NLS-1$ } - int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET; + int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + FLAGS_OFFSET; + + boolean eos = isEOS(); if (eos) { payloadSize = -payloadSize; @@ -475,7 +505,7 @@ public class Buffer implements InternalBuffer eos = true; } - payloadSize -= EOS_OFFSET; + payloadSize -= FLAGS_OFFSET; System.out.println("channelID: " + channelID); System.out.println("payloadSize: " + payloadSize); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java index b4b0621773..3df30f29f9 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java @@ -50,6 +50,8 @@ public class BufferInputStream extends InputStream implements IBufferHandler private boolean eos; + private boolean ccam; + private RemoteException exception; private long stopTimeMillis; @@ -59,6 +61,14 @@ public class BufferInputStream extends InputStream implements IBufferHandler tracerEnabled = TRACER.isEnabled(); } + /** + * @since 4.4 + */ + public boolean isCCAM() + { + return ccam; + } + public long getMillisBeforeTimeout() { return DEFAULT_MILLIS_BEFORE_TIMEOUT; @@ -110,13 +120,13 @@ public class BufferInputStream extends InputStream implements IBufferHandler if (eos) { // End of stream - return IOUtil.EOF; + return readEOF(); } if (!ensureBuffer()) { // Timeout or interrupt - return IOUtil.EOF; + return readEOF(); } } @@ -124,7 +134,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler if (!byteBuffer.hasRemaining()) { // End of stream - return IOUtil.EOF; + return readEOF(); } final int result = byteBuffer.get() & 0xFF; @@ -198,9 +208,19 @@ public class BufferInputStream extends InputStream implements IBufferHandler } eos = currentBuffer.isEOS(); + ccam = currentBuffer.isCCAM(); + return true; } + /** + * @since 4.4 + */ + protected int readEOF() + { + return IOUtil.EOF; + } + private long computeTimeout(final long check) throws IOTimeoutException { long remaining; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java index 773db19828..ae2bd8c704 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java @@ -4,7 +4,7 @@ * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html - * + * * Contributors: * Eike Stepper - initial API and implementation * Andre Dietisheim - Bug 262875: java.nio.BufferUnderFlowException https://bugs.eclipse.org/bugs/show_bug.cgi?id=262875 @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; /** * An {@link OutputStream output stream} that fragments the written byte sequence into fixed-sized {@link IBuffer * buffers} and passes them to configured {@link IBufferHandler buffer handler}. - * + * * @author Eike Stepper */ public class BufferOutputStream extends OutputStream @@ -120,7 +120,7 @@ public class BufferOutputStream extends OutputStream /** * Flushes the current buffer, it's handled over to the buffer handler. - * + * * @throws IOException * Signals that an I/O exception has occurred. * @see #currentBuffer @@ -134,7 +134,7 @@ public class BufferOutputStream extends OutputStream /** * Flushes the current buffer if it has no remaining space. - * + * * @throws IOException * Signals that an I/O exception has occurred. */ @@ -157,9 +157,18 @@ public class BufferOutputStream extends OutputStream public void flushWithEOS() throws IOException { + flushWithEOS(false); + } + + /** + * @since 4.4 + */ + public void flushWithEOS(boolean ccam) throws IOException + { throwExceptionOnError(); ensureBufferPrivate(); currentBuffer.setEOS(true); + currentBuffer.setCCAM(ccam); flushPrivate(); } @@ -191,7 +200,7 @@ public class BufferOutputStream extends OutputStream /** * Ensures that this BufferOutputStream has a buffer. If the current buffer was flushed a new one is fetched from the * buffer provider. - * + * * @throws IOException * Signals that an I/O exception has occurred. * @see #flush() @@ -214,7 +223,7 @@ public class BufferOutputStream extends OutputStream /** * Throws an exception if there's an error. - * + * * @throws IOException * Signals that an I/O exception has occurred. * @see #error diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java index 2882c4d3a9..726a26c265 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java @@ -51,33 +51,33 @@ import java.nio.channels.SocketChannel; * <p> * An example for <b>putting</b> values into a buffer and writing it to a {@link SocketChannel}: * <p> - * + * * <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> * // Obtain a fresh buffer * Buffer buffer = bufferProvider.getBuffer(); // Start filling the buffer for channelID 4711 ByteBuffer byteBuffer = * buffer.startPutting(4711); byteBuffer.putDouble(15.47); // Write the contents of the Buffer to a // SocketChannel * without blocking while (!buffer.write(socketChannel)) { // Do something else } * </pre> - * + * * An example for reading a buffer from a {@link SocketChannel} and <b>getting</b> values from it: * <p> - * + * * <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> * // Obtain a fresh buffer * Buffer buffer = bufferProvider.getBuffer(); - * + * * // Read the contents of the Buffer from a SocketChannel without blocking * ByteBuffer byteBuffer; * while ((byteBuffer = buffer.startGetting(socketChannel)) == null) * { * // Do something else * } - * + * * // Access the contents of the buffer and release it to its provider * double value = byteBuffer.getDouble(); * buffer.release(); * </pre> - * + * * @see IBufferProvider * @see IChannel#sendBuffer(IBuffer) * @see IChannel#setReceiveHandler(IBufferHandler) @@ -114,7 +114,7 @@ public interface IBuffer /** * Returns the channel index value stored in the header of this buffer. - * + * * @since 2.0 */ public short getChannelID(); @@ -163,7 +163,7 @@ public interface IBuffer * <li>all other methods that do not influence {@link ByteBuffer#position()}, {@link ByteBuffer#limit()} and * {@link ByteBuffer#capacity()} * </ul> - * + * * @param socketChannel * The <code>socketChannel</code> to read the {@link ByteBuffer} from. * @return A {@link ByteBuffer} that can be used for getting data if it was possible to completely read the data from @@ -204,7 +204,7 @@ public interface IBuffer * <li>all other methods that do not influence {@link ByteBuffer#position()}, {@link ByteBuffer#limit()} and * {@link ByteBuffer#capacity()} * </ul> - * + * * @param channelID * The index of an {@link IChannel} that this buffer is intended to be passed to later or {@link #NO_CHANNEL} * . @@ -222,7 +222,7 @@ public interface IBuffer * This method is non-blocking and it can be necessary to repeatedly call it. If it was not possible to completely * write the data to the <code>SocketChannel</code> <code>false</code> is returned and the state of this buffer * remains {@link BufferState#WRITING WRITING}. - * + * * @param socketChannel * The <code>socketChannel</code> to write the data to. * @return <code>true</code> if it was possible to completely write the data to the <code>SocketChannel</code>, @@ -237,7 +237,7 @@ public interface IBuffer /** * Turns the state of this buffer from {@link BufferState#PUTTING PUTTING} into {@link BufferState#GETTING GETTING}. - * + * * @throws IllegalStateException * If the state of this buffer is not {@link BufferState#PUTTING PUTTING}. */ @@ -245,7 +245,7 @@ public interface IBuffer /** * Returns the <code>ByteBuffer</code> that can be used for putting or getting data. - * + * * @throws IllegalStateException * If the state of this buffer is not {@link BufferState#PUTTING PUTTING} or {@link BufferState#GETTING * GETTING}. @@ -253,16 +253,31 @@ public interface IBuffer public ByteBuffer getByteBuffer() throws IllegalStateException; /** - * Returns the <em>End Of Stream</em> flag to indicate whether this buffer is the last buffer in a stream of buffers. + * Returns the <em>End Of Stream</em> flag to indicate whether this buffer is the last buffer in a stream (typically a signal) of buffers. */ public boolean isEOS(); /** - * Sets the <em>End Of Stream</em> flag to indicate whether this buffer is the last buffer in a stream of buffers. + * Sets the <em>End Of Stream</em> flag to indicate whether this buffer is the last buffer in a stream (typically a signal) of buffers. */ public void setEOS(boolean eos); /** + * Returns the <em>Close Channel After Me</em> flag. + * + * @since 4.4 + */ + + public boolean isCCAM(); + + /** + * Sets the <em>Close Channel After Me</em> flag. + * + * @since 4.4 + */ + public void setCCAM(boolean ccam); + + /** * Releases this buffer to its original {@link IBufferProvider}. */ public void release(); diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java index ab28213bef..85020c25e6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelInputStream.java @@ -13,12 +13,16 @@ package org.eclipse.net4j.channel; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.spi.net4j.InternalChannel; + +import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutorService; /** * An {@link InputStream input stream} that provides the {@link IBuffer buffers} which arrive at a {@link IChannel * channel} as a continuous byte sequence. - * + * * @author Eike Stepper */ public class ChannelInputStream extends BufferInputStream @@ -70,6 +74,24 @@ public class ChannelInputStream extends BufferInputStream } @Override + public int read() throws IOException + { + if (isCCAM()) + { + ExecutorService executorService = ((InternalChannel)channel).getReceiveExecutor(); + executorService.submit(new Runnable() + { + public void run() + { + channel.close(); + } + }); + } + + return super.read(); + } + + @Override public String toString() { return "ChannelInputStream[" + channel + "]"; //$NON-NLS-1$ //$NON-NLS-2$ diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java index fe0a9acca8..d3c047bc69 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java @@ -48,6 +48,14 @@ public abstract class IndicationWithResponse extends SignalReactor } /** + * @since 4.4 + */ + protected boolean closeChannelAfterException() + { + return false; + } + + /** * @since 2.0 */ protected String getExceptionMessage(Throwable t) @@ -103,6 +111,17 @@ public abstract class IndicationWithResponse extends SignalReactor SignalProtocol<?> protocol = getProtocol(); int correlationID = -getCorrelationID(); String message = getExceptionMessage(t); - new RemoteExceptionRequest(protocol, correlationID, responding, message, t).sendAsync(); + final boolean closeChannel = closeChannelAfterException(); + + RemoteExceptionRequest request = new RemoteExceptionRequest(protocol, correlationID, responding, message, t) + { + @Override + protected boolean closeChannelAfterMe() + { + return closeChannel; + } + }; + + request.sendAsync(); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 299dce4832..f1affa61dc 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -186,6 +186,14 @@ public abstract class Signal implements Runnable } /** + * @since 4.4 + */ + protected boolean closeChannelAfterMe() + { + return false; + } + + /** * @since 2.0 */ protected InputStream getCurrentInputStream() @@ -226,13 +234,13 @@ public abstract class Signal implements Runnable protected void finishInputStream(InputStream in) throws IOException { currentStream = null; - getProtocol().finishInputStream(in); + protocol.finishInputStream(in); } protected void finishOutputStream(OutputStream out) throws IOException { currentStream = null; - getProtocol().finishOutputStream(out); + protocol.finishOutputStream(out); } protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception; @@ -257,7 +265,7 @@ public abstract class Signal implements Runnable } finally { - getProtocol().stopSignal(this, exception); + protocol.stopSignal(this, exception); } } @@ -303,7 +311,8 @@ public abstract class Signal implements Runnable finishOutputStream(wrappedOutputStream); } - out.flushWithEOS(); + boolean ccam = closeChannelAfterMe(); + out.flushWithEOS(ccam); } void doInput(BufferInputStream in) throws Exception diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index 05ea9eaee4..ceb64cc7c8 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -30,6 +30,7 @@ import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.OM; +import org.eclipse.spi.net4j.InternalChannel; import org.eclipse.spi.net4j.Protocol; import java.io.IOException; @@ -39,6 +40,7 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; /** * The default implementation of a {@link ISignalProtocol signal protocol}. @@ -623,6 +625,26 @@ public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> i { return timeout; } + + @Override + public int read() throws IOException + { + if (isCCAM()) + { + final InternalChannel channel = (InternalChannel)getChannel(); + + ExecutorService executorService = channel.getReceiveExecutor(); + executorService.submit(new Runnable() + { + public void run() + { + channel.close(); + } + }); + } + + return super.read(); + } } /** diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java index 9181aec745..255c7c4636 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java @@ -375,7 +375,7 @@ public class Channel extends Lifecycle implements InternalChannel /** * A queue that holds buffers that shall be sent. This implementation notifies observers of enqueued and dequeued - * buffers. The notification's deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100% + * buffers. The notification is deliberately not synchronized. It shall only be used by O&M tooling to offer (not 100% * accurate) statistical insights * * @author Eike Stepper |