diff options
author | Eike Stepper | 2008-11-12 17:42:21 +0000 |
---|---|---|
committer | Eike Stepper | 2008-11-12 17:42:21 +0000 |
commit | 87ffc1e4b3038e67dc2759167f71e81d1103874d (patch) | |
tree | 6c0af40cb4f3a2b882a855e7c75725e2093760fb /plugins/org.eclipse.net4j | |
parent | 9740b5da2821adfa962f89ac7d204c8da1e7e5fc (diff) | |
download | cdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.tar.gz cdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.tar.xz cdo-87ffc1e4b3038e67dc2759167f71e81d1103874d.zip |
[251751] Provide progress monitoring for commit operations
https://bugs.eclipse.org/bugs/show_bug.cgi?id=251751
Diffstat (limited to 'plugins/org.eclipse.net4j')
44 files changed, 2409 insertions, 1287 deletions
diff --git a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF index e5c660af87..35b21c8595 100644 --- a/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF +++ b/plugins/org.eclipse.net4j/META-INF/MANIFEST.MF @@ -19,14 +19,6 @@ Export-Package: org.eclipse.internal.net4j;version="2.0.0"; org.eclipse.net4j.http.common, org.eclipse.net4j.http.tests, org.eclipse.net4j.tests", - org.eclipse.internal.net4j.acceptor;version="2.0.0"; - x-friends:="org.eclipse.net4j.http.server, - org.eclipse.net4j.jvm, - org.eclipse.net4j.tcp, - org.eclipse.net4j.http, - org.eclipse.net4j.http.common, - org.eclipse.net4j.http.tests, - org.eclipse.net4j.tests", org.eclipse.internal.net4j.buffer;version="2.0.0"; x-friends:="org.eclipse.net4j.http.server, org.eclipse.net4j.jvm, @@ -36,22 +28,6 @@ Export-Package: org.eclipse.internal.net4j;version="2.0.0"; org.eclipse.net4j.http.tests, org.eclipse.net4j.tests", org.eclipse.internal.net4j.bundle;version="2.0.0";x-internal:=true, - org.eclipse.internal.net4j.channel;version="2.0.0"; - x-friends:="org.eclipse.net4j.http, - org.eclipse.net4j.http.common, - org.eclipse.net4j.http.server, - org.eclipse.net4j.tcp, - org.eclipse.net4j.http.tests, - org.eclipse.net4j.tests, - org.eclipse.net4j.jvm", - org.eclipse.internal.net4j.connector;version="2.0.0"; - x-friends:="org.eclipse.net4j.http, - org.eclipse.net4j.http.common, - org.eclipse.net4j.http.server, - org.eclipse.net4j.jvm, - org.eclipse.net4j.tcp, - org.eclipse.net4j.http.tests, - org.eclipse.net4j.tests", org.eclipse.net4j;version="2.0.0", org.eclipse.net4j.acceptor;version="2.0.0", org.eclipse.net4j.buffer;version="2.0.0", diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java index 90effe2563..5e1a1f0f3e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/buffer/Buffer.java @@ -14,6 +14,7 @@ import org.eclipse.net4j.buffer.BufferState; import org.eclipse.net4j.buffer.IBuffer; import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.util.HexUtil; +import org.eclipse.net4j.util.IErrorHandler; import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.StringUtil; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -36,9 +37,11 @@ public class Buffer implements InternalBuffer private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_BUFFER, Buffer.class); + private IErrorHandler errorHandler; + private IBufferProvider bufferProvider; - private short channelIndex; + private short channelID; private boolean eos; @@ -72,14 +75,14 @@ public class Buffer implements InternalBuffer this.bufferProvider = bufferProvider; } - public short getChannelIndex() + public short getChannelID() { if (state == BufferState.INITIAL || state == BufferState.READING_HEADER) { throw new IllegalStateException("state == " + state); //$NON-NLS-1$ } - return channelIndex; + return channelID; } public short getCapacity() @@ -107,6 +110,7 @@ public class Buffer implements InternalBuffer */ public void release() { + errorHandler = null; if (bufferProvider != null) { bufferProvider.retainBuffer(this); @@ -117,7 +121,7 @@ public class Buffer implements InternalBuffer { byteBuffer.clear(); state = BufferState.INITIAL; - channelIndex = NO_CHANNEL; + channelID = NO_CHANNEL; eos = false; } @@ -129,21 +133,50 @@ public class Buffer implements InternalBuffer public ByteBuffer startGetting(SocketChannel socketChannel) throws IOException { - if (state != BufferState.INITIAL && state != BufferState.READING_HEADER && state != BufferState.READING_BODY) + try { - throw new IllegalStateException("state == " + state); //$NON-NLS-1$ - } + if (state != BufferState.INITIAL && state != BufferState.READING_HEADER && state != BufferState.READING_BODY) + { + throw new IllegalStateException("state == " + state); //$NON-NLS-1$ + } - if (state == BufferState.INITIAL) - { - byteBuffer.limit(IBuffer.HEADER_SIZE); - state = BufferState.READING_HEADER; - } + if (state == BufferState.INITIAL) + { + byteBuffer.limit(IBuffer.HEADER_SIZE); + state = BufferState.READING_HEADER; + } - if (state == BufferState.READING_HEADER) - { - int num = socketChannel.read(byteBuffer); - if (num == -1) + if (state == BufferState.READING_HEADER) + { + int num = socketChannel.read(byteBuffer); + if (num == -1) + { + throw new ClosedChannelException(); + } + + if (byteBuffer.hasRemaining()) + { + return null; + } + + byteBuffer.flip(); + channelID = byteBuffer.getShort(); + short payloadSize = byteBuffer.getShort(); + if (payloadSize < 0) + { + eos = true; + payloadSize = (short)-payloadSize; + } + + payloadSize -= EOS_OFFSET; + + byteBuffer.clear(); + byteBuffer.limit(payloadSize); + state = BufferState.READING_BODY; + } + + // state == State.READING_BODY + if (socketChannel.read(byteBuffer) == -1) { throw new ClosedChannelException(); } @@ -153,67 +186,69 @@ public class Buffer implements InternalBuffer return null; } - byteBuffer.flip(); - channelIndex = byteBuffer.getShort(); - short payloadSize = byteBuffer.getShort(); - if (payloadSize < 0) + if (TRACER.isEnabled()) { - eos = true; - payloadSize = (short)-payloadSize; + TRACER.trace("Read " + byteBuffer.limit() + " bytes" //$NON-NLS-1$ //$NON-NLS-2$ + + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ } - payloadSize -= EOS_OFFSET; - - byteBuffer.clear(); - byteBuffer.limit(payloadSize); - state = BufferState.READING_BODY; + byteBuffer.flip(); + state = BufferState.GETTING; + return byteBuffer; } - - // state == State.READING_BODY - if (socketChannel.read(byteBuffer) == -1) + catch (IOException ex) { - throw new ClosedChannelException(); + handleError(ex); + throw ex; } - - if (byteBuffer.hasRemaining()) + catch (RuntimeException ex) { - return null; + handleError(ex); + throw ex; } - - if (TRACER.isEnabled()) + catch (Error ex) { - TRACER.trace("Read " + byteBuffer.limit() + " bytes" //$NON-NLS-1$ //$NON-NLS-2$ - + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ + handleError(ex); + throw ex; } - - byteBuffer.flip(); - state = BufferState.GETTING; - return byteBuffer; } - public ByteBuffer startPutting(short channelIndex) + public ByteBuffer startPutting(short channelID) { - if (state == BufferState.PUTTING) + try { - if (channelIndex != this.channelIndex) + if (state == BufferState.PUTTING) + { + if (channelID != this.channelID) + { + throw new IllegalArgumentException("channelID != this.channelID"); //$NON-NLS-1$ + } + } + else if (state != BufferState.INITIAL) { - throw new IllegalArgumentException("channelIndex != this.channelIndex"); //$NON-NLS-1$ + throw new IllegalStateException("state: " + state); //$NON-NLS-1$ } + else + { + state = BufferState.PUTTING; + this.channelID = channelID; + + byteBuffer.clear(); + byteBuffer.position(IBuffer.HEADER_SIZE); + } + + return byteBuffer; } - else if (state != BufferState.INITIAL) + catch (RuntimeException ex) { - throw new IllegalStateException("state: " + state); //$NON-NLS-1$ + handleError(ex); + throw ex; } - else + catch (Error ex) { - state = BufferState.PUTTING; - this.channelIndex = channelIndex; - - byteBuffer.clear(); - byteBuffer.position(IBuffer.HEADER_SIZE); + handleError(ex); + throw ex; } - - return byteBuffer; } /** @@ -221,62 +256,93 @@ public class Buffer implements InternalBuffer */ public boolean write(SocketChannel socketChannel) throws IOException { - if (state != BufferState.PUTTING && state != BufferState.WRITING) + try { - throw new IllegalStateException("state == " + state); //$NON-NLS-1$ - } + if (state != BufferState.PUTTING && state != BufferState.WRITING) + { + throw new IllegalStateException("state == " + state); //$NON-NLS-1$ + } - if (state == BufferState.PUTTING) - { - if (channelIndex == NO_CHANNEL) + if (state == BufferState.PUTTING) { - throw new IllegalStateException("channelIndex == NO_CHANNEL"); //$NON-NLS-1$ + if (channelID == NO_CHANNEL) + { + throw new IllegalStateException("channelID == NO_CHANNEL"); //$NON-NLS-1$ + } + + int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET; + if (eos) + { + payloadSize = -payloadSize; + } + + if (TRACER.isEnabled()) + { + TRACER.trace("Writing " + (Math.abs(payloadSize) - 1) + " bytes" //$NON-NLS-1$ //$NON-NLS-2$ + + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ + } + + byteBuffer.flip(); + byteBuffer.putShort(channelID); + byteBuffer.putShort((short)payloadSize); + byteBuffer.position(0); + state = BufferState.WRITING; } - int payloadSize = byteBuffer.position() - IBuffer.HEADER_SIZE + EOS_OFFSET; - if (eos) + int numBytes = socketChannel.write(byteBuffer); + if (numBytes == -1) { - payloadSize = -payloadSize; + throw new IOException("Channel closed"); //$NON-NLS-1$ } - if (TRACER.isEnabled()) + if (byteBuffer.hasRemaining()) { - TRACER.trace("Writing " + (Math.abs(payloadSize) - 1) + " bytes" //$NON-NLS-1$ //$NON-NLS-2$ - + (eos ? " (EOS)" : "") + StringUtil.NL + formatContent(false)); //$NON-NLS-1$ //$NON-NLS-2$ + return false; } - byteBuffer.flip(); - byteBuffer.putShort(channelIndex); - byteBuffer.putShort((short)payloadSize); - byteBuffer.position(0); - state = BufferState.WRITING; + clear(); + return true; } - - int numBytes = socketChannel.write(byteBuffer); - if (numBytes == -1) + catch (IOException ex) { - throw new IOException("Channel closed"); //$NON-NLS-1$ + handleError(ex); + throw ex; } - - if (byteBuffer.hasRemaining()) + catch (RuntimeException ex) { - return false; + handleError(ex); + throw ex; + } + catch (Error ex) + { + handleError(ex); + throw ex; } - - clear(); - return true; } public void flip() { - if (state != BufferState.PUTTING) + try { - throw new IllegalStateException("state == " + state); //$NON-NLS-1$ - } + if (state != BufferState.PUTTING) + { + throw new IllegalStateException("state == " + state); //$NON-NLS-1$ + } - byteBuffer.flip(); - byteBuffer.position(IBuffer.HEADER_SIZE); - state = BufferState.GETTING; + byteBuffer.flip(); + byteBuffer.position(IBuffer.HEADER_SIZE); + state = BufferState.GETTING; + } + catch (RuntimeException ex) + { + handleError(ex); + throw ex; + } + catch (Error ex) + { + handleError(ex); + throw ex; + } } @Override @@ -319,4 +385,24 @@ public class Buffer implements InternalBuffer byteBuffer.limit(oldLimit); } } + + public IErrorHandler getErrorHandler() + { + return errorHandler; + } + + public void setErrorHandler(IErrorHandler errorHandler) + { + this.errorHandler = errorHandler; + } + + private void handleError(Throwable t) + { + OM.LOG.error(t); + if (errorHandler != null) + { + errorHandler.handleError(t); + release(); + } + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java deleted file mode 100644 index ef929e6122..0000000000 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/connector/Connector.java +++ /dev/null @@ -1,778 +0,0 @@ -/*************************************************************************** - * Copyright (c) 2004 - 2008 Eike Stepper, Germany. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Eike Stepper - initial API and implementation - **************************************************************************/ -package org.eclipse.internal.net4j.connector; - -import org.eclipse.net4j.ITransportConfig; -import org.eclipse.net4j.buffer.IBuffer; -import org.eclipse.net4j.channel.IChannel; -import org.eclipse.net4j.channel.IChannelMultiplexer; -import org.eclipse.net4j.connector.ConnectorException; -import org.eclipse.net4j.connector.ConnectorState; -import org.eclipse.net4j.connector.IConnector; -import org.eclipse.net4j.connector.IConnectorStateEvent; -import org.eclipse.net4j.protocol.ClientProtocolFactory; -import org.eclipse.net4j.protocol.IProtocol; -import org.eclipse.net4j.protocol.IProtocolProvider; -import org.eclipse.net4j.protocol.ServerProtocolFactory; -import org.eclipse.net4j.util.StringUtil; -import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; -import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException; -import org.eclipse.net4j.util.container.Container; -import org.eclipse.net4j.util.event.Event; -import org.eclipse.net4j.util.event.INotifier; -import org.eclipse.net4j.util.factory.FactoryKey; -import org.eclipse.net4j.util.factory.IFactoryKey; -import org.eclipse.net4j.util.lifecycle.LifecycleUtil; -import org.eclipse.net4j.util.om.monitor.MonitorUtil; -import org.eclipse.net4j.util.om.trace.ContextTracer; -import org.eclipse.net4j.util.security.INegotiationContext; -import org.eclipse.net4j.util.security.INegotiator; -import org.eclipse.net4j.util.security.NegotiationException; - -import org.eclipse.internal.net4j.TransportConfig; -import org.eclipse.internal.net4j.bundle.OM; -import org.eclipse.internal.net4j.channel.Channel; - -import org.eclipse.spi.net4j.InternalChannel; -import org.eclipse.spi.net4j.InternalConnector; - -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * @author Eike Stepper - */ -public abstract class Connector extends Container<IChannel> implements InternalConnector -{ - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class); - - private String userID; - - private ITransportConfig config; - - private long channelTimeout = IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT; - - private transient ConnectorState connectorState = ConnectorState.DISCONNECTED; - - private transient InternalChannel[] channels = {}; - - @ExcludeFromDump - private transient Object channelsLock = new Object(); - - @ExcludeFromDump - private transient CountDownLatch finishedConnecting; - - @ExcludeFromDump - private transient CountDownLatch finishedNegotiating; - - @ExcludeFromDump - private transient INegotiationContext negotiationContext; - - @ExcludeFromDump - private transient NegotiationException negotiationException; - - public Connector() - { - } - - public synchronized ITransportConfig getConfig() - { - if (config == null) - { - config = new TransportConfig(); - } - - return config; - } - - public synchronized void setConfig(ITransportConfig config) - { - checkInactive(); - this.config = config; - } - - public INegotiator getNegotiator() - { - return getConfig().getNegotiator(); - } - - public void setNegotiator(INegotiator negotiator) - { - getConfig().setNegotiator(negotiator); - } - - public INegotiationContext getNegotiationContext() - { - return negotiationContext; - } - - public long getChannelTimeout() - { - if (channelTimeout == IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT) - { - return OM.BUNDLE.getDebugSupport().getDebugOption("channel.timeout", 10000); - } - - return channelTimeout; - } - - public void setChannelTimeout(long channelTimeout) - { - this.channelTimeout = channelTimeout; - } - - public boolean isClient() - { - return getLocation() == Location.CLIENT; - } - - public boolean isServer() - { - return getLocation() == Location.SERVER; - } - - public String getUserID() - { - return userID; - } - - public void setUserID(String userID) - { - checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); - if (TRACER.isEnabled()) - { - TRACER.format("Setting userID {0} for {1}", userID, this); - } - - this.userID = userID; - } - - public ConnectorState getState() - { - return connectorState; - } - - public void setState(ConnectorState newState) throws ConnectorException - { - ConnectorState oldState = getState(); - if (newState != oldState) - { - if (TRACER.isEnabled()) - { - TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); - } - - connectorState = newState; - switch (newState) - { - case DISCONNECTED: - if (finishedConnecting != null) - { - finishedConnecting.countDown(); - finishedConnecting = null; - } - - if (finishedNegotiating != null) - { - finishedNegotiating.countDown(); - finishedNegotiating = null; - } - break; - - case CONNECTING: - finishedConnecting = new CountDownLatch(1); - finishedNegotiating = new CountDownLatch(1); - // The concrete implementation must advance state to NEGOTIATING or CONNECTED - break; - - case NEGOTIATING: - finishedConnecting.countDown(); - negotiationContext = createNegotiationContext(); - getNegotiator().negotiate(negotiationContext); - break; - - case CONNECTED: - negotiationContext = null; - deferredActivate(); - finishedConnecting.countDown(); - finishedNegotiating.countDown(); - break; - } - - fireEvent(new ConnectorStateEvent(this, oldState, newState)); - } - } - - public boolean isDisconnected() - { - return connectorState == ConnectorState.DISCONNECTED; - } - - public boolean isConnecting() - { - return connectorState == ConnectorState.CONNECTING; - } - - public boolean isNegotiating() - { - return connectorState == ConnectorState.NEGOTIATING; - } - - public boolean isConnected() - { - if (negotiationException != null) - { - throw new ConnectorException("Connector negotiation failed", negotiationException); - } - - return connectorState == ConnectorState.CONNECTED; - } - - public void connectAsync() throws ConnectorException - { - try - { - activate(); - } - catch (ConnectorException ex) - { - throw ex; - } - catch (Exception ex) - { - throw new ConnectorException(ex); - } - } - - public boolean waitForConnection(long timeout) throws ConnectorException - { - final long MAX_POLL_INTERVAL = 100L; - boolean withTimeout = timeout != NO_TIMEOUT; - - try - { - if (TRACER.isEnabled()) - { - TRACER.trace("Waiting for connection..."); - } - - for (;;) - { - long t = MAX_POLL_INTERVAL; - if (withTimeout) - { - t = Math.min(MAX_POLL_INTERVAL, timeout); - timeout -= MAX_POLL_INTERVAL; - } - - if (t <= 0) - { - break; - } - - if (finishedNegotiating == null) - { - break; - } - - if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS)) - { - break; - } - - if (MonitorUtil.isCanceled()) - { - break; - } - } - - return isConnected(); - } - catch (InterruptedException ex) - { - return false; - } - } - - public boolean connect(long timeout) throws ConnectorException - { - connectAsync(); - return waitForConnection(timeout); - } - - public boolean connect() throws ConnectorException - { - return connect(NO_TIMEOUT); - } - - public ConnectorException disconnect() - { - Exception ex = deactivate(); - if (ex == null) - { - return null; - } - - if (ex instanceof ConnectorException) - { - return (ConnectorException)ex; - } - - return new ConnectorException(ex); - } - - public final List<IChannel> getChannels() - { - List<IChannel> result = new ArrayList<IChannel>(0); - synchronized (channelsLock) - { - for (int i = 0; i < channels.length; i++) - { - IChannel channel = channels[i]; - if (LifecycleUtil.isActive(channel)) - { - result.add(channel); - } - } - } - - return result; - } - - @Override - public boolean isEmpty() - { - return getElements().length == 0; - } - - public IChannel[] getElements() - { - List<IChannel> list = getChannels(); - return list.toArray(new IChannel[list.size()]); - } - - public InternalChannel openChannel() throws ConnectorException - { - return openChannel((IProtocol<?>)null); - } - - public InternalChannel openChannel(String protocolID, Object infraStructure) throws ConnectorException - { - IProtocol<?> protocol = createProtocol(protocolID, infraStructure); - if (protocol == null) - { - throw new IllegalArgumentException("Unknown protocolID: " + protocolID); - } - - return openChannel(protocol); - } - - public InternalChannel openChannel(IProtocol<?> protocol) throws ConnectorException - { - long openChannelTimeout = getChannelTimeout(); - long start = System.currentTimeMillis(); - if (!waitForConnection(openChannelTimeout)) - { - throw new ConnectorException("Connector not connected"); - } - - final long elapsed = System.currentTimeMillis() - start; - InternalChannel channel = createChannel(); - initChannel(channel, protocol); - addChannelWithoutIndex(channel); - - try - { - try - { - registerChannelWithPeer(channel.getIndex(), openChannelTimeout - elapsed, protocol); - } - catch (TimeoutRuntimeException ex) - { - // Adjust the message for the complete timeout time - throw new TimeoutRuntimeException("Registration timeout after " + openChannelTimeout + " milliseconds"); - } - } - catch (ConnectorException ex) - { - throw ex; - } - catch (Exception ex) - { - throw new ConnectorException(ex); - } - - return channel; - } - - public InternalChannel inverseOpenChannel(short channelIndex, String protocolID) - { - IProtocol<?> protocol = createProtocol(protocolID, null); - - InternalChannel channel = createChannel(); - initChannel(channel, protocol); - channel.setChannelIndex(channelIndex); - addChannelWithIndex(channel); - return channel; - } - - public final InternalChannel getChannel(short channelIndex) - { - int index = getChannelsArrayIndex(channelIndex); - synchronized (channelsLock) - { - if (channels == null || index >= channels.length) - { - return null; - } - - return channels[index]; - } - } - - protected InternalChannel createChannel() - { - return new Channel(); - } - - private void initChannel(InternalChannel channel, IProtocol<?> protocol) - { - channel.setMultiplexer(this); - channel.setReceiveExecutor(getConfig().getReceiveExecutor()); - channel.setUserID(getUserID()); - if (protocol != null) - { - protocol.setChannel(channel); - LifecycleUtil.activate(protocol); - if (TRACER.isEnabled()) - { - String protocolType = protocol == null ? null : protocol.getType(); - TRACER.format("Opening channel with protocol {0}", protocolType); - } - - channel.setReceiveHandler(protocol); - } - else - { - if (TRACER.isEnabled()) - { - TRACER.trace("Opening channel without protocol"); - } - } - } - - private void addChannelWithIndex(InternalChannel channel) - { - short channelIndex = channel.getIndex(); - int index = getChannelsArrayIndex(channelIndex); - synchronized (channelsLock) - { - if (index >= channels.length) - { - InternalChannel[] newChannels = new InternalChannel[index + 1]; - System.arraycopy(channels, 0, newChannels, 0, channels.length); - channels = newChannels; - } - - channels[index] = channel; - } - - LifecycleUtil.activate(channel); - fireElementAddedEvent(channel); - } - - private void addChannelWithoutIndex(InternalChannel channel) - { - final short INCREMENT = (short)(isClient() ? 1 : -1); - short channelIndex = INCREMENT; - synchronized (channelsLock) - { - for (;;) - { - int index = getChannelsArrayIndex(channelIndex); - if (index >= channels.length) - { - channel.setChannelIndex(channelIndex); - addChannelWithIndex(channel); - return; - } - - if (channels[index] == null) - { - channel.setChannelIndex(channelIndex); - channels[index] = channel; - - LifecycleUtil.activate(channel); - fireElementAddedEvent(channel); - return; - } - - channelIndex += INCREMENT; - } - } - } - - public void closeChannel(InternalChannel channel) throws ConnectorException - { - InternalChannel internalChannel = channel; - deregisterChannelFromPeer(internalChannel, getChannelTimeout()); - removeChannel(internalChannel, false); - } - - public void inverseCloseChannel(short channelIndex) throws ConnectorException - { - InternalChannel channel = getChannel(channelIndex); - if (channel != null && channel.isActive()) - { - removeChannel(channel, true); - } - } - - private void removeChannel(InternalChannel channel, boolean inverse) - { - try - { - short channelIndex = channel.getIndex(); - int index = getChannelsArrayIndex(channelIndex); - synchronized (channelsLock) - { - if (index < channels.length) - { - if (channels[index] != channel) - { - throw new IllegalStateException("Wrong channel: " + channels[index]); - } - - if (TRACER.isEnabled()) - { - TRACER.trace("Removing " + channel); - } - - if (index == channels.length - 1) - { - --index; - while (index > 0 && channels[index] == null) - { - --index; - } - - if (index == 0) - { - channels = new InternalChannel[0]; - } - else - { - InternalChannel[] newChannels = new InternalChannel[index + 1]; - System.arraycopy(channels, 0, newChannels, 0, newChannels.length); - channels = newChannels; - } - } - else - { - channels[index] = null; - } - } - } - - channel.finishDeactivate(inverse); - } - catch (RuntimeException ex) - { - OM.LOG.error(ex); - throw ex; - } - } - - private int getChannelsArrayIndex(short channelIndex) - { - if (channelIndex < 0) - { - return ~channelIndex << 1; - } - - return (channelIndex << 1) - 1; - } - - public short getBufferCapacity() - { - return getConfig().getBufferProvider().getBufferCapacity(); - } - - public IBuffer provideBuffer() - { - return getConfig().getBufferProvider().provideBuffer(); - } - - public void retainBuffer(IBuffer buffer) - { - getConfig().getBufferProvider().retainBuffer(buffer); - } - - protected void leaveConnecting() - { - if (getNegotiator() == null) - { - setState(ConnectorState.CONNECTED); - } - else - { - setState(ConnectorState.NEGOTIATING); - } - } - - protected abstract INegotiationContext createNegotiationContext(); - - protected NegotiationException getNegotiationException() - { - return negotiationException; - } - - protected void setNegotiationException(NegotiationException negotiationException) - { - this.negotiationException = negotiationException; - } - - @SuppressWarnings("unchecked") - protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure) - { - if (StringUtil.isEmpty(type)) - { - return null; - } - - IProtocolProvider protocolProvider = getConfig().getProtocolProvider(); - if (protocolProvider == null) - { - throw new ConnectorException("No protocol provider configured"); - } - - IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type); - if (protocol == null) - { - throw new ConnectorException("Invalid protocol factory: " + type); - } - - protocol.setBufferProvider(getConfig().getBufferProvider()); - protocol.setExecutorService(getConfig().getReceiveExecutor()); - if (infraStructure != null) - { - protocol.setInfraStructure(infraStructure); - } - - return protocol; - } - - protected IFactoryKey createProtocolFactoryKey(String type) - { - switch (getLocation()) - { - case SERVER: - return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type); - case CLIENT: - return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type); - default: - throw new IllegalStateException(); - } - } - - @Override - protected boolean isDeferredActivation() - { - return true; - } - - @Override - protected void doBeforeActivate() throws Exception - { - super.doBeforeActivate(); - if (getConfig().getBufferProvider() == null) - { - throw new IllegalStateException("getConfig().getBufferProvider() == null"); - } - } - - @Override - protected void doActivate() throws Exception - { - super.doActivate(); - setState(ConnectorState.CONNECTING); - } - - @Override - protected void doDeactivate() throws Exception - { - setState(ConnectorState.DISCONNECTED); - synchronized (channelsLock) - { - for (short i = 0; i < channels.length; i++) - { - InternalChannel channel = channels[i]; - if (channel != null) - { - LifecycleUtil.deactivate(channel); - } - } - - channels = new InternalChannel[0]; - } - - super.doDeactivate(); - } - - protected abstract void registerChannelWithPeer(short channelIndex, long timeout, IProtocol<?> protocol) - throws ConnectorException; - - protected abstract void deregisterChannelFromPeer(InternalChannel channel, long timeout) throws ConnectorException; - - /** - * @author Eike Stepper - */ - private static class ConnectorStateEvent extends Event implements IConnectorStateEvent - { - private static final long serialVersionUID = 1L; - - private ConnectorState oldState; - - private ConnectorState newState; - - public ConnectorStateEvent(INotifier notifier, ConnectorState oldState, ConnectorState newState) - { - super(notifier); - this.oldState = oldState; - this.newState = newState; - } - - public IConnector getConnector() - { - return (IConnector)getSource(); - } - - public ConnectorState getOldState() - { - return oldState; - } - - public ConnectorState getNewState() - { - return newState; - } - - @Override - public String toString() - { - return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), - getOldState(), getNewState()); - } - } -} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java index c5c855d7a6..74dc4ae96f 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/ContainerProtocolProvider.java @@ -10,13 +10,14 @@ **************************************************************************/ package org.eclipse.net4j; -import org.eclipse.net4j.protocol.ClientProtocolFactory; import org.eclipse.net4j.protocol.IProtocol; import org.eclipse.net4j.protocol.IProtocolProvider; -import org.eclipse.net4j.protocol.ServerProtocolFactory; import org.eclipse.net4j.util.concurrent.NonBlockingLongCounter; import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.spi.net4j.ClientProtocolFactory; +import org.eclipse.spi.net4j.ServerProtocolFactory; + /** * @author Eike Stepper * @since 2.0 diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java index 7af26c36ba..03ddadabd4 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/TransportConfigurator.java @@ -16,10 +16,10 @@ import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.net4j.util.security.INegotiator; import org.eclipse.net4j.util.security.NegotiatorFactory; -import org.eclipse.internal.net4j.acceptor.Acceptor; import org.eclipse.internal.net4j.bundle.OM; import org.eclipse.core.runtime.CoreException; +import org.eclipse.spi.net4j.Acceptor; import org.eclipse.spi.net4j.AcceptorFactory; import org.w3c.dom.Document; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java index 9d47f78c0c..eccc93ce52 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/IAcceptor.java @@ -13,7 +13,7 @@ package org.eclipse.net4j.acceptor; import org.eclipse.net4j.connector.IConnector; import org.eclipse.net4j.util.container.IContainer; -import org.eclipse.internal.net4j.acceptor.Acceptor; +import org.eclipse.spi.net4j.Acceptor; /** * Accepts incoming connection requests from {@link Location#CLIENT client} {@link IConnector connectors} and creates diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java index 28732ed7ca..8079445eb8 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferInputStream.java @@ -42,6 +42,8 @@ public class BufferInputStream extends InputStream implements IBufferHandler private RuntimeException exception; + private long stopTimeMillis; + public BufferInputStream() { } @@ -59,6 +61,14 @@ public class BufferInputStream extends InputStream implements IBufferHandler /** * @since 2.0 */ + public void restartTimeout() + { + stopTimeMillis = System.currentTimeMillis() + getMillisBeforeTimeout(); + } + + /** + * @since 2.0 + */ public RuntimeException getException() { return exception; @@ -129,11 +139,10 @@ public class BufferInputStream extends InputStream implements IBufferHandler protected boolean ensureBuffer() throws IOException { final long check = getMillisInterruptCheck(); - final long timeout = getMillisBeforeTimeout(); try { - if (timeout == NO_TIMEOUT) + if (getMillisBeforeTimeout() == NO_TIMEOUT) { while (currentBuffer == null) { @@ -153,8 +162,7 @@ public class BufferInputStream extends InputStream implements IBufferHandler } else { - // TODO Consider something faster than currentTimeMillis(), maybe less accurate? - final long stop = System.currentTimeMillis() + timeout; + restartTimeout(); while (currentBuffer == null) { if (exception != null) @@ -168,7 +176,13 @@ public class BufferInputStream extends InputStream implements IBufferHandler return false; } - final long remaining = stop - System.currentTimeMillis(); + long remaining; + synchronized (this) + { + remaining = stopTimeMillis; + } + + remaining -= System.currentTimeMillis(); if (remaining <= 0) { return false; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java index c7710aea49..01203a8aea 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferOutputStream.java @@ -11,6 +11,9 @@ package org.eclipse.net4j.buffer; import org.eclipse.net4j.util.HexUtil; +import org.eclipse.net4j.util.IErrorHandler; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.io.IORuntimeException; import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -29,15 +32,33 @@ public class BufferOutputStream extends OutputStream private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, BufferOutputStream.class); - private IBufferHandler bufferHandler; - private IBufferProvider bufferProvider; + private IBufferHandler bufferHandler; + private IBuffer currentBuffer; - private short channelIndex; + private short channelID; + + private RuntimeException exception; + + @ExcludeFromDump + private transient IErrorHandler writeErrorHandler = new IErrorHandler() + { + public void handleError(Throwable t) + { + if (t instanceof RuntimeException) + { + setException((RuntimeException)t); + } + else + { + setException(new IORuntimeException(t)); + } + } + }; - public BufferOutputStream(IBufferHandler bufferHandler, IBufferProvider bufferProvider, short channelIndex) + public BufferOutputStream(IBufferHandler bufferHandler, IBufferProvider bufferProvider, short channelID) { if (bufferHandler == null) { @@ -51,12 +72,28 @@ public class BufferOutputStream extends OutputStream this.bufferHandler = bufferHandler; this.bufferProvider = bufferProvider; - this.channelIndex = channelIndex; + this.channelID = channelID; + } + + public BufferOutputStream(IBufferHandler bufferHandler, short channelID) + { + this(bufferHandler, extractBufferProvider(bufferHandler), channelID); } - public BufferOutputStream(IBufferHandler bufferHandler, short channelIndex) + /** + * @since 2.0 + */ + public RuntimeException getException() { - this(bufferHandler, extractBufferProvider(bufferHandler), channelIndex); + return exception; + } + + /** + * @since 2.0 + */ + public void setException(RuntimeException exception) + { + this.exception = exception; } @SuppressWarnings("deprecation") @@ -123,10 +160,16 @@ public class BufferOutputStream extends OutputStream protected void ensureBuffer() { + if (exception != null) + { + throw exception; + } + if (currentBuffer == null) { currentBuffer = bufferProvider.provideBuffer(); - currentBuffer.startPutting(channelIndex); + currentBuffer.setErrorHandler(writeErrorHandler); + currentBuffer.startPutting(channelID); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java index 5db9105c80..74682509e7 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/BufferState.java @@ -35,8 +35,8 @@ public enum BufferState * <p> * A transition to {@link #PUTTING} can be triggered by calling {@link IBuffer#startPutting(short)} once. If the * buffer is intended to be passed to an {@link org.eclipse.net4j.channel.IChannel IChannel} later the - * {@link org.eclipse.net4j.channel.IChannel#getChannelIndex() channel index} of that Channel has to be passed because - * it is part of the buffer's header. A {@link ByteBuffer} is returned that can be used for putting data. + * {@link org.eclipse.net4j.channel.IChannel#getChannelID() channel index} of that Channel has to be passed because it + * is part of the buffer's header. A {@link ByteBuffer} is returned that can be used for putting data. * <p> * A transition to {@link #GETTING} can be triggered by calling {@link IBuffer#startGetting(SocketChannel)} repeatedly * until it finally returns a {@link ByteBuffer} that can be used for getting data. diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java index 3e6cad36e3..bc4c878abb 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/IBuffer.java @@ -11,6 +11,7 @@ package org.eclipse.net4j.buffer; import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.util.IErrorHandler; import java.io.IOException; import java.nio.ByteBuffer; @@ -51,7 +52,7 @@ import java.nio.channels.SocketChannel; * An example for <b>putting</b> values into a buffer and writing it to a {@link SocketChannel}: * <p> * <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> // Obtain a fresh buffer - * Buffer buffer = bufferProvider.getBuffer(); // Start filling the buffer for channelIndex 4711 ByteBuffer byteBuffer = + * Buffer buffer = bufferProvider.getBuffer(); // Start filling the buffer for channelID 4711 ByteBuffer byteBuffer = * buffer.startPutting(4711); byteBuffer.putDouble(15.47); // Write the contents of the Buffer to a // SocketChannel * without blocking while (!buffer.write(socketChannel)) { // Do something else } </pre> An example for reading a buffer * from a {@link SocketChannel} and <b>getting</b> values from it: @@ -72,8 +73,8 @@ import java.nio.channels.SocketChannel; public interface IBuffer { /** - * Possible argument value of {@link #startPutting(short)} and possible return value of {@link #getChannelIndex()} - * that indicates that this buffer is not intended to be passed to a {@link SocketChannel}. + * Possible argument value of {@link #startPutting(short)} and possible return value of {@link #getChannelID()} that + * indicates that this buffer is not intended to be passed to a {@link SocketChannel}. */ public static final short NO_CHANNEL = Short.MIN_VALUE; @@ -102,8 +103,10 @@ public interface IBuffer /** * Returns the channel index value stored in the header of this buffer. + * + * @since 2.0 */ - public short getChannelIndex(); + public short getChannelID(); /** * Returns the capacity of this buffer. @@ -191,16 +194,16 @@ public interface IBuffer * {@link ByteBuffer#capacity()} * </ul> * - * @param channelIndex + * @param channelID * The index of an {@link IChannel} that this buffer is intended to be passed to later or {@link #NO_CHANNEL} * . * @return A {@link ByteBuffer} that can be used for putting data. * @throws IllegalStateException * If the state of this buffer is not {@link BufferState#INITIAL INITIAL} ({@link BufferState#PUTTING - * PUTTING} is allowed but meaningless if and only if the given <code>channelIndex</code> is equal to the - * existing <code>channelIndex</code> of this buffer). + * PUTTING} is allowed but meaningless if and only if the given <code>channelID</code> is equal to the + * existing <code>channelID</code> of this buffer). */ - public ByteBuffer startPutting(short channelIndex) throws IllegalStateException; + public ByteBuffer startPutting(short channelID) throws IllegalStateException; /** * Tries to write the data of this buffer to a {@link SocketChannel}. @@ -259,4 +262,14 @@ public interface IBuffer public void clear(); public String formatContent(boolean showHeader); + + /** + * @since 2.0 + */ + public IErrorHandler getErrorHandler(); + + /** + * @since 2.0 + */ + public void setErrorHandler(IErrorHandler errorHandler); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalRemoteException.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelException.java index 3fa69c6429..b666688d2a 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalRemoteException.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelException.java @@ -8,31 +8,34 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.signal; +package org.eclipse.net4j.channel; /** + * Thrown by an {@link IChannel} to indicate channel management problems. + * + * @see IChannelMultiplexer * @author Eike Stepper * @since 2.0 */ -public class SignalRemoteException extends RuntimeException +public class ChannelException extends RuntimeException { private static final long serialVersionUID = 1L; - public SignalRemoteException() + public ChannelException() { } - public SignalRemoteException(String message) + public ChannelException(String message) { super(message); } - public SignalRemoteException(Throwable cause) + public ChannelException(Throwable cause) { super(cause); } - public SignalRemoteException(String message, Throwable cause) + public ChannelException(String message, Throwable cause) { super(message, cause); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java index 236807c0a4..68a0257a41 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/ChannelOutputStream.java @@ -11,6 +11,7 @@ package org.eclipse.net4j.channel; import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.buffer.IBuffer; import org.eclipse.net4j.buffer.IBufferProvider; /** @@ -20,11 +21,11 @@ public class ChannelOutputStream extends BufferOutputStream { public ChannelOutputStream(IChannel channel) { - super(channel, channel.getIndex()); + super(channel, channel.getID()); } public ChannelOutputStream(IChannel channel, IBufferProvider bufferProvider) { - super(channel, bufferProvider, channel.getIndex()); + super(channel, bufferProvider, channel == null ? IBuffer.NO_CHANNEL : channel.getID()); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java index 4d6a3494d7..1118f9ec1e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannel.java @@ -42,9 +42,9 @@ import org.eclipse.net4j.util.security.IUserAware; * An example for opening a channel on an {@link IConnector} and sending an {@link IBuffer}: * <p> * <pre style="background-color:#ffffc8; border-width:1px; border-style:solid; padding:.5em;"> // Open a channel - * IChannel channel = connector.openChannel(); short channelIndex = channel.getIndex(); // Fill a buffer Buffer buffer = - * bufferProvider.getBuffer(); ByteBuffer byteBuffer = buffer.startPutting(channelIndex); byteBuffer.putDouble(15.47); - * // Let the channel send the buffer without blocking channel.sendBuffer(buffer); </pre> + * IChannel channel = connector.openChannel(); short channelID = channel.getIndex(); // Fill a buffer Buffer buffer = + * bufferProvider.getBuffer(); ByteBuffer byteBuffer = buffer.startPutting(channelID); byteBuffer.putDouble(15.47); // + * Let the channel send the buffer without blocking channel.sendBuffer(buffer); </pre> * <p> * An example for receiving {@link IBuffer}s from channels on an {@link IConnector}: * <p> @@ -63,12 +63,12 @@ import org.eclipse.net4j.util.security.IUserAware; public interface IChannel extends ILocationAware, IUserAware, IBufferHandler, INotifier { /** - * Returns the index of this channel within the array of channels returned from the - * {@link IChannelMultiplexer#getChannels() getChannels()} method of the multiplexer of this channel. + * Returns the ID of this channel. The ID is unique at any time among all channels of the associated + * {@link IChannelMultiplexer multiplexer}. * * @since 2.0 */ - public short getIndex(); + public short getID(); /** * Returns the multiplexer this channel is associated with. This channel multiplexer can be used, for example, to open diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java index ba4f9750c0..332c00f1b6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/IChannelMultiplexer.java @@ -12,14 +12,13 @@ package org.eclipse.net4j.channel; import org.eclipse.net4j.ILocationAware; import org.eclipse.net4j.buffer.IBufferHandler; -import org.eclipse.net4j.connector.ConnectorException; import org.eclipse.net4j.protocol.IProtocol; import org.eclipse.net4j.util.container.IContainer; import org.eclipse.net4j.util.event.IListener; import org.eclipse.net4j.util.factory.IFactory; import org.eclipse.net4j.util.lifecycle.ILifecycle; -import java.util.List; +import java.util.Collection; /** * @author Eike Stepper @@ -37,18 +36,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel * * @since 2.0 */ - public static final long DEFAULT_CHANNEL_TIMEOUT = -1L; - - /** - * Returns a list of currently open channels. Note that the resulting list does not contain <code>null</code> values. - * Generally the {@link IChannel#getIndex() index} of a channel <b>must not</b> be used as an index into this list. - * Each call to this method creates a new copy of the internal channels array, so it can safely be modified bz the - * caller. - * <p> - * - * @since 2.0 - */ - public List<IChannel> getChannels(); + public static final long DEFAULT_CHANNEL_TIMEOUT = -1; /** * Synchronous request to open a new {@link IChannel} with an undefined channel protocol. Since the peer connector @@ -61,7 +49,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel * @see #openChannel(IProtocol) * @since 2.0 */ - public IChannel openChannel() throws ConnectorException; + public IChannel openChannel() throws ChannelException; /** * Synchronous request to open a new {@link IChannel} with a channel protocol defined by a given protocol identifier. @@ -73,7 +61,7 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel * @see #openChannel(IProtocol) * @since 2.0 */ - public IChannel openChannel(String protocolID, Object infraStructure) throws ConnectorException; + public IChannel openChannel(String protocolID, Object infraStructure) throws ChannelException; /** * Synchronous request to open a new {@link IChannel} with the given channel protocol . The peer connector will lookup @@ -85,7 +73,14 @@ public interface IChannelMultiplexer extends ILocationAware, IContainer<IChannel * @see #openChannel(String, Object) * @since 2.0 */ - public IChannel openChannel(IProtocol<?> protocol) throws ConnectorException; + public IChannel openChannel(IProtocol<?> protocol) throws ChannelException; + + /** + * Returns a collection of currently open channels. + * + * @since 2.0 + */ + public Collection<IChannel> getChannels(); /** * @since 2.0 diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java new file mode 100644 index 0000000000..1f68c894d2 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java @@ -0,0 +1,32 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.signal.failover.IFailOverStrategy; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public interface ISignalProtocol<INFRA_STRUCTURE> extends IProtocol<INFRA_STRUCTURE> +{ + public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; + + public long getTimeout(); + + public void setTimeout(long timeout); + + public IFailOverStrategy getFailOverStrategy(); + + public void setFailOverStrategy(IFailOverStrategy failOverStrategy); +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java index 7e7e9aacc6..3c98db4064 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java @@ -12,79 +12,48 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.BufferOutputStream; -import org.eclipse.net4j.util.ReflectUtil; -import org.eclipse.net4j.util.StringUtil; -import org.eclipse.net4j.util.WrappedException; import org.eclipse.net4j.util.io.ExtendedDataInputStream; -import org.eclipse.net4j.util.om.trace.ContextTracer; - -import org.eclipse.internal.net4j.bundle.OM; - -import java.io.InputStream; /** * @author Eike Stepper */ public abstract class Indication extends SignalReactor { - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Indication.class); - /** * @since 2.0 */ - public Indication(SignalProtocol<?> protocol, short signalID) + public Indication(SignalProtocol<?> protocol, short id, String name) { - super(protocol, signalID); + super(protocol, id, name); } - @Override - protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + /** + * @since 2.0 + */ + public Indication(SignalProtocol<?> protocol, short signalID) { - if (TRACER.isEnabled()) - { - TRACER.trace("================ Indicating " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$ - } - - InputStream wrappedInputStream = wrapInputStream(in); - - try - { - indicating(ExtendedDataInputStream.wrap(wrappedInputStream)); - } - catch (Error ex) - { - OM.LOG.error(ex); - sendExceptionSignal(ex); - throw ex; - } - catch (Exception ex) - { - ex = WrappedException.unwrap(ex); - OM.LOG.error(ex); - sendExceptionSignal(ex); - throw ex; - } - finally - { - finishInputStream(wrappedInputStream); - } + super(protocol, signalID); } - protected abstract void indicating(ExtendedDataInputStream in) throws Exception; - /** * @since 2.0 */ - protected String getMessage(Throwable t) + public Indication(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception { - return StringUtil.formatException(t); + doInput(in); } - void sendExceptionSignal(Throwable t) throws Exception + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception { - SignalProtocol<?> protocol = getProtocol(); - int correlationID = -getCorrelationID(); - String message = getMessage(t); - new RemoteExceptionRequest(protocol, correlationID, message, t).send(); + indicating(in); } + + protected abstract void indicating(ExtendedDataInputStream in) throws Exception; } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java new file mode 100644 index 0000000000..0381fb5f22 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java @@ -0,0 +1,190 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.monitor.IMonitor; +import org.eclipse.net4j.util.om.monitor.Monitor; +import org.eclipse.net4j.util.om.monitor.MonitorCanceledException; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.concurrent.ExecutorService; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class IndicationWithMonitoring extends IndicationWithResponse +{ + private Monitor monitor; + + private long lastMonitorAccess; + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + try + { + super.execute(in, out); + } + finally + { + monitor = null; + } + } + + @Override + protected final void indicating(ExtendedDataInputStream in) throws Exception + { + final long monitorProgressInterval = in.readLong(); + ExecutorService executorService = getMonitoringExecutorService(); + if (executorService != null) + { + monitor = new LastAccessMonitor(); + setLastMonitorAccess(); + executorService.execute(new Runnable() + { + public void run() + { + while (monitor != null) + { + if (System.currentTimeMillis() - lastMonitorAccess > monitorProgressInterval) + { + setMonitorCanceled(); + break; + } + + sendProgress(monitor.getTotalWork(), monitor.getWork()); + ConcurrencyUtil.sleep(monitorProgressInterval); + } + } + + private void sendProgress(int totalWork, int work) + { + try + { + new MonitorProgressRequest(getProtocol(), -getCorrelationID(), totalWork, work).sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + } + }); + } + + indicating(in, monitor.fork(getIndicatingWorkPercent())); + } + + @Override + protected final void responding(ExtendedDataOutputStream out) throws Exception + { + responding(out, monitor.fork(getRespondingWorkPercent())); + } + + protected abstract void indicating(ExtendedDataInputStream in, IMonitor monitor) throws Exception; + + protected abstract void responding(ExtendedDataOutputStream out, IMonitor monitor) throws Exception; + + /** + * @since 2.0 + */ + protected ExecutorService getMonitoringExecutorService() + { + return getProtocol().getExecutorService(); + } + + protected int getIndicatingWorkPercent() + { + return 50; + } + + protected int getRespondingWorkPercent() + { + return 50; + } + + void setMonitorCanceled() + { + if (monitor != null) + { + monitor.cancel(); + } + } + + void setLastMonitorAccess() + { + lastMonitorAccess = System.currentTimeMillis(); + } + + /** + * @author Eike Stepper + */ + private final class LastAccessMonitor extends Monitor + { + @Override + public synchronized void begin(int totalWork) + { + setLastMonitorAccess(); + super.begin(totalWork); + } + + @Override + public synchronized void checkCanceled() throws MonitorCanceledException + { + setLastMonitorAccess(); + super.checkCanceled(); + } + + @Override + public synchronized boolean isCanceled() + { + setLastMonitorAccess(); + return super.isCanceled(); + } + + @Override + public synchronized void worked(int work) + { + setLastMonitorAccess(); + super.worked(work); + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java index 5c16fb3c29..76661e8aab 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java @@ -12,21 +12,22 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.BufferOutputStream; -import org.eclipse.net4j.util.ReflectUtil; -import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; -import org.eclipse.net4j.util.om.trace.ContextTracer; - -import org.eclipse.internal.net4j.bundle.OM; - -import java.io.OutputStream; /** * @author Eike Stepper */ -public abstract class IndicationWithResponse extends Indication +public abstract class IndicationWithResponse extends SignalReactor { - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, IndicationWithResponse.class); + /** + * @since 2.0 + */ + public IndicationWithResponse(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } /** * @since 2.0 @@ -36,47 +37,70 @@ public abstract class IndicationWithResponse extends Indication super(protocol, signalID); } + /** + * @since 2.0 + */ + public IndicationWithResponse(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + /** + * @since 2.0 + */ + protected String getExceptionMessage(Throwable t) + { + return StringUtil.formatException(t); + } + @Override protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception { - super.execute(in, out); - if (TRACER.isEnabled()) - { - TRACER.trace("================ Responding " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$ - } - - OutputStream wrappedOutputStream = wrapOutputStream(out); + boolean responding = false; try { - responding(ExtendedDataOutputStream.wrap(wrappedOutputStream)); + doInput(in); + responding = true; + doOutput(out); } catch (Error ex) { - OM.LOG.error(ex); - sendExceptionSignal(ex); + sendExceptionSignal(ex, responding); throw ex; } catch (Exception ex) { - ex = WrappedException.unwrap(ex); - OM.LOG.error(ex); - sendExceptionSignal(ex); + sendExceptionSignal(ex, responding); throw ex; } - finally - { - finishOutputStream(wrappedOutputStream); - } - - // End response - out.flushWithEOS(); } + protected abstract void indicating(ExtendedDataInputStream in) throws Exception; + /** * <b>Important Note:</b> The response must not be empty, i.e. the stream must be used at least to write a * <code>boolean</code>. Otherwise synchronization problems will result! - * @throws Exception TODO */ protected abstract void responding(ExtendedDataOutputStream out) throws Exception; + + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + indicating(in); + } + + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + responding(out); + } + + void sendExceptionSignal(Throwable t, boolean responding) throws Exception + { + SignalProtocol<?> protocol = getProtocol(); + int correlationID = -getCorrelationID(); + String message = getExceptionMessage(t); + new RemoteExceptionRequest(protocol, correlationID, responding, message, t).sendAsync(); + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java new file mode 100644 index 0000000000..68f50becd6 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledIndication.java @@ -0,0 +1,41 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +/** + * @author Eike Stepper + */ +class MonitorCanceledIndication extends Indication +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorCanceledIndication.class); + + public MonitorCanceledIndication(SignalProtocol<?> protocol) + { + super(protocol, SignalProtocol.SIGNAL_MONITOR_CANCELED); + } + + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + int correlationID = in.readInt(); + if (TRACER.isEnabled()) + { + TRACER.format("Canceling monitor of signal {0}", correlationID); + } + + getProtocol().handleMonitorCanceled(correlationID); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java new file mode 100644 index 0000000000..075c43909c --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorCanceledRequest.java @@ -0,0 +1,43 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +/** + * @author Eike Stepper + */ +class MonitorCanceledRequest extends Request +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorCanceledRequest.class); + + private int correlationID; + + public MonitorCanceledRequest(SignalProtocol<?> protocol, int correlationID) + { + super(protocol, SignalProtocol.SIGNAL_MONITOR_CANCELED); + this.correlationID = correlationID; + } + + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("Canceling monitor of signal {0}", correlationID); + } + + out.writeInt(correlationID); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java new file mode 100644 index 0000000000..dbfa9c2412 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressIndication.java @@ -0,0 +1,43 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +/** + * @author Eike Stepper + */ +class MonitorProgressIndication extends Indication +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorProgressIndication.class); + + public MonitorProgressIndication(SignalProtocol<?> protocol) + { + super(protocol, SignalProtocol.SIGNAL_MONITOR_PROGRESS); + } + + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + int correlationID = in.readInt(); + int totalWork = in.readInt(); + int work = in.readInt(); + if (TRACER.isEnabled()) + { + TRACER.format("Progress of signal {0}: totalWork={1}, work={2}", correlationID, totalWork, work); + } + + getProtocol().handleMonitorProgress(correlationID, totalWork, work); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java new file mode 100644 index 0000000000..503a82df3e --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/MonitorProgressRequest.java @@ -0,0 +1,51 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +/** + * @author Eike Stepper + */ +class MonitorProgressRequest extends Request +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, MonitorProgressRequest.class); + + private int correlationID; + + private int totalWork; + + private int work; + + public MonitorProgressRequest(SignalProtocol<?> protocol, int correlationID, int totalWork, int work) + { + super(protocol, SignalProtocol.SIGNAL_MONITOR_PROGRESS); + this.correlationID = correlationID; + this.totalWork = totalWork; + this.work = work; + } + + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("Progress of signal {0}: totalWork={1}, work={2}", correlationID, totalWork, work); + } + + out.writeInt(correlationID); + out.writeInt(totalWork); + out.writeInt(work); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java new file mode 100644 index 0000000000..76495612d3 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java @@ -0,0 +1,39 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public class RemoteException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + private boolean whileResponding; + + public RemoteException(Throwable cause, boolean whileResponding) + { + super(cause); + this.whileResponding = whileResponding; + } + + public RemoteException(String message, boolean whileResponding) + { + super(message); + this.whileResponding = whileResponding; + } + + public boolean whileResponding() + { + return whileResponding; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java index de18a1b3bb..82218548bf 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionIndication.java @@ -11,6 +11,7 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.OM; @@ -19,6 +20,10 @@ import org.eclipse.internal.net4j.bundle.OM; */ class RemoteExceptionIndication extends Indication { + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RemoteExceptionIndication.class); + + private Throwable t; + public RemoteExceptionIndication(SignalProtocol<?> protocol) { super(protocol, SignalProtocol.SIGNAL_REMOTE_EXCEPTION); @@ -28,8 +33,13 @@ class RemoteExceptionIndication extends Indication protected void indicating(ExtendedDataInputStream in) throws Exception { int correlationID = in.readInt(); + boolean responding = in.readBoolean(); String message = in.readString(); - Throwable t; + if (TRACER.isEnabled()) + { + String msg = RemoteExceptionRequest.getFirstLine(message); + TRACER.format("Reading remote exception for signal {0}: {1}", correlationID, msg); + } try { @@ -37,9 +47,9 @@ class RemoteExceptionIndication extends Indication } catch (Throwable couldNotLoadExceptionClass) { - t = new SignalRemoteException(message); + t = new RemoteException(message, responding); } - getProtocol().stopSignal(correlationID, t); + getProtocol().handleRemoteException(correlationID, t, responding); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java index 8dd6f083f0..9190517da4 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteExceptionRequest.java @@ -11,31 +11,67 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; /** * @author Eike Stepper */ class RemoteExceptionRequest extends Request { + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RemoteExceptionRequest.class); + private int correlationID; + private boolean responding; + private String message; private Throwable t; - public RemoteExceptionRequest(SignalProtocol<?> protocol, int correlationID, String message, Throwable t) + public RemoteExceptionRequest(SignalProtocol<?> protocol, int correlationID, boolean responding, String message, + Throwable t) { super(protocol, SignalProtocol.SIGNAL_REMOTE_EXCEPTION); this.correlationID = correlationID; this.message = message; this.t = t; + this.responding = responding; } @Override protected void requesting(ExtendedDataOutputStream out) throws Exception { + if (TRACER.isEnabled()) + { + String msg = getFirstLine(message); + TRACER.format("Writing remote exception for signal {0}: {1}", correlationID, msg); + } + out.writeInt(correlationID); + out.writeBoolean(responding); out.writeString(message); out.writeObject(t); } + + public static String getFirstLine(String message) + { + if (message == null) + { + return null; + } + + int nl = message.indexOf('\n'); + if (nl == -1) + { + nl = message.length(); + } + if (nl > 100) + { + nl = 100; + } + + return message.substring(0, nl); + } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java index e79a162163..dcc3d3c75d 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java @@ -12,20 +12,20 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.BufferOutputStream; -import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.io.ExtendedDataOutputStream; -import org.eclipse.net4j.util.om.trace.ContextTracer; - -import org.eclipse.internal.net4j.bundle.OM; - -import java.io.OutputStream; /** * @author Eike Stepper */ public abstract class Request extends SignalActor { - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Request.class); + /** + * @since 2.0 + */ + public Request(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } /** * @since 2.0 @@ -35,18 +35,33 @@ public abstract class Request extends SignalActor super(protocol, signalID); } + /** + * @since 2.0 + */ + public Request(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + /** + * @since 2.0 + */ + public void sendAsync() throws Exception + { + getProtocol().startSignal(this, getProtocol().getTimeout()); + + } + @Override protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception { - if (TRACER.isEnabled()) - { - TRACER.trace("================ Requesting " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$ - } - - OutputStream wrappedOutputStream = wrapOutputStream(out); - requesting(ExtendedDataOutputStream.wrap(wrappedOutputStream)); - finishOutputStream(wrappedOutputStream); - out.flushWithEOS(); + doOutput(out); + } + + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + requesting(out); } protected abstract void requesting(ExtendedDataOutputStream out) throws Exception; diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java index a5d47735ce..26d0798200 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java @@ -12,20 +12,27 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.BufferOutputStream; -import org.eclipse.net4j.util.ReflectUtil; import org.eclipse.net4j.util.io.ExtendedDataInputStream; -import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; -import org.eclipse.internal.net4j.bundle.OM; - -import java.io.InputStream; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * @author Eike Stepper */ -public abstract class RequestWithConfirmation<RESULT> extends Request +public abstract class RequestWithConfirmation<RESULT> extends SignalActor { - private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, RequestWithConfirmation.class); + private RESULT result; + + /** + * @since 2.0 + */ + public RequestWithConfirmation(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } /** * @since 2.0 @@ -35,56 +42,84 @@ public abstract class RequestWithConfirmation<RESULT> extends Request super(protocol, signalID); } - @Override - @SuppressWarnings("unchecked") - public RESULT send() throws Exception, SignalRemoteException + /** + * @since 2.0 + */ + public RequestWithConfirmation(SignalProtocol<?> protocol, Enum<?> literal) { - return (RESULT)super.send(); + super(protocol, literal); } - @Override - @SuppressWarnings("unchecked") - public RESULT send(long timeout) throws Exception, SignalRemoteException + /** + * @since 2.0 + */ + public Future<RESULT> sendAsync() { - return (RESULT)super.send(timeout); + ExecutorService executorService = getAsyncExecutorService(); + return executorService.submit(new Callable<RESULT>() + { + public RESULT call() throws Exception + { + return send(); + } + }); } - @Override - protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception + /** + * @since 2.0 + */ + public RESULT send() throws Exception, RemoteException { - super.execute(in, out); - if (TRACER.isEnabled()) - { - TRACER.trace("================ Confirming " + ReflectUtil.getSimpleClassName(this)); //$NON-NLS-1$ - } + return send(getProtocol().getTimeout()); + } + + /** + * @since 2.0 + */ + public RESULT send(long timeout) throws Exception, RemoteException + { + result = null; + getProtocol().startSignal(this, timeout); + return result; + } - InputStream wrappedInputStream = wrapInputStream(in); - RESULT result = confirming(ExtendedDataInputStream.wrap(wrappedInputStream)); - finishInputStream(wrappedInputStream); - setResult(result); + /** + * @since 2.0 + */ + protected ExecutorService getAsyncExecutorService() + { + return getProtocol().getExecutorService(); } + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + doOutput(out); + doInput(in); + } + + protected abstract void requesting(ExtendedDataOutputStream out) throws Exception; + /** * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a * <code>boolean</code>. Otherwise synchronization problems will result! - * - * @throws Exception - * TODO */ protected abstract RESULT confirming(ExtendedDataInputStream in) throws Exception; - void setRemoteException(Throwable t) + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception { - SignalRemoteException remoteException; - if (t instanceof SignalRemoteException) - { - remoteException = (SignalRemoteException)t; - } - else - { - remoteException = new SignalRemoteException(t); - } + requesting(out); + } - getBufferInputStream().setException(remoteException); + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + result = confirming(in); + } + + void setRemoteException(Throwable t, boolean responding) + { + getBufferInputStream().setException(new RemoteException(t, responding)); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java new file mode 100644 index 0000000000..deafa7bc06 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java @@ -0,0 +1,254 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.ImplementationError; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.monitor.IMonitor; +import org.eclipse.net4j.util.om.monitor.Monitor; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT> +{ + /** + * @since 2.0 + */ + public static final long DEFAULT_CANCELATION_POLL_INTERVAL = 100L; + + /** + * @since 2.0 + */ + public static final long DEFAULT_MONITOR_PROGRESS_INTERVAL = 2000; + + private IMonitor mainMonitor; + + private IMonitor remoteMonitor; + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + public Future<RESULT> sendAsync() + { + return sendAsync(null); + } + + @Override + public RESULT send() throws Exception, RemoteException + { + return send(null); + } + + @Override + public RESULT send(long timeout) throws Exception, RemoteException + { + return send(timeout, null); + } + + public Future<RESULT> sendAsync(IMonitor monitor) + { + initMainMonitor(monitor); + return super.sendAsync(); + } + + public RESULT send(IMonitor monitor) throws Exception, RemoteException + { + initMainMonitor(monitor); + return super.send(); + } + + public RESULT send(long timeout, IMonitor monitor) throws Exception, RemoteException + { + initMainMonitor(monitor); + return super.send(timeout); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + try + { + super.execute(in, out); + } + finally + { + remoteMonitor.done(); + remoteMonitor = null; + + mainMonitor.done(); + mainMonitor = null; + } + } + + @Override + protected final void requesting(ExtendedDataOutputStream out) throws Exception + { + int remoteWork = 100 - getRequestingWorkPercent() - getConfirmingWorkPercent(); + if (remoteWork < 0) + { + throw new ImplementationError("Remote work must not be negative: " + remoteWork); + } + + mainMonitor.begin(100); + remoteMonitor = mainMonitor.fork(remoteWork); + + ExecutorService executorService = getCancelationExecutorService(); + if (executorService != null) + { + executorService.execute(new Runnable() + { + public void run() + { + while (mainMonitor != null) + { + ConcurrencyUtil.sleep(getCancelationPollInterval()); + if (mainMonitor != null && mainMonitor.isCanceled()) + { + try + { + new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + + return; + } + } + } + }); + } + + out.writeLong(getMonitorProgressInterval()); + requesting(out, mainMonitor.fork(getRequestingWorkPercent())); + } + + @Override + protected final RESULT confirming(ExtendedDataInputStream in) throws Exception + { + return confirming(in, mainMonitor.fork(getConfirmingWorkPercent())); + } + + protected abstract void requesting(ExtendedDataOutputStream out, IMonitor monitor) throws Exception; + + /** + * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a + * <code>boolean</code>. Otherwise synchronization problems will result! + */ + protected abstract RESULT confirming(ExtendedDataInputStream in, IMonitor monitor) throws Exception; + + /** + * @since 2.0 + */ + protected ExecutorService getCancelationExecutorService() + { + return getProtocol().getExecutorService(); + } + + /** + * @since 2.0 + */ + protected long getCancelationPollInterval() + { + return DEFAULT_CANCELATION_POLL_INTERVAL; + } + + /** + * @since 2.0 + */ + protected long getMonitorProgressInterval() + { + return DEFAULT_MONITOR_PROGRESS_INTERVAL; + } + + /** + * @since 2.0 + */ + protected int getRequestingWorkPercent() + { + return 25; + } + + /** + * @since 2.0 + */ + protected int getConfirmingWorkPercent() + { + return 25; + } + + void setMonitorProgress(int totalWork, int work) + { + getBufferInputStream().restartTimeout(); + if (remoteMonitor != null) + { + if (remoteMonitor.getTotalWork() == 0) + { + remoteMonitor.begin(totalWork); + remoteMonitor.worked(work); + } + else + { + float oldRatio = remoteMonitor.getWork(); + oldRatio /= remoteMonitor.getTotalWork(); + + float newRatio = work; + newRatio /= totalWork; + + float newWork = newRatio - oldRatio; + newWork *= remoteMonitor.getTotalWork(); + if (newWork >= 1.0) + { + remoteMonitor.worked((int)newWork); + } + } + } + } + + private void initMainMonitor(IMonitor monitor) + { + mainMonitor = monitor == null ? new Monitor() : monitor; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 9567f6bfa3..be91752cca 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -12,6 +12,9 @@ package org.eclipse.net4j.signal; import org.eclipse.net4j.buffer.BufferInputStream; import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.ReflectUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.net4j.util.io.IORuntimeException; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -21,6 +24,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.text.MessageFormat; import java.util.concurrent.TimeoutException; /** @@ -28,11 +32,18 @@ import java.util.concurrent.TimeoutException; */ public abstract class Signal implements Runnable { + /** + * @since 2.0 + */ + public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Signal.class); private SignalProtocol<?> protocol; - private short signalID; + private short id; + + private String name; private int correlationID; @@ -48,10 +59,29 @@ public abstract class Signal implements Runnable * * @since 2.0 */ - protected Signal(SignalProtocol<?> protocol, short signalID) + public Signal(SignalProtocol<?> protocol, short id, String name) { this.protocol = protocol; - this.signalID = signalID; + this.id = id; + this.name = name; + } + + /** + * @since 2.0 + * @see #Signal(SignalProtocol, short, String) + */ + public Signal(SignalProtocol<?> protocol, short id) + { + this(protocol, id, null); + } + + /** + * @since 2.0 + * @see #Signal(SignalProtocol, short, String) + */ + public Signal(SignalProtocol<?> protocol, Enum<?> literal) + { + this(protocol, (short)literal.ordinal(), literal.name()); } public SignalProtocol<?> getProtocol() @@ -60,20 +90,71 @@ public abstract class Signal implements Runnable } /** - * Returns the short integer ID of this signal. + * Returns the short integer ID of this signal that is unique among all signals of the associated + * {@link #getProtocol() protocol}. * * @since 2.0 */ - public final short getSignalID() + public final short getID() { - return signalID; + return id; } - protected final int getCorrelationID() + /** + * @since 2.0 + */ + public String getName() + { + if (name == null) + { + // Needs no synchronization because any thread would set the same value. + name = ReflectUtil.getSimpleClassName(this); + } + + return name; + } + + /** + * @since 2.0 + */ + public final int getCorrelationID() { return correlationID; } + @Override + public String toString() + { + return MessageFormat.format("Signal[protocol={0}, id={1}, name={2}, correlation={3}]", getProtocol().getType(), + getID(), getName(), getCorrelationID()); + } + + public final void run() + { + String threadName = null; + try + { + if (OM.SET_SIGNAL_THREAD_NAME) + { + threadName = getClass().getSimpleName(); + Thread.currentThread().setName(threadName); + } + + runSync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + finally + { + if (threadName != null) + { + Thread.currentThread().setName(threadName + "(FINISHED)"); + } + } + } + protected final BufferInputStream getBufferInputStream() { return bufferInputStream; @@ -173,33 +254,9 @@ public abstract class Signal implements Runnable } } - public final void run() - { - String threadName = null; - try - { - if (OM.SET_SIGNAL_THREAD_NAME) - { - threadName = getClass().getSimpleName(); - Thread.currentThread().setName(threadName); - } - - runSync(); - } - catch (Exception ex) - { - OM.LOG.error(ex); - } - finally - { - if (threadName != null) - { - Thread.currentThread().setName(threadName + "(FINISHED)"); - } - } - } + protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception; - protected void runSync() throws Exception + void runSync() throws Exception { try { @@ -224,8 +281,6 @@ public abstract class Signal implements Runnable } } - protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception; - void setCorrelationID(int correlationID) { this.correlationID = correlationID; @@ -240,4 +295,78 @@ public abstract class Signal implements Runnable { bufferOutputStream = outputStream; } + + void doOutput(BufferOutputStream out) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("================ {0}: {1}", getOutputMeaning(), this); //$NON-NLS-1$ + } + + OutputStream wrappedOutputStream = wrapOutputStream(out); + ExtendedDataOutputStream extended = ExtendedDataOutputStream.wrap(wrappedOutputStream); + + try + { + doExtendedOutput(extended); + } + catch (Error ex) + { + OM.LOG.error(ex); + throw ex; + } + catch (Exception ex) + { + OM.LOG.error(ex); + throw ex; + } + finally + { + finishOutputStream(wrappedOutputStream); + } + + out.flushWithEOS(); + } + + void doInput(BufferInputStream in) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("================ {0}: {1}", getInputMeaning(), this); //$NON-NLS-1$ + } + + InputStream wrappedInputStream = wrapInputStream(in); + ExtendedDataInputStream extended = ExtendedDataInputStream.wrap(wrappedInputStream); + + try + { + doExtendedInput(extended); + } + catch (Error ex) + { + OM.LOG.error(ex); + throw ex; + } + catch (Exception ex) + { + OM.LOG.error(ex); + throw ex; + } + finally + { + finishInputStream(wrappedInputStream); + } + } + + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + } + + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + } + + abstract String getOutputMeaning(); + + abstract String getInputMeaning(); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java index 7e30b589ae..a46333efd9 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java @@ -10,57 +10,47 @@ **************************************************************************/ package org.eclipse.net4j.signal; -import org.eclipse.net4j.buffer.BufferInputStream; -import org.eclipse.net4j.util.ReflectUtil; - -import java.text.MessageFormat; - /** * @author Eike Stepper */ public abstract class SignalActor extends Signal { - public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; - - private boolean terminated; - - private Object result; - /** * @since 2.0 */ - public SignalActor(SignalProtocol<?> protocol, short signalID) + public SignalActor(SignalProtocol<?> protocol, short id, String name) { - super(protocol, signalID); + super(protocol, id, name); setCorrelationID(protocol.getNextCorrelationID()); } - public Object send() throws Exception, SignalRemoteException + /** + * @since 2.0 + */ + public SignalActor(SignalProtocol<?> protocol, short id) { - return send(NO_TIMEOUT); + super(protocol, id); + setCorrelationID(protocol.getNextCorrelationID()); } - public Object send(long timeout) throws Exception, SignalRemoteException + /** + * @since 2.0 + */ + public SignalActor(SignalProtocol<?> protocol, Enum<?> literal) { - if (terminated) - { - throw new IllegalStateException("Terminated"); //$NON-NLS-1$ - } - - getProtocol().startSignal(this, timeout); - terminated = true; - return result; + super(protocol, literal); + setCorrelationID(protocol.getNextCorrelationID()); } @Override - public String toString() + String getInputMeaning() { - return MessageFormat.format("{0}[{1}, {2}, correlation={3} {4}]", ReflectUtil.getSimpleName(getClass()), - getSignalID(), getProtocol(), getCorrelationID(), terminated ? "SENT" : "UNSENT"); + return "Confirming"; } - protected void setResult(Object result) + @Override + String getOutputMeaning() { - this.result = result; + return "Requesting"; } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index 5c2fb306f4..a19c91d30f 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -16,7 +16,8 @@ import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.channel.ChannelOutputStream; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.connector.IConnector; -import org.eclipse.net4j.protocol.Protocol; +import org.eclipse.net4j.signal.failover.IFailOverStrategy; +import org.eclipse.net4j.util.io.IORuntimeException; import org.eclipse.net4j.util.io.IStreamWrapper; import org.eclipse.net4j.util.io.StreamWrapperChain; import org.eclipse.net4j.util.om.trace.ContextTracer; @@ -24,27 +25,37 @@ import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.OM; import org.eclipse.spi.net4j.InternalConnector; +import org.eclipse.spi.net4j.Protocol; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.text.MessageFormat; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * @author Eike Stepper */ -public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> +public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements + ISignalProtocol<INFRA_STRUCTURE> { - public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; - /** * @since 2.0 */ public static final short SIGNAL_REMOTE_EXCEPTION = -1; + /** + * @since 2.0 + */ + public static final short SIGNAL_MONITOR_CANCELED = -2; + + /** + * @since 2.0 + */ + public static final short SIGNAL_MONITOR_PROGRESS = -3; + private static final int MIN_CORRELATION_ID = 1; private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE; @@ -53,9 +64,13 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR private static final ContextTracer STREAM_TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, SignalProtocol.class); + private long timeout = NO_TIMEOUT; + private IStreamWrapper streamWrapper; - private Map<Integer, Signal> signals = new ConcurrentHashMap<Integer, Signal>(0); + private IFailOverStrategy failOverStrategy; + + private Map<Integer, Signal> signals = new HashMap<Integer, Signal>(); private int nextCorrelationID = MIN_CORRELATION_ID; @@ -66,6 +81,22 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR /** * @since 2.0 */ + public long getTimeout() + { + return timeout; + } + + /** + * @since 2.0 + */ + public void setTimeout(long timeout) + { + this.timeout = timeout; + } + + /** + * @since 2.0 + */ public IChannel open(IConnector connector) { InternalConnector conn = (InternalConnector)connector; @@ -107,6 +138,22 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR } } + /** + * @since 2.0 + */ + public IFailOverStrategy getFailOverStrategy() + { + return failOverStrategy; + } + + /** + * @since 2.0 + */ + public void setFailOverStrategy(IFailOverStrategy failOverStrategy) + { + this.failOverStrategy = failOverStrategy; + } + public boolean waitForSignals(long timeout) { synchronized (signals) @@ -189,7 +236,7 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR signal = provideSignalReactor(signalID); signal.setCorrelationID(-correlationID); - signal.setBufferInputStream(new SignalInputStream(getInputStreamTimeout())); + signal.setBufferInputStream(new SignalInputStream(getTimeout())); signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false)); signals.put(-correlationID, signal); getExecutorService().execute(signal); @@ -218,11 +265,6 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR } } - public long getInputStreamTimeout() - { - return NO_TIMEOUT; - } - @Override public String toString() { @@ -245,18 +287,26 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR protected final SignalReactor provideSignalReactor(short signalID) { checkActive(); - if (signalID == SIGNAL_REMOTE_EXCEPTION) + switch (signalID) { + case SIGNAL_REMOTE_EXCEPTION: return new RemoteExceptionIndication(this); - } - SignalReactor signal = createSignalReactor(signalID); - if (signal == null) - { - throw new IllegalArgumentException("Invalid signalID " + signalID); - } + case SIGNAL_MONITOR_CANCELED: + return new MonitorCanceledIndication(this); - return signal; + case SIGNAL_MONITOR_PROGRESS: + return new MonitorProgressIndication(this); + + default: + SignalReactor signal = createSignalReactor(signalID); + if (signal == null) + { + throw new IllegalArgumentException("Invalid signalID " + signalID); + } + + return signal; + } } /** @@ -292,10 +342,10 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR throw new IllegalArgumentException("signalActor.getProtocol() != this"); //$NON-NLS-1$ } - short signalID = signalActor.getSignalID(); + short signalID = signalActor.getID(); int correlationID = signalActor.getCorrelationID(); - signalActor.setBufferInputStream(new SignalInputStream(timeout)); signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true)); + signalActor.setBufferInputStream(new SignalInputStream(timeout)); synchronized (signals) { signals.put(correlationID, signalActor); @@ -314,7 +364,7 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR } } - void stopSignal(int correlationID, Throwable t) + void handleRemoteException(int correlationID, Throwable t, boolean responding) { synchronized (signals) { @@ -322,13 +372,39 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR if (signal instanceof RequestWithConfirmation) { RequestWithConfirmation<?> request = (RequestWithConfirmation<?>)signal; - request.setRemoteException(t); + request.setRemoteException(t, responding); } signals.notifyAll(); } } + void handleMonitorProgress(int correlationID, int totalWork, int work) + { + synchronized (signals) + { + Signal signal = signals.get(correlationID); + if (signal instanceof RequestWithMonitoring) + { + RequestWithMonitoring<?> request = (RequestWithMonitoring<?>)signal; + request.setMonitorProgress(totalWork, work); + } + } + } + + void handleMonitorCanceled(int correlationID) + { + synchronized (signals) + { + Signal signal = signals.get(correlationID); + if (signal instanceof IndicationWithMonitoring) + { + IndicationWithMonitoring indication = (IndicationWithMonitoring)signal; + indication.setMonitorCanceled(); + } + } + } + /** * @author Eike Stepper */ @@ -368,8 +444,14 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR public IBuffer provideBuffer() { + IChannel channel = getChannel(); + if (channel == null) + { + throw new IORuntimeException("No channel for protocol " + SignalProtocol.this); + } + IBuffer buffer = delegate.provideBuffer(); - ByteBuffer byteBuffer = buffer.startPutting(getChannel().getIndex()); + ByteBuffer byteBuffer = buffer.startPutting(channel.getID()); if (STREAM_TRACER.isEnabled()) { STREAM_TRACER.trace("Providing buffer for correlation " + correlationID); //$NON-NLS-1$ diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java index 962af81478..ba62a6eb85 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java @@ -10,8 +10,6 @@ **************************************************************************/ package org.eclipse.net4j.signal; -import org.eclipse.net4j.util.ReflectUtil; - /** * @author Eike Stepper */ @@ -20,15 +18,36 @@ public abstract class SignalReactor extends Signal /** * @since 2.0 */ - protected SignalReactor(SignalProtocol<?> protocol, short signalID) + public SignalReactor(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public SignalReactor(SignalProtocol<?> protocol, short signalID) { super(protocol, signalID); } + /** + * @since 2.0 + */ + public SignalReactor(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + String getInputMeaning() + { + return "Indicating"; + } + @Override - public String toString() + String getOutputMeaning() { - return ReflectUtil.getSimpleName(getClass()) + "[" + getSignalID() + ", " + getProtocol() + ", correlation=" //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ - + getCorrelationID() + "]"; //$NON-NLS-1$ + return "Responding"; } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java index ff782e1d45..617202a41c 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/AbstractFailOverStrategy.java @@ -10,9 +10,8 @@ **************************************************************************/ package org.eclipse.net4j.signal.failover; +import org.eclipse.net4j.signal.RemoteException; import org.eclipse.net4j.signal.RequestWithConfirmation; -import org.eclipse.net4j.signal.SignalActor; -import org.eclipse.net4j.signal.SignalRemoteException; import org.eclipse.net4j.util.event.Notifier; /** @@ -21,7 +20,7 @@ import org.eclipse.net4j.util.event.Notifier; */ public abstract class AbstractFailOverStrategy extends Notifier implements IFailOverStrategy { - private long defaultTimeout = SignalActor.NO_TIMEOUT; + private long defaultTimeout = RequestWithConfirmation.NO_TIMEOUT; public AbstractFailOverStrategy() { @@ -37,7 +36,7 @@ public abstract class AbstractFailOverStrategy extends Notifier implements IFail this.defaultTimeout = defaultTimeout; } - public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, SignalRemoteException + public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, RemoteException { return send(request, defaultTimeout); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java index 242824fcc6..32b12ec620 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/FailOverStrategy.java @@ -12,9 +12,9 @@ package org.eclipse.net4j.signal.failover; import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.RemoteException; import org.eclipse.net4j.signal.RequestWithConfirmation; import org.eclipse.net4j.signal.SignalProtocol; -import org.eclipse.net4j.signal.SignalRemoteException; import org.eclipse.net4j.util.CheckUtil; import java.util.concurrent.TimeoutException; @@ -28,8 +28,7 @@ public abstract class FailOverStrategy extends AbstractFailOverStrategy { } - public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, - SignalRemoteException + public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException { for (;;) { diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java index 245e57663a..62fc021bc6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/IFailOverStrategy.java @@ -10,8 +10,8 @@ **************************************************************************/ package org.eclipse.net4j.signal.failover; +import org.eclipse.net4j.signal.RemoteException; import org.eclipse.net4j.signal.RequestWithConfirmation; -import org.eclipse.net4j.signal.SignalRemoteException; import org.eclipse.net4j.util.event.INotifier; /** @@ -29,8 +29,7 @@ public interface IFailOverStrategy extends INotifier */ public void setDefaultTimeout(long defaultTimeout); - public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, SignalRemoteException; + public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request) throws Exception, RemoteException; - public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, - SignalRemoteException; + public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException; } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java index 689675c1aa..79b18c5cc5 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/NOOPFailOverStrategy.java @@ -10,8 +10,8 @@ **************************************************************************/ package org.eclipse.net4j.signal.failover; +import org.eclipse.net4j.signal.RemoteException; import org.eclipse.net4j.signal.RequestWithConfirmation; -import org.eclipse.net4j.signal.SignalRemoteException; import org.eclipse.net4j.util.event.IListener; /** @@ -23,8 +23,7 @@ public class NOOPFailOverStrategy extends AbstractFailOverStrategy { } - public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, - SignalRemoteException + public <RESULT> RESULT send(RequestWithConfirmation<RESULT> request, long timeout) throws Exception, RemoteException { return request.send(timeout); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/acceptor/Acceptor.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Acceptor.java index 6d89a692b6..b63f3f2a10 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/acceptor/Acceptor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Acceptor.java @@ -8,7 +8,7 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.internal.net4j.acceptor; +package org.eclipse.spi.net4j; import org.eclipse.net4j.ITransportConfig; import org.eclipse.net4j.connector.IConnector; @@ -23,14 +23,12 @@ import org.eclipse.net4j.util.security.INegotiator; import org.eclipse.internal.net4j.TransportConfig; import org.eclipse.internal.net4j.bundle.OM; -import org.eclipse.spi.net4j.InternalAcceptor; -import org.eclipse.spi.net4j.InternalConnector; - import java.util.HashSet; import java.util.Set; /** * @author Eike Stepper + * @since 2.0 */ public abstract class Acceptor extends Container<IConnector> implements InternalAcceptor { diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java index 23cd94753b..4bb4f65100 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/internal/net4j/channel/Channel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Channel.java @@ -8,25 +8,20 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.internal.net4j.channel; +package org.eclipse.spi.net4j; import org.eclipse.net4j.buffer.BufferState; import org.eclipse.net4j.buffer.IBuffer; import org.eclipse.net4j.buffer.IBufferHandler; import org.eclipse.net4j.channel.IChannelMultiplexer; -import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; import org.eclipse.net4j.util.concurrent.IWorkSerializer; import org.eclipse.net4j.util.concurrent.QueueWorkerWorkSerializer; import org.eclipse.net4j.util.concurrent.SynchronousWorkSerializer; import org.eclipse.net4j.util.lifecycle.Lifecycle; -import org.eclipse.net4j.util.lifecycle.LifecycleUtil; import org.eclipse.net4j.util.om.trace.ContextTracer; import org.eclipse.internal.net4j.bundle.OM; -import org.eclipse.spi.net4j.InternalChannel; -import org.eclipse.spi.net4j.InternalChannelMultiplexer; - import java.text.MessageFormat; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -34,6 +29,7 @@ import java.util.concurrent.ExecutorService; /** * @author Eike Stepper + * @since 2.0 */ public class Channel extends Lifecycle implements InternalChannel { @@ -43,7 +39,7 @@ public class Channel extends Lifecycle implements InternalChannel private InternalChannelMultiplexer channelMultiplexer; - private short channelIndex = IBuffer.NO_CHANNEL; + private short id = IBuffer.NO_CHANNEL; private ExecutorService receiveExecutor; @@ -56,9 +52,6 @@ public class Channel extends Lifecycle implements InternalChannel private transient Queue<IBuffer> sendQueue; - @ExcludeFromDump - private transient boolean inverseClosed; - public Channel() { } @@ -98,19 +91,15 @@ public class Channel extends Lifecycle implements InternalChannel this.channelMultiplexer = (InternalChannelMultiplexer)channelMultiplexer; } - public short getIndex() + public short getID() { - return channelIndex; + return id; } - public void setChannelIndex(short channelIndex) + public void setID(short id) { - if (channelIndex == IBuffer.NO_CHANNEL) - { - throw new IllegalArgumentException("channelIndex == INVALID_CHANNEL_ID"); //$NON-NLS-1$ - } - - this.channelIndex = channelIndex; + checkArg(id != IBuffer.NO_CHANNEL, "id == IBuffer.NO_CHANNEL"); //$NON-NLS-1$ + this.id = id; } public ExecutorService getReceiveExecutor() @@ -154,7 +143,7 @@ public class Channel extends Lifecycle implements InternalChannel if (TRACER.isEnabled()) { - TRACER.format("Handling buffer from client: {0} --> {1}", buffer, this); //$NON-NLS-1$ + TRACER.format("Handling buffer: {0} --> {1}", buffer, this); //$NON-NLS-1$ } if (sendQueue == null) @@ -214,14 +203,14 @@ public class Channel extends Lifecycle implements InternalChannel @Override public String toString() { - return MessageFormat.format("Channel[{0}, {1}]", channelIndex, getLocation()); //$NON-NLS-1$ + return MessageFormat.format("Channel[{0}, {1}]", id, getLocation()); //$NON-NLS-1$ } @Override protected void doBeforeActivate() throws Exception { super.doBeforeActivate(); - checkState(channelIndex != IBuffer.NO_CHANNEL, "channelIndex == NO_CHANNEL"); //$NON-NLS-1$ + checkState(id != IBuffer.NO_CHANNEL, "channelID == NO_CHANNEL"); //$NON-NLS-1$ checkState(channelMultiplexer, "channelMultiplexer"); //$NON-NLS-1$ } @@ -257,24 +246,7 @@ public class Channel extends Lifecycle implements InternalChannel @Override protected void doDeactivate() throws Exception { - if (!inverseClosed) - { - channelMultiplexer.closeChannel(this); - } - - super.doDeactivate(); - } - - public void finishDeactivate(boolean inverse) - { - inverseClosed = inverse; - if (inverse) - { - LifecycleUtil.deactivate(receiveHandler); - deactivate(); - } - - receiveHandler = null; + channelMultiplexer.closeChannel(this); if (receiveSerializer != null) { receiveSerializer.dispose(); @@ -286,6 +258,8 @@ public class Channel extends Lifecycle implements InternalChannel sendQueue.clear(); sendQueue = null; } + + super.doDeactivate(); } public void close() diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java new file mode 100644 index 0000000000..5a85aeeee5 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ChannelMultiplexer.java @@ -0,0 +1,360 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.spi.net4j; + +import org.eclipse.net4j.ITransportConfig; +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.channel.ChannelException; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.channel.IChannelMultiplexer; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.protocol.IProtocolProvider; +import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.concurrent.TimeoutRuntimeException; +import org.eclipse.net4j.util.container.Container; +import org.eclipse.net4j.util.factory.FactoryKey; +import org.eclipse.net4j.util.factory.IFactoryKey; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.security.INegotiationContext; + +import org.eclipse.internal.net4j.TransportConfig; +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class ChannelMultiplexer extends Container<IChannel> implements InternalChannelMultiplexer +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, ChannelMultiplexer.class); + + private ITransportConfig config; + + private long channelTimeout = IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT; + + @ExcludeFromDump + private transient ConcurrentMap<Short, IChannel> channels = new ConcurrentHashMap<Short, IChannel>(); + + @ExcludeFromDump + private transient Set<Short> channelIDs = new HashSet<Short>(); + + @ExcludeFromDump + private transient int lastChannelID; + + public ChannelMultiplexer() + { + } + + public synchronized ITransportConfig getConfig() + { + if (config == null) + { + config = new TransportConfig(); + } + + return config; + } + + public synchronized void setConfig(ITransportConfig config) + { + checkInactive(); + this.config = config; + } + + public long getChannelTimeout() + { + if (channelTimeout == IChannelMultiplexer.DEFAULT_CHANNEL_TIMEOUT) + { + return OM.BUNDLE.getDebugSupport().getDebugOption("channel.timeout", 10000); + } + + return channelTimeout; + } + + public void setChannelTimeout(long channelTimeout) + { + this.channelTimeout = channelTimeout; + } + + public final InternalChannel getChannel(short channelID) + { + return (InternalChannel)channels.get(channelID); + } + + public final Collection<IChannel> getChannels() + { + return channels.values(); + } + + @Override + public boolean isEmpty() + { + return channels.isEmpty(); + } + + public IChannel[] getElements() + { + List<IChannel> list = new ArrayList<IChannel>(getChannels()); + return list.toArray(new IChannel[list.size()]); + } + + public InternalChannel openChannel() throws ChannelException + { + return openChannel((IProtocol<?>)null); + } + + public InternalChannel openChannel(String protocolID, Object infraStructure) throws ChannelException + { + IProtocol<?> protocol = createProtocol(protocolID, infraStructure); + if (protocol == null) + { + throw new IllegalArgumentException("Unknown protocolID: " + protocolID); + } + + return openChannel(protocol); + } + + public InternalChannel openChannel(IProtocol<?> protocol) throws ChannelException + { + InternalChannel channel = createChannel(); + initChannel(channel, protocol); + channel.setID(getNextChannelID()); + addChannel(channel); + + try + { + try + { + registerChannelWithPeer(channel.getID(), getChannelTimeout(), protocol); + } + catch (TimeoutRuntimeException ex) + { + // Adjust the message for the complete timeout time + throw new TimeoutRuntimeException("Registration timeout after " + getChannelTimeout() + " milliseconds"); + } + } + catch (ChannelException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ChannelException(ex); + } + + return channel; + } + + public InternalChannel inverseOpenChannel(short channelID, String protocolID) + { + IProtocol<?> protocol = createProtocol(protocolID, null); + + InternalChannel channel = createChannel(); + initChannel(channel, protocol); + channel.setID(channelID); + addChannel(channel); + return channel; + } + + public void closeChannel(InternalChannel channel) throws ChannelException + { + InternalChannel internalChannel = channel; + deregisterChannelFromPeer(internalChannel, getChannelTimeout()); + removeChannel(internalChannel); + } + + public void inverseCloseChannel(short channelID) throws ChannelException + { + InternalChannel channel = getChannel(channelID); + LifecycleUtil.deactivate(channel); + } + + protected InternalChannel createChannel() + { + return new Channel(); + } + + protected void initChannel(InternalChannel channel, IProtocol<?> protocol) + { + channel.setMultiplexer(this); + channel.setReceiveExecutor(getConfig().getReceiveExecutor()); + // channel.setUserID(getUserID()); + if (protocol != null) + { + protocol.setChannel(channel); + LifecycleUtil.activate(protocol); + if (TRACER.isEnabled()) + { + String protocolType = protocol == null ? null : protocol.getType(); + TRACER.format("Opening channel with protocol {0}", protocolType); + } + + channel.setReceiveHandler(protocol); + } + else + { + if (TRACER.isEnabled()) + { + TRACER.trace("Opening channel without protocol"); + } + } + } + + @SuppressWarnings("unchecked") + protected <INFRA_STRUCTURE> IProtocol<INFRA_STRUCTURE> createProtocol(String type, INFRA_STRUCTURE infraStructure) + { + if (StringUtil.isEmpty(type)) + { + return null; + } + + IProtocolProvider protocolProvider = getConfig().getProtocolProvider(); + if (protocolProvider == null) + { + throw new ChannelException("No protocol provider configured"); + } + + IProtocol<INFRA_STRUCTURE> protocol = (IProtocol<INFRA_STRUCTURE>)protocolProvider.getProtocol(type); + if (protocol == null) + { + throw new ChannelException("Invalid protocol factory: " + type); + } + + protocol.setBufferProvider(getConfig().getBufferProvider()); + protocol.setExecutorService(getConfig().getReceiveExecutor()); + if (infraStructure != null) + { + protocol.setInfraStructure(infraStructure); + } + + return protocol; + } + + protected IFactoryKey createProtocolFactoryKey(String type) + { + switch (getLocation()) + { + case SERVER: + return new FactoryKey(ServerProtocolFactory.PRODUCT_GROUP, type); + case CLIENT: + return new FactoryKey(ClientProtocolFactory.PRODUCT_GROUP, type); + default: + throw new IllegalStateException(); + } + } + + @Override + protected boolean isDeferredActivation() + { + return true; + } + + @Override + protected void doDeactivate() throws Exception + { + synchronized (channelIDs) + { + for (IChannel channel : getChannels()) + { + LifecycleUtil.deactivate(channel); + } + + channels.clear(); + } + + super.doDeactivate(); + } + + protected abstract INegotiationContext createNegotiationContext(); + + protected abstract void registerChannelWithPeer(short channelID, long timeout, IProtocol<?> protocol) + throws ChannelException; + + protected abstract void deregisterChannelFromPeer(InternalChannel channel, long timeout) throws ChannelException; + + private short getNextChannelID() + { + synchronized (channelIDs) + { + int start = lastChannelID; + int maxValue = Short.MAX_VALUE; + for (;;) + { + ++lastChannelID; + if (lastChannelID == start) + { + throw new ChannelException("Too many channels"); + } + + if (lastChannelID > maxValue) + { + lastChannelID = 1; + } + + short id = (short)(isClient() ? lastChannelID : -lastChannelID); + if (channelIDs.add(id)) + { + return id; + } + } + } + } + + private void addChannel(InternalChannel channel) + { + short channelID = channel.getID(); + if (channelID == IBuffer.CONTROL_CHANNEL || channelID == IBuffer.NO_CHANNEL) + { + throw new ChannelException("Invalid channel ID: " + channelID); + } + + channels.put(channelID, channel); + LifecycleUtil.activate(channel); + fireElementAddedEvent(channel); + } + + private void removeChannel(InternalChannel channel) + { + try + { + short channelID = channel.getID(); + boolean removed; + synchronized (channelIDs) + { + removed = channels.remove(channelID) != null; + if (removed) + { + channelIDs.remove(channelID); + } + } + + if (removed) + { + fireElementRemovedEvent(channel); + } + } + catch (RuntimeException ex) + { + OM.LOG.error(ex); + throw ex; + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ClientProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ClientProtocolFactory.java index 710578e224..aa926d8909 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ClientProtocolFactory.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ClientProtocolFactory.java @@ -8,12 +8,13 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.protocol; +package org.eclipse.spi.net4j; import org.eclipse.net4j.util.factory.Factory; /** * @author Eike Stepper + * @since 2.0 */ public abstract class ClientProtocolFactory extends Factory { diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java new file mode 100644 index 0000000000..acb145e4aa --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Connector.java @@ -0,0 +1,402 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2008 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.spi.net4j; + +import org.eclipse.net4j.ITransportConfig; +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.connector.ConnectorException; +import org.eclipse.net4j.connector.ConnectorState; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.connector.IConnectorStateEvent; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.event.INotifier; +import org.eclipse.net4j.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.security.INegotiationContext; +import org.eclipse.net4j.util.security.INegotiator; +import org.eclipse.net4j.util.security.NegotiationException; + +import org.eclipse.internal.net4j.TransportConfig; +import org.eclipse.internal.net4j.bundle.OM; + +import java.text.MessageFormat; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author Eike Stepper + * @since 2.0 + */ +public abstract class Connector extends ChannelMultiplexer implements InternalConnector +{ + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_CONNECTOR, Connector.class); + + private String userID; + + private ITransportConfig config; + + private transient ConnectorState connectorState = ConnectorState.DISCONNECTED; + + @ExcludeFromDump + private transient CountDownLatch finishedConnecting; + + @ExcludeFromDump + private transient CountDownLatch finishedNegotiating; + + @ExcludeFromDump + private transient INegotiationContext negotiationContext; + + @ExcludeFromDump + private transient NegotiationException negotiationException; + + public Connector() + { + } + + @Override + public synchronized ITransportConfig getConfig() + { + if (config == null) + { + config = new TransportConfig(); + } + + return config; + } + + @Override + public synchronized void setConfig(ITransportConfig config) + { + checkInactive(); + this.config = config; + } + + public INegotiator getNegotiator() + { + return getConfig().getNegotiator(); + } + + public void setNegotiator(INegotiator negotiator) + { + getConfig().setNegotiator(negotiator); + } + + public INegotiationContext getNegotiationContext() + { + return negotiationContext; + } + + public boolean isClient() + { + return getLocation() == Location.CLIENT; + } + + public boolean isServer() + { + return getLocation() == Location.SERVER; + } + + public String getUserID() + { + return userID; + } + + public void setUserID(String userID) + { + checkState(getState() != ConnectorState.CONNECTED, "Connector is already connected"); + if (TRACER.isEnabled()) + { + TRACER.format("Setting userID {0} for {1}", userID, this); + } + + this.userID = userID; + } + + public ConnectorState getState() + { + return connectorState; + } + + public void setState(ConnectorState newState) throws ConnectorException + { + ConnectorState oldState = getState(); + if (newState != oldState) + { + if (TRACER.isEnabled()) + { + TRACER.format("Setting state {0} (was {1}) for {2}", newState, oldState.toString().toLowerCase(), this); + } + + connectorState = newState; + switch (newState) + { + case DISCONNECTED: + if (finishedConnecting != null) + { + finishedConnecting.countDown(); + finishedConnecting = null; + } + + if (finishedNegotiating != null) + { + finishedNegotiating.countDown(); + finishedNegotiating = null; + } + break; + + case CONNECTING: + finishedConnecting = new CountDownLatch(1); + finishedNegotiating = new CountDownLatch(1); + // The concrete implementation must advance state to NEGOTIATING or CONNECTED + break; + + case NEGOTIATING: + finishedConnecting.countDown(); + negotiationContext = createNegotiationContext(); + getNegotiator().negotiate(negotiationContext); + break; + + case CONNECTED: + negotiationContext = null; + deferredActivate(); + finishedConnecting.countDown(); + finishedNegotiating.countDown(); + break; + } + + fireEvent(new ConnectorStateEvent(this, oldState, newState)); + } + } + + public boolean isDisconnected() + { + return connectorState == ConnectorState.DISCONNECTED; + } + + public boolean isConnecting() + { + return connectorState == ConnectorState.CONNECTING; + } + + public boolean isNegotiating() + { + return connectorState == ConnectorState.NEGOTIATING; + } + + public boolean isConnected() + { + if (negotiationException != null) + { + throw new ConnectorException("Connector negotiation failed", negotiationException); + } + + return connectorState == ConnectorState.CONNECTED; + } + + public void connectAsync() throws ConnectorException + { + try + { + activate(); + } + catch (ConnectorException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConnectorException(ex); + } + } + + public boolean waitForConnection(long timeout) throws ConnectorException + { + final long MAX_POLL_INTERVAL = 100L; + boolean withTimeout = timeout != NO_TIMEOUT; + + try + { + if (TRACER.isEnabled()) + { + TRACER.trace("Waiting for connection..."); + } + + for (;;) + { + long t = MAX_POLL_INTERVAL; + if (withTimeout) + { + t = Math.min(MAX_POLL_INTERVAL, timeout); + timeout -= MAX_POLL_INTERVAL; + } + + if (t <= 0) + { + break; + } + + if (finishedNegotiating == null) + { + break; + } + + if (finishedNegotiating.await(t, TimeUnit.MILLISECONDS)) + { + break; + } + } + + return isConnected(); + } + catch (InterruptedException ex) + { + return false; + } + } + + public boolean connect(long timeout) throws ConnectorException + { + connectAsync(); + return waitForConnection(timeout); + } + + public boolean connect() throws ConnectorException + { + return connect(NO_TIMEOUT); + } + + public ConnectorException disconnect() + { + Exception ex = deactivate(); + if (ex == null) + { + return null; + } + + if (ex instanceof ConnectorException) + { + return (ConnectorException)ex; + } + + return new ConnectorException(ex); + } + + public short getBufferCapacity() + { + return getConfig().getBufferProvider().getBufferCapacity(); + } + + public IBuffer provideBuffer() + { + return getConfig().getBufferProvider().provideBuffer(); + } + + public void retainBuffer(IBuffer buffer) + { + getConfig().getBufferProvider().retainBuffer(buffer); + } + + protected void leaveConnecting() + { + if (getNegotiator() == null) + { + setState(ConnectorState.CONNECTED); + } + else + { + setState(ConnectorState.NEGOTIATING); + } + } + + @Override + protected abstract INegotiationContext createNegotiationContext(); + + protected NegotiationException getNegotiationException() + { + return negotiationException; + } + + protected void setNegotiationException(NegotiationException negotiationException) + { + this.negotiationException = negotiationException; + } + + @Override + protected boolean isDeferredActivation() + { + return true; + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + if (getConfig().getBufferProvider() == null) + { + throw new IllegalStateException("getConfig().getBufferProvider() == null"); + } + } + + @Override + protected void doActivate() throws Exception + { + super.doActivate(); + setState(ConnectorState.CONNECTING); + } + + @Override + protected void doDeactivate() throws Exception + { + setState(ConnectorState.DISCONNECTED); + super.doDeactivate(); + } + + /** + * @author Eike Stepper + */ + private static class ConnectorStateEvent extends Event implements IConnectorStateEvent + { + private static final long serialVersionUID = 1L; + + private ConnectorState oldState; + + private ConnectorState newState; + + public ConnectorStateEvent(INotifier notifier, ConnectorState oldState, ConnectorState newState) + { + super(notifier); + this.oldState = oldState; + this.newState = newState; + } + + public IConnector getConnector() + { + return (IConnector)getSource(); + } + + public ConnectorState getOldState() + { + return oldState; + } + + public ConnectorState getNewState() + { + return newState; + } + + @Override + public String toString() + { + return MessageFormat.format("ConnectorStateEvent[source={0}, oldState={1}, newState={2}]", getSource(), + getOldState(), getNewState()); + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java index 556a1cfda5..90f4039fa4 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/InternalChannel.java @@ -27,9 +27,12 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle.I /** * @since 2.0 */ - public void setUserID(String userID); + public void setID(short id); - public void setChannelIndex(short channelIndex); + /** + * @since 2.0 + */ + public void setUserID(String userID); public ExecutorService getReceiveExecutor(); @@ -43,9 +46,4 @@ public interface InternalChannel extends IChannel, IBufferProvider, ILifecycle.I public void handleBufferFromMultiplexer(IBuffer buffer); public Queue<IBuffer> getSendQueue(); - - /** - * @since 2.0 - */ - public void finishDeactivate(boolean inverse); } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/Protocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java index 69a8607a2a..3139624d60 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/Protocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/Protocol.java @@ -8,31 +8,77 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.protocol; +package org.eclipse.spi.net4j; import org.eclipse.net4j.buffer.IBufferProvider; import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.util.ReflectUtil.ExcludeFromDump; +import org.eclipse.net4j.util.event.IListener; +import org.eclipse.net4j.util.lifecycle.ILifecycle; import org.eclipse.net4j.util.lifecycle.Lifecycle; +import org.eclipse.net4j.util.lifecycle.LifecycleEventAdapter; import java.util.concurrent.ExecutorService; /** * @author Eike Stepper + * @since 2.0 */ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IProtocol<INFRA_STRUCTURE> { - private IChannel channel; + private ExecutorService executorService; private IBufferProvider bufferProvider; - private ExecutorService executorService; - private INFRA_STRUCTURE infraStructure; + private IChannel channel; + + @ExcludeFromDump + private transient IListener channelListener = new LifecycleEventAdapter() + { + @Override + protected void onDeactivated(ILifecycle lifecycle) + { + handleChannelDeactivation(); + }; + }; + public Protocol() { } + public ExecutorService getExecutorService() + { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) + { + this.executorService = executorService; + } + + public IBufferProvider getBufferProvider() + { + return bufferProvider; + } + + public void setBufferProvider(IBufferProvider bufferProvider) + { + this.bufferProvider = bufferProvider; + } + + public INFRA_STRUCTURE getInfraStructure() + { + return infraStructure; + } + + public void setInfraStructure(INFRA_STRUCTURE infraStructure) + { + this.infraStructure = infraStructure; + } + /** * @since 2.0 */ @@ -62,39 +108,29 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr return channel; } - public void setChannel(IChannel channel) - { - this.channel = channel; - } - - public IBufferProvider getBufferProvider() - { - return bufferProvider; - } - - public void setBufferProvider(IBufferProvider bufferProvider) - { - this.bufferProvider = bufferProvider; - } - - public ExecutorService getExecutorService() + public void setChannel(IChannel newChannel) { - return executorService; + if (channel != newChannel) + { + if (channel != null) + { + channel.removeListener(channelListener); + } + + channel = newChannel; + if (channel != null) + { + channel.addListener(channelListener); + } + } } - public void setExecutorService(ExecutorService executorService) - { - this.executorService = executorService; - } - - public INFRA_STRUCTURE getInfraStructure() - { - return infraStructure; - } - - public void setInfraStructure(INFRA_STRUCTURE infraStructure) + /** + * @since 2.0 + */ + protected void handleChannelDeactivation() { - this.infraStructure = infraStructure; + deactivate(); } @Override @@ -109,7 +145,7 @@ public abstract class Protocol<INFRA_STRUCTURE> extends Lifecycle implements IPr @Override protected void doDeactivate() throws Exception { - channel = null; + setChannel(null); super.doDeactivate(); } } diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ServerProtocolFactory.java b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ServerProtocolFactory.java index e4fdb76ffb..b4a30d1078 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/ServerProtocolFactory.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/spi/net4j/ServerProtocolFactory.java @@ -8,12 +8,13 @@ * Contributors: * Eike Stepper - initial API and implementation **************************************************************************/ -package org.eclipse.net4j.protocol; +package org.eclipse.spi.net4j; import org.eclipse.net4j.util.factory.Factory; /** * @author Eike Stepper + * @since 2.0 */ public abstract class ServerProtocolFactory extends Factory { |