diff options
9 files changed, 1194 insertions, 323 deletions
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.runtime.stepper/src/org/eclipse/tcf/te/runtime/stepper/utils/StepperHelper.java b/target_explorer/plugins/org.eclipse.tcf.te.runtime.stepper/src/org/eclipse/tcf/te/runtime/stepper/utils/StepperHelper.java index 1e403d8bf..ab7dd8c2e 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.runtime.stepper/src/org/eclipse/tcf/te/runtime/stepper/utils/StepperHelper.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.runtime.stepper/src/org/eclipse/tcf/te/runtime/stepper/utils/StepperHelper.java @@ -10,6 +10,7 @@ package org.eclipse.tcf.te.runtime.stepper.utils; +import org.eclipse.core.runtime.Assert; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback; import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer; @@ -38,7 +39,12 @@ public final class StepperHelper { return stepperOperationService; } - public static final void scheduleStepperJob(Object context, String operation, IStepperOperationService service, IPropertiesContainer data, ICallback callback, IProgressMonitor monitor) { + public static final StepperJob scheduleStepperJob(Object context, String operation, IStepperOperationService service, IPropertiesContainer data, ICallback callback, IProgressMonitor monitor) { + Assert.isNotNull(service); + Assert.isNotNull(data); + + StepperJob job = null; + IStepContext stepContext = service.getStepContext(context, operation); String stepGroupId = service.getStepGroupId(context, operation); String name = service.getStepGroupName(context, operation); @@ -53,13 +59,7 @@ public final class StepperHelper { } if (stepGroupId != null && stepContext != null) { - StepperJob job = new StepperJob(name != null ? name : "", //$NON-NLS-1$ - stepContext, - data, - stepGroupId, - operation, - isCancelable, - monitor == null); + job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, operation, isCancelable, monitor == null); //$NON-NLS-1$ job.setJobCallback(callback); if (monitor != null) { @@ -70,5 +70,6 @@ public final class StepperHelper { } } + return job; } } diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/plugin.xml b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/plugin.xml index c0aaa661a..1adf10ec1 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/plugin.xml +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/plugin.xml @@ -17,6 +17,18 @@ class="org.eclipse.tcf.te.tcf.core.internal.services.PathMapResolverService"> <serviceType class="org.eclipse.tcf.te.tcf.core.interfaces.IPathMapResolverService"/> </service> + + <service + class="org.eclipse.tcf.te.tcf.core.internal.StepperOperationService" + id="org.eclipse.tcf.te.tcf.core.services.stepperOperation"> + <serviceType + bundleId="org.eclipse.tcf.te.runtime.stepper" + class="org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService"> + </serviceType> + <enablement> + <instanceof value="org.eclipse.tcf.protocol.IPeer"/> + </enablement> + </service> </extension> <!-- Step contributions --> diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java index 8b3734f68..92fc04553 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java @@ -24,8 +24,9 @@ public interface IChannelManager extends IAdaptable { /** * If set to <code>true</code>, a new and not reference counted channel is opened. - * The returned channel must be closed by the caller himself. The channel manager - * is not keeping track of non reference counted channels. + * <p> + * All channels opened by the channel manager must be closed by the channel managers + * {@link #closeChannel(IChannel)} API. * <p> * If not present in the flags map passed in to open channel, the default value is * <code>false</code>. @@ -37,8 +38,8 @@ public interface IChannelManager extends IAdaptable { * and no value add is launched and associated with the channel. This option should * be used with extreme caution. * <p> - * The returned channel must be closed by the caller himself. The channel manager - * is not keeping track of non reference counted channels. + * All channels opened by the channel manager must be closed by the channel managers + * {@link #closeChannel(IChannel)} API. * <p> * If not present in the flags map passed in to open channel, the default value is * <code>false</code>. @@ -49,8 +50,8 @@ public interface IChannelManager extends IAdaptable { * If set to <code>true</code>, a new and not reference counted channel is opened, * and the configured path map is not auto applied to the opened channel. * <p> - * The returned channel must be closed by the caller himself. The channel manager - * is not keeping track of non reference counted channels. + * All channels opened by the channel manager must be closed by the channel managers + * {@link #closeChannel(IChannel)} API. * <p> * If not present in the flags map passed in to open channel, the default value is * <code>false</code>. @@ -77,7 +78,8 @@ public interface IChannelManager extends IAdaptable { /** * Opens a new channel to communicate with the given peer. * <p> - * Reference counted channels are cached by the channel manager and must be closed via {@link #closeChannel(IChannel)}. + * Channels opened by the channel manager must be closed by the channel managers + * {@link #closeChannel(IChannel)} API. * <p> * The method can be called from any thread context. * @@ -90,7 +92,8 @@ public interface IChannelManager extends IAdaptable { /** * Opens a new channel to communicate with the peer described by the given peer attributes. * <p> - * Reference counted channels are cached by the channel manager and must be closed via {@link #closeChannel(IChannel)}. + * Channels opened by the channel manager must be closed by the channel managers + * {@link #closeChannel(IChannel)} API. * <p> * The method can be called from any thread context. * diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java index 150784161..902b3ff17 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java @@ -13,7 +13,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -22,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.core.runtime.Assert; import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.ListenerList; import org.eclipse.core.runtime.OperationCanceledException; import org.eclipse.core.runtime.PlatformObject; import org.eclipse.osgi.util.NLS; @@ -35,7 +33,6 @@ import org.eclipse.tcf.services.IDiagnostics; import org.eclipse.tcf.services.IPathMap; import org.eclipse.tcf.services.IStreams; import org.eclipse.tcf.te.runtime.callback.Callback; -import org.eclipse.tcf.te.runtime.interfaces.IDisposable; import org.eclipse.tcf.te.runtime.services.ServiceManager; import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; @@ -1455,310 +1452,6 @@ public final class ChannelManager extends PlatformObject implements IChannelMana channel.redirect(proxy.get() != null ? proxy.get().getAttributes() : attrs); } - /** - * Private stream listener proxy implementation. - */ - private final static class StreamListenerProxy implements IStreams.StreamsListener, IChannelManager.IStreamsListenerProxy { - // The channel - private final IChannel channel; - // The stream type the proxy is registered for - private final String streamType; - // The list of proxied stream listeners - /* default */ ListenerList listeners = new ListenerList(); - // The list of delayed stream created events - private final List<StreamCreatedEvent> delayedCreatedEvents = new ArrayList<StreamCreatedEvent>(); - - /** - * Immutable stream created event. - */ - private final static class StreamCreatedEvent { - /** - * The stream type. - */ - public final String streamType; - /** - * The stream id. - */ - public final String streamId; - /** - * The context id. - */ - public final String contextId; - - // As the class is immutable, we do not need to build the toString - // value again and again. Build it once in the constructor and reuse it later. - private final String toString; - - /** - * Constructor. - * - * @param streamType The stream type. - * @param streamId The stream id. - * @param contextId The context id. - */ - public StreamCreatedEvent(String streamType, String streamId, String contextId) { - this.streamType = streamType; - this.streamId = streamId; - this.contextId = contextId; - - toString = toString(); - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - return obj instanceof StreamCreatedEvent - && toString().equals(((StreamCreatedEvent)obj).toString()); - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return toString().hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - if (toString != null) return toString; - - StringBuilder builder = new StringBuilder(getClass().getSimpleName()); - builder.append(": streamType = "); //$NON-NLS-1$ - builder.append(streamType); - builder.append("; streamId = "); //$NON-NLS-1$ - builder.append(streamId); - builder.append("; contextId = "); //$NON-NLS-1$ - builder.append(contextId); - - return builder.toString(); - } - } - - /** - * Constructor - * - * @param The channel. Must not be <code>null</code>. - */ - public StreamListenerProxy(final IChannel channel, final String streamType) { - Assert.isNotNull(channel); - Assert.isNotNull(streamType); - - this.channel = channel; - this.channel.addChannelListener(new IChannel.IChannelListener() { - @Override - public void onChannelOpened() {} - - @Override - public void onChannelClosed(Throwable error) { - // Channel is closed, remove ourself - channel.removeChannelListener(this); - // Dispose all registered streams listener - Object[] candidates = listeners.getListeners(); - listeners.clear(); - for (Object listener : candidates) { - if (listener instanceof IDisposable) { - ((IDisposable)listener).dispose(); - } - } - } - - @Override - public void congestionLevel(int level) { - } - }); - - // Remember the stream type - this.streamType = streamType; - } - - /** - * Returns the stream type the proxy is registered for. - * - * @return The stream type. - */ - public String getStreamType() { - return streamType; - } - - /** - * Adds the given streams listener to the list of proxied listeners. - * - * @param listener The streams listener. Must not be <code>null</code>. - */ - public void addListener(IStreamsListener listener) { - Assert.isNotNull(listener); - listener.setProxy(this); - listeners.add(listener); - } - - /** - * Removes the given streams listener from the list of proxied listeners. - * - * @param listener The streams listener. Must not be <code>null</code>. - */ - public void removeListener(IStreamsListener listener) { - Assert.isNotNull(listener); - listener.setProxy(null); - listeners.remove(listener); - } - - /** - * Returns if the proxied listeners list is empty or not. - * - * @return <code>True</code> if the list is empty, <code>false</code> otherwise. - */ - public boolean isEmpty() { - return listeners.isEmpty(); - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListenerProxy#processDelayedCreatedEvents() - */ - @Override - public void processDelayedCreatedEvents() { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: processDelayedCreatedEvents()", //$NON-NLS-1$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, - IStatus.INFO, getClass()); - } - - synchronized (delayedCreatedEvents) { - // Make a snapshot of all delayed created events - StreamCreatedEvent[] events = delayedCreatedEvents.toArray(new StreamCreatedEvent[delayedCreatedEvents.size()]); - // Clear the events now, it will be refilled by calling the created method - delayedCreatedEvents.clear(); - // Loop the delayed created events and recall the created method to process them - for (StreamCreatedEvent event : events) { - created(event.streamType, event.streamId, event.contextId); - } - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String) - */ - @Override - public void created(String stream_type, String stream_id, String context_id) { - Assert.isNotNull(stream_type); - Assert.isNotNull(stream_id); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: created(" + stream_type + ", " + stream_id + ", " + context_id + ")", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, - IStatus.INFO, getClass()); - } - - // If the context_id is null, disconnect from the stream right away. We do not support - // old TCF agents not sending the context id in the created event. - if (context_id == null) { - IStreams service = channel.getRemoteService(IStreams.class); - if (service != null) { - service.disconnect(stream_id, new IStreams.DoneDisconnect() { - @Override - public void doneDisconnect(IToken token, Exception error) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disconnect -> context id must be not null.", //$NON-NLS-1$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); - } - } - }); - } - - return; - } - - boolean delayed = false; - boolean disconnect = true; - - // Loop all listeners - for (Object l : listeners.getListeners()) { - IStreamsListener listener = (IStreamsListener)l; - - // If the listener has no context set yet, the listener cannot decide if - // the event is consumed or not. In this case, the event must be delayed. - if (!listener.hasContext()) { - delayed |= true; - continue; - } - - // Does the listener consume the event? - boolean consume = listener.isCreatedConsumed(stream_type, stream_id, context_id); - if (consume) listener.created(stream_type, stream_id, context_id); - // If the created event is consumed by one listener, it cannot be disconnected anymore - disconnect &= !consume; - } - - if (delayed) { - // Context not set yet --> add to the delayed list - StreamCreatedEvent event = new StreamCreatedEvent(stream_type, stream_id, context_id); - synchronized (delayedCreatedEvents) { - if (!delayedCreatedEvents.contains(event)) { - delayedCreatedEvents.add(event); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: delayed -> at least one listener does not have a context set", //$NON-NLS-1$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); - } - } - } - return; - } - - if (disconnect) { - IStreams service = channel.getRemoteService(IStreams.class); - if (service != null) { - service.disconnect(stream_id, new IStreams.DoneDisconnect() { - @Override - public void doneDisconnect(IToken token, Exception error) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disconnect -> not interested in context id", //$NON-NLS-1$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); - } - } - }); - } - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.services.IStreams.StreamsListener#disposed(java.lang.String, java.lang.String) - */ - @Override - public void disposed(String stream_type, String stream_id) { - Assert.isNotNull(stream_type); - Assert.isNotNull(stream_id); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { - CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disposed(" + stream_type + ", " + stream_id + ")", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ - 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, - IStatus.INFO, getClass()); - } - - // If the delayed created events list is not empty, we have - // to check if one of the delayed create events got disposed - synchronized (delayedCreatedEvents) { - Iterator<StreamCreatedEvent> iterator = delayedCreatedEvents.iterator(); - while (iterator.hasNext()) { - StreamCreatedEvent event = iterator.next(); - if (stream_type.equals(event.streamType) && stream_id.equals(event.streamId)) { - // Remove the create event from the list - iterator.remove(); - } - } - } - - for (Object l : listeners.getListeners()) { - ((IStreamsListener)l).disposed(stream_type, stream_id); - } - } - } - /* (non-Javadoc) * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream) */ diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager2.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager2.java new file mode 100644 index 000000000..c8baf866c --- /dev/null +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager2.java @@ -0,0 +1,712 @@ +/******************************************************************************* + * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the terms + * of the Eclipse Public License v1.0 which accompanies this distribution, and is + * available at http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tcf.te.tcf.core.internal; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.core.runtime.Assert; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.PlatformObject; +import org.eclipse.osgi.util.NLS; +import org.eclipse.tcf.core.AbstractPeer; +import org.eclipse.tcf.protocol.IChannel; +import org.eclipse.tcf.protocol.IPeer; +import org.eclipse.tcf.protocol.IToken; +import org.eclipse.tcf.protocol.Protocol; +import org.eclipse.tcf.services.IStreams; +import org.eclipse.tcf.te.runtime.callback.Callback; +import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback; +import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer; +import org.eclipse.tcf.te.runtime.properties.PropertiesContainer; +import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService; +import org.eclipse.tcf.te.runtime.stepper.job.StepperJob; +import org.eclipse.tcf.te.runtime.stepper.utils.StepperHelper; +import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; +import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; +import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes; +import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds; +import org.eclipse.tcf.te.tcf.core.nls.Messages; +import org.eclipse.tcf.te.tcf.core.peers.Peer; + +/** + * Channel manager implementation. + */ +public class ChannelManager2 extends PlatformObject implements IChannelManager { + // The map of reference counters per channel + /* default */ final Map<IChannel, AtomicInteger> refCounters = new HashMap<IChannel, AtomicInteger>(); + // The map of channels per peer id + /* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>(); + // The map of pending open channel callback's per peer id + /* default */ final Map<String, List<DoneOpenChannel>> pendingDones = new HashMap<String, List<DoneOpenChannel>>(); + // The map of channels opened via "forceNew" flag (needed to handle the close channel correctly) + /* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>(); + // The map of stream listener proxies per channel + /* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>(); + // The map of scheduled "open channel" stepper jobs per peer id + /* default */ final Map<String, StepperJob> pendingOpenChannel = new HashMap<String, StepperJob>(); + // The map of scheduled "close channel" stepper jobs per peer id + /* default */ final Map<String, StepperJob> pendingCloseChannel = new HashMap<String, StepperJob>(); + + /** + * Constructor + */ + public ChannelManager2() { + super(); + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#openChannel(org.eclipse.tcf.protocol.IPeer, java.util.Map, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel) + */ + @Override + public void openChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { + Assert.isNotNull(peer); + Assert.isNotNull(done); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, ITraceIds.TRACE_CHANNEL_MANAGER)) { + try { + throw new Throwable(); + } catch (Throwable e) { + CoreBundleActivator.getTraceHandler().trace("ChannelManager#openChannel called from:", //$NON-NLS-1$ + 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + e.printStackTrace(); + } + } + + // The channel instance to return + IChannel channel = null; + + // Get the peer id + final String id = peer.getID(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_message, id, flags), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // First thing to determine is if to open a new channel or the shared + // channel can be used, if there is a shared channel at all. + boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; + boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; + boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; + // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well + if (noValueAdd || noPathMap) forceNew = true; + + final boolean finForceNew = forceNew; + + // Query the shared channel if not forced to open a new channel + if (!forceNew) channel = channels.get(id); + + // If a shared channel is available, check if the shared channel can be used + if (channel != null) { + // If the channel is still open, it's all done and the channel can be returned right away + if (channel.getState() == IChannel.STATE_OPEN) { + // Increase the reference count + AtomicInteger counter = refCounters.get(id); + if (counter == null) { + counter = new AtomicInteger(0); + refCounters.put(channel, counter); + } + counter.incrementAndGet(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Invoke the channel open done callback + done.doneOpenChannel(null, channel); + } + // If the channel is opening, wait for the channel to become fully opened. + // Add the done open channel callback to the list of pending callback's. + else if (channel.getState() == IChannel.STATE_OPENING) { + List<DoneOpenChannel> dones = pendingDones.get(id); + if (dones == null) { + dones = new ArrayList<DoneOpenChannel>(); + pendingDones.put(id, dones); + } + Assert.isNotNull(dones); + dones.add(done); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(done.hashCode())), //$NON-NLS-1$ + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + else { + // Channel is not in open state -> drop the instance + channels.remove(id); + refCounters.remove(channel); + channel = null; + } + } + + // Channel not available -> open a new one + if (channel == null) { + // Check if there is a pending "open channel" stepper job + StepperJob job = pendingOpenChannel.get(id); + if (job == null) { + // No pending "open channel" stepper job -> schedule one and initiate opening the channel + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Create the data properties container passed to the "open channel" steps + final IPropertiesContainer data = new PropertiesContainer(); + data.setProperty(IChannelManager.FLAG_NO_VALUE_ADD, noValueAdd); + data.setProperty(IChannelManager.FLAG_NO_PATH_MAP, noPathMap); + + // Create the callback to be invoked once the "open channel" stepper job is completed + final ICallback callback = new Callback() { + @Override + protected void internalDone(Object caller, IStatus status) { + // Check for error + if (status.getSeverity() == IStatus.ERROR) { + // Extract the failure cause + Throwable error = status.getException(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Job is done -> remove it from the list of pending jobs + pendingOpenChannel.remove(id); + + // Invoke the primary "open channel" done callback + done.doneOpenChannel(error, null); + + // Invoke pending callback's + List<DoneOpenChannel> pending = pendingDones.remove(id); + if (pending != null && !pending.isEmpty()) { + for (DoneOpenChannel d : pending) { + d.doneOpenChannel(error, null); + } + } + } else { + // Get the channel + IChannel channel = (IChannel)data.getProperty(ITcfStepAttributes.ATTR_CHANNEL); + Assert.isNotNull(channel); + Assert.isTrue(channel.getState() == IChannel.STATE_OPEN); + + // Store the channel + if (!finForceNew) channels.put(id, channel); + if (!finForceNew) refCounters.put(channel, new AtomicInteger(1)); + if (finForceNew) forcedChannels.add(channel); + + // Job is done -> remove it from the list of pending jobs + pendingOpenChannel.remove(id); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Invoke the primary "open channel" done callback + done.doneOpenChannel(null, channel); + + // Invoke pending callback's + List<DoneOpenChannel> pending = pendingDones.remove(id); + if (pending != null && !pending.isEmpty()) { + for (DoneOpenChannel d : pending) { + d.doneOpenChannel(null, channel); + } + } + } + } + }; + + // Get the stepper operation service + IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.OPEN_CHANNEL); + + // Schedule the "open channel" stepper job + job = StepperHelper.scheduleStepperJob(peer, + StepperOperationService.OPEN_CHANNEL, + stepperOperationService, + data, + callback, + null); + + // Remember the "open channel" stepper job until finished + if (job != null) { + pendingOpenChannel.put(id, job); + } + } else { + // There is a pending "open channel" stepper job -> add the open channel callback to the list + List<DoneOpenChannel> dones = pendingDones.get(id); + if (dones == null) { + dones = new ArrayList<DoneOpenChannel>(); + pendingDones.put(id, dones); + } + Assert.isNotNull(dones); + dones.add(done); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(done.hashCode())), //$NON-NLS-1$ + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#openChannel(java.util.Map, java.util.Map, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel) + */ + @Override + public void openChannel(final Map<String, String> peerAttributes, final Map<String, Boolean> flags, final DoneOpenChannel done) { + Assert.isNotNull(peerAttributes); + Assert.isNotNull(done); + + Runnable runnable = new Runnable() { + @Override + public void run() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + openChannel(getOrCreatePeerInstance(peerAttributes), flags, done); + } + }; + if (Protocol.isDispatchThread()) runnable.run(); + else Protocol.invokeLater(runnable); + } + + /** + * Tries to find an existing peer instance or create an new {@link IPeer} + * instance if not found. + * <p> + * <b>Note:</b> This method must be invoked at the TCF dispatch thread. + * + * @param peerAttributes The peer attributes. Must not be <code>null</code>. + * @return The peer instance. + */ + /* default */ IPeer getOrCreatePeerInstance(final Map<String, String> peerAttributes) { + Assert.isNotNull(peerAttributes); + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + + // Get the peer id from the properties + String peerId = peerAttributes.get(IPeer.ATTR_ID); + Assert.isNotNull(peerId); + + // Check if we shall open the peer transient + boolean isTransient = peerAttributes.containsKey("transient") ? Boolean.parseBoolean(peerAttributes.remove("transient")) : false; //$NON-NLS-1$ //$NON-NLS-2$ + + // Look the peer via the Locator Service. + IPeer peer = Protocol.getLocator().getPeers().get(peerId); + // If not peer could be found, create a new one + if (peer == null) { + peer = isTransient ? new Peer(peerAttributes) : new AbstractPeer(peerAttributes); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_createPeer_new_message, peerId, Boolean.valueOf(isTransient)), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + + // Return the peer instance + return peer; + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#getChannel(org.eclipse.tcf.protocol.IPeer) + */ + @Override + public IChannel getChannel(final IPeer peer) { + final AtomicReference<IChannel> channel = new AtomicReference<IChannel>(); + Runnable runnable = new Runnable() { + @Override + public void run() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + channel.set(internalGetChannel(peer)); + } + }; + if (Protocol.isDispatchThread()) runnable.run(); + else Protocol.invokeAndWait(runnable); + + return channel.get(); + } + + /** + * Returns the shared channel instance for the given peer. + * <p> + * <b>Note:</b> This method must be invoked at the TCF dispatch thread. + * + * @param peer The peer. Must not be <code>null</code>. + * @return The channel instance or <code>null</code>. + */ + public IChannel internalGetChannel(IPeer peer) { + Assert.isNotNull(peer); + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + + // Get the peer id + String id = peer.getID(); + + // Get the channel + IChannel channel = channels.get(id); + if (channel != null && !(channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { + // Channel is not in open state -> drop the instance + channel = null; + channels.remove(id); + refCounters.remove(id); + } + + return channel; + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeChannel(org.eclipse.tcf.protocol.IChannel) + */ + @Override + public void closeChannel(final IChannel channel) { + Runnable runnable = new Runnable() { + @Override + public void run() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + internalCloseChannel(channel); + } + }; + if (Protocol.isDispatchThread()) runnable.run(); + else Protocol.invokeLater(runnable); + } + + /** + * Closes the given channel. + * <p> + * If the given channel is a reference counted channel, the channel will be closed if the reference counter + * reaches 0. For non reference counted channels, the channel is closed immediately. + * <p> + * <b>Note:</b> This method must be invoked at the TCF dispatch thread. + * + * @param channel The channel. Must not be <code>null</code>. + */ + /* default */ void internalCloseChannel(final IChannel channel) { + Assert.isNotNull(channel); + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + + // Get the id of the remote peer + final IPeer peer = channel.getRemotePeer(); + final String id = peer.getID(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Determine if the given channel is a reference counted channel + final boolean isRefCounted = !forcedChannels.contains(channel); + + // Get the reference counter (if the channel is a reference counted channel) + AtomicInteger counter = isRefCounted ? refCounters.get(id) : null; + + // If the counter is null or get 0 after the decrement, close the channel + if (counter == null || counter.decrementAndGet() == 0) { + // Check if there is a pending "close channel" stepper job + StepperJob job = pendingCloseChannel.get(id); + if (job == null) { + // No pending "close channel" stepper job -> schedule one and initiate closing the channel + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_close_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Create the data properties container passed to the "close channel" steps + final IPropertiesContainer data = new PropertiesContainer(); + + // Create the callback to be invoked once the "close channel" stepper job is completed + final ICallback callback = new Callback() { + @Override + protected void internalDone(Object caller, IStatus status) { + // Check for error + if (status.getSeverity() == IStatus.ERROR) { + // Extract the failure cause + Throwable error = status.getException(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_failed_message, id, error), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + + // Job is done -> remove it from the list of pending jobs + pendingCloseChannel.remove(id); + } else { + // Job is done -> remove it from the list of pending jobs + pendingCloseChannel.remove(id); + + // Clean the reference counter and the channel map + if (isRefCounted) refCounters.remove(id); + if (isRefCounted) channels.remove(channel); + if (!isRefCounted) forcedChannels.remove(channel); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + } + }; + + // Get the stepper operation service + IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.CLOSE_CHANNEL); + + // Schedule the "close channel" stepper job + job = StepperHelper.scheduleStepperJob(peer, + StepperOperationService.CLOSE_CHANNEL, + stepperOperationService, + data, + callback, + null); + + // Remember the "close channel" stepper job until finished + if (job != null) { + pendingCloseChannel.put(id, job); + } + } else { + // There is a pending "close channel" stepper job + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_pending_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + } else { + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_inuse_message, id, counter.toString()), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); + } + } + + // Clean up the list of forced channels. Remove all channels already been closed. + ListIterator<IChannel> iter = forcedChannels.listIterator(); + while (iter.hasNext()) { + IChannel c = iter.next(); + if (c.getState() == IChannel.STATE_CLOSED) { + iter.remove(); + } + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#shutdown(org.eclipse.tcf.protocol.IPeer) + */ + @Override + public void shutdown(final IPeer peer) { + Runnable runnable = new Runnable() { + @Override + public void run() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + internalShutdown(peer); + } + }; + if (Protocol.isDispatchThread()) runnable.run(); + else Protocol.invokeLater(runnable); + } + + /** + * Shutdown the communication to the given peer, no matter of the current + * reference count. A possible associated value-add is shutdown as well. + * + * @param peer The peer. Must not be <code>null</code>. + */ + /* default */ void internalShutdown(IPeer peer) { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + Assert.isNotNull(peer); + + // Get the peer id + String id = peer.getID(); + + // First, close all channels that are not reference counted + ListIterator<IChannel> iter = forcedChannels.listIterator(); + while (iter.hasNext()) { + IChannel c = iter.next(); + if (id.equals(c.getRemotePeer().getID())) { + c.close(); + iter.remove(); + } + } + + // Get the channel + IChannel channel = internalGetChannel(peer); + if (channel != null) { + // Reset the reference count (will force a channel close) + refCounters.remove(id); + + // Close the channel + internalCloseChannel(channel); + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeAll(boolean) + */ + @Override + public void closeAll(boolean wait) { + if (wait) Assert.isTrue(!Protocol.isDispatchThread()); + + Runnable runnable = new Runnable() { + @Override + public void run() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + internalCloseAll(); + } + }; + + if (Protocol.isDispatchThread()) runnable.run(); + else if (wait) Protocol.invokeAndWait(runnable); + else Protocol.invokeLater(runnable); + } + + /** + * Close all open channel, no matter of the current reference count. + * <p> + * <b>Note:</b> This method must be invoked at the TCF dispatch thread. + */ + /* default */ void internalCloseAll() { + Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + + IChannel[] openChannels = channels.values().toArray(new IChannel[channels.values().size()]); + + refCounters.clear(); + channels.clear(); + + for (IChannel channel : openChannels) internalCloseChannel(channel); + } + + // ----- Streams handling ----- + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream) + */ + @Override + public void subscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneSubscribeStream done) { + Assert.isNotNull(channel); + Assert.isNotNull(streamType); + Assert.isNotNull(listener); + Assert.isNotNull(done); + + if (channel.getState() != IChannel.STATE_OPEN) { + done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message)); + return; + } + + StreamListenerProxy proxy = null; + + // Get all the streams listener proxy instance for the given channel + List<StreamListenerProxy> proxies = streamProxies.get(channel); + // Loop the proxies and find the one for the given stream type + if (proxies != null) { + for (StreamListenerProxy candidate : proxies) { + if (streamType.equals(candidate.getStreamType())) { + proxy = candidate; + break; + } + } + } + + // If the proxy already exist, add the listener to the proxy and return immediately + if (proxy != null) { + proxy.addListener(listener); + done.doneSubscribeStream(null); + } else { + // No proxy yet -> subscribe to the stream type for real and register the proxy + proxy = new StreamListenerProxy(channel, streamType); + if (proxies == null) { + proxies = new ArrayList<StreamListenerProxy>(); + streamProxies.put(channel, proxies); + } + proxies.add(proxy); + proxy.addListener(listener); + + IStreams service = channel.getRemoteService(IStreams.class); + if (service != null) { + final StreamListenerProxy finProxy = proxy; + final List<StreamListenerProxy> finProxies = proxies; + + // Subscribe to the stream type + service.subscribe(streamType, proxy, new IStreams.DoneSubscribe() { + @Override + public void doneSubscribe(IToken token, Exception error) { + if (error != null) { + finProxy.removeListener(listener); + if (finProxy.isEmpty()) finProxies.remove(finProxy); + if (finProxies.isEmpty()) streamProxies.remove(channel); + } else { + finProxy.addListener(listener); + } + done.doneSubscribeStream(error); + } + }); + } else { + proxy.removeListener(listener); + if (proxy.isEmpty()) proxies.remove(proxy); + if (proxies.isEmpty()) streamProxies.remove(channel); + done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message)); + } + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#unsubscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneUnsubscribeStream) + */ + @Override + public void unsubscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneUnsubscribeStream done) { + Assert.isNotNull(channel); + Assert.isNotNull(streamType); + Assert.isNotNull(listener); + Assert.isNotNull(done); + + if (channel.getState() != IChannel.STATE_OPEN) { + done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message)); + return; + } + + StreamListenerProxy proxy = null; + + // Get all the streams listener proxy instance for the given channel + List<StreamListenerProxy> proxies = streamProxies.get(channel); + // Loop the proxies and find the one for the given stream type + if (proxies != null) { + for (StreamListenerProxy candidate : proxies) { + if (streamType.equals(candidate.getStreamType())) { + proxy = candidate; + break; + } + } + } + + if (proxy != null) { + // Remove the listener from the proxy + proxy.removeListener(listener); + // Are there remaining proxied listeners for this stream type? + if (proxy.isEmpty()) { + // Unregister the stream type + IStreams service = channel.getRemoteService(IStreams.class); + if (service != null) { + final StreamListenerProxy finProxy = proxy; + final List<StreamListenerProxy> finProxies = proxies; + + // Unsubscribe + service.unsubscribe(streamType, proxy, new IStreams.DoneUnsubscribe() { + @Override + public void doneUnsubscribe(IToken token, Exception error) { + finProxies.remove(finProxy); + if (finProxies.isEmpty()) streamProxies.remove(channel); + done.doneUnsubscribeStream(error); + } + }); + } else { + done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message)); + } + } + } + } + +} diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StepperOperationService.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StepperOperationService.java new file mode 100644 index 000000000..99bdbcd5e --- /dev/null +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StepperOperationService.java @@ -0,0 +1,105 @@ +/******************************************************************************* + * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the terms + * of the Eclipse Public License v1.0 which accompanies this distribution, and is + * available at http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tcf.te.tcf.core.internal; + +import org.eclipse.core.runtime.Assert; +import org.eclipse.osgi.util.NLS; +import org.eclipse.tcf.protocol.IPeer; +import org.eclipse.tcf.te.runtime.stepper.services.AbstractStepperOperationService; +import org.eclipse.tcf.te.tcf.core.nls.Messages; + +/** + * Channel manager stepper operation service implementation. + */ +public class StepperOperationService extends AbstractStepperOperationService { + /** + * Open channel operation + */ + public static final String OPEN_CHANNEL = "openChannel"; //$NON-NLS-1$ + + /** + * Close channel operation + */ + public static final String CLOSE_CHANNEL = "closeChannel"; //$NON-NLS-1$ + + /** + * Constructor + */ + public StepperOperationService() { + super(); + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#isHandledOperation(java.lang.Object, java.lang.String) + */ + @Override + public boolean isHandledOperation(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + return OPEN_CHANNEL.equals(operation) || CLOSE_CHANNEL.equals(operation); + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#getStepGroupId(java.lang.Object, java.lang.String) + */ + @Override + public String getStepGroupId(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + + if (OPEN_CHANNEL.equals(operation)) + return "org.eclipse.tcf.te.tcf.core.openChannelStepGroup"; //$NON-NLS-1$ + if (CLOSE_CHANNEL.equals(operation)) + return "org.eclipse.tcf.te.tcf.core.closeChannelStepGroup"; //$NON-NLS-1$ + + return null; + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#getStepGroupName(java.lang.Object, java.lang.String) + */ + @Override + public String getStepGroupName(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + + if (OPEN_CHANNEL.equals(operation)) + return NLS.bind(Messages.StepperOperationService_stepGroupName_openChannel, ((IPeer)context).getName()); + if (CLOSE_CHANNEL.equals(operation)) + return NLS.bind(Messages.StepperOperationService_stepGroupName_closeChannel, ((IPeer)context).getName()); + + return null; + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#isEnabled(java.lang.Object, java.lang.String) + */ + @Override + public boolean isEnabled(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + return OPEN_CHANNEL.equals(operation) || CLOSE_CHANNEL.equals(operation); + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#isCancelable(java.lang.Object, java.lang.String) + */ + @Override + public boolean isCancelable(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + return false; + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService#addToActionHistory(java.lang.Object, java.lang.String) + */ + @Override + public boolean addToActionHistory(Object context, String operation) { + Assert.isTrue(context instanceof IPeer); + return false; + } + +} diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StreamListenerProxy.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StreamListenerProxy.java new file mode 100644 index 000000000..e93cffbc5 --- /dev/null +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/StreamListenerProxy.java @@ -0,0 +1,330 @@ +/******************************************************************************* + * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the terms + * of the Eclipse Public License v1.0 which accompanies this distribution, and is + * available at http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tcf.te.tcf.core.internal; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.eclipse.core.runtime.Assert; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.ListenerList; +import org.eclipse.tcf.protocol.IChannel; +import org.eclipse.tcf.protocol.IToken; +import org.eclipse.tcf.services.IStreams; +import org.eclipse.tcf.te.runtime.interfaces.IDisposable; +import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; +import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; +import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener; +import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds; + +/** + * Channel manager stream listener proxy implementation. + */ +final class StreamListenerProxy implements IStreams.StreamsListener, IChannelManager.IStreamsListenerProxy { + // The channel + private final IChannel channel; + // The stream type the proxy is registered for + private final String streamType; + // The list of proxied stream listeners + /* default */ ListenerList listeners = new ListenerList(); + // The list of delayed stream created events + private final List<StreamListenerProxy.StreamCreatedEvent> delayedCreatedEvents = new ArrayList<StreamListenerProxy.StreamCreatedEvent>(); + + /** + * Immutable stream created event. + */ + private final static class StreamCreatedEvent { + /** + * The stream type. + */ + public final String streamType; + /** + * The stream id. + */ + public final String streamId; + /** + * The context id. + */ + public final String contextId; + + // As the class is immutable, we do not need to build the toString + // value again and again. Build it once in the constructor and reuse it later. + private final String toString; + + /** + * Constructor. + * + * @param streamType The stream type. + * @param streamId The stream id. + * @param contextId The context id. + */ + public StreamCreatedEvent(String streamType, String streamId, String contextId) { + this.streamType = streamType; + this.streamId = streamId; + this.contextId = contextId; + + toString = toString(); + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + return obj instanceof StreamListenerProxy.StreamCreatedEvent + && toString().equals(((StreamListenerProxy.StreamCreatedEvent)obj).toString()); + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return toString().hashCode(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + if (toString != null) return toString; + + StringBuilder builder = new StringBuilder(getClass().getSimpleName()); + builder.append(": streamType = "); //$NON-NLS-1$ + builder.append(streamType); + builder.append("; streamId = "); //$NON-NLS-1$ + builder.append(streamId); + builder.append("; contextId = "); //$NON-NLS-1$ + builder.append(contextId); + + return builder.toString(); + } + } + + /** + * Constructor + * + * @param The channel. Must not be <code>null</code>. + */ + public StreamListenerProxy(final IChannel channel, final String streamType) { + Assert.isNotNull(channel); + Assert.isNotNull(streamType); + + this.channel = channel; + this.channel.addChannelListener(new IChannel.IChannelListener() { + @Override + public void onChannelOpened() {} + + @Override + public void onChannelClosed(Throwable error) { + // Channel is closed, remove ourself + channel.removeChannelListener(this); + // Dispose all registered streams listener + Object[] candidates = listeners.getListeners(); + listeners.clear(); + for (Object listener : candidates) { + if (listener instanceof IDisposable) { + ((IDisposable)listener).dispose(); + } + } + } + + @Override + public void congestionLevel(int level) { + } + }); + + // Remember the stream type + this.streamType = streamType; + } + + /** + * Returns the stream type the proxy is registered for. + * + * @return The stream type. + */ + public String getStreamType() { + return streamType; + } + + /** + * Adds the given streams listener to the list of proxied listeners. + * + * @param listener The streams listener. Must not be <code>null</code>. + */ + public void addListener(IStreamsListener listener) { + Assert.isNotNull(listener); + listener.setProxy(this); + listeners.add(listener); + } + + /** + * Removes the given streams listener from the list of proxied listeners. + * + * @param listener The streams listener. Must not be <code>null</code>. + */ + public void removeListener(IStreamsListener listener) { + Assert.isNotNull(listener); + listener.setProxy(null); + listeners.remove(listener); + } + + /** + * Returns if the proxied listeners list is empty or not. + * + * @return <code>True</code> if the list is empty, <code>false</code> otherwise. + */ + public boolean isEmpty() { + return listeners.isEmpty(); + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListenerProxy#processDelayedCreatedEvents() + */ + @Override + public void processDelayedCreatedEvents() { + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: processDelayedCreatedEvents()", //$NON-NLS-1$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, + IStatus.INFO, getClass()); + } + + synchronized (delayedCreatedEvents) { + // Make a snapshot of all delayed created events + StreamListenerProxy.StreamCreatedEvent[] events = delayedCreatedEvents.toArray(new StreamListenerProxy.StreamCreatedEvent[delayedCreatedEvents.size()]); + // Clear the events now, it will be refilled by calling the created method + delayedCreatedEvents.clear(); + // Loop the delayed created events and recall the created method to process them + for (StreamListenerProxy.StreamCreatedEvent event : events) { + created(event.streamType, event.streamId, event.contextId); + } + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public void created(String stream_type, String stream_id, String context_id) { + Assert.isNotNull(stream_type); + Assert.isNotNull(stream_id); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: created(" + stream_type + ", " + stream_id + ", " + context_id + ")", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, + IStatus.INFO, getClass()); + } + + // If the context_id is null, disconnect from the stream right away. We do not support + // old TCF agents not sending the context id in the created event. + if (context_id == null) { + IStreams service = channel.getRemoteService(IStreams.class); + if (service != null) { + service.disconnect(stream_id, new IStreams.DoneDisconnect() { + @Override + public void doneDisconnect(IToken token, Exception error) { + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disconnect -> context id must be not null.", //$NON-NLS-1$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); + } + } + }); + } + + return; + } + + boolean delayed = false; + boolean disconnect = true; + + // Loop all listeners + for (Object l : listeners.getListeners()) { + IStreamsListener listener = (IStreamsListener)l; + + // If the listener has no context set yet, the listener cannot decide if + // the event is consumed or not. In this case, the event must be delayed. + if (!listener.hasContext()) { + delayed |= true; + continue; + } + + // Does the listener consume the event? + boolean consume = listener.isCreatedConsumed(stream_type, stream_id, context_id); + if (consume) listener.created(stream_type, stream_id, context_id); + // If the created event is consumed by one listener, it cannot be disconnected anymore + disconnect &= !consume; + } + + if (delayed) { + // Context not set yet --> add to the delayed list + StreamListenerProxy.StreamCreatedEvent event = new StreamCreatedEvent(stream_type, stream_id, context_id); + synchronized (delayedCreatedEvents) { + if (!delayedCreatedEvents.contains(event)) { + delayedCreatedEvents.add(event); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: delayed -> at least one listener does not have a context set", //$NON-NLS-1$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); + } + } + } + return; + } + + if (disconnect) { + IStreams service = channel.getRemoteService(IStreams.class); + if (service != null) { + service.disconnect(stream_id, new IStreams.DoneDisconnect() { + @Override + public void doneDisconnect(IToken token, Exception error) { + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disconnect -> not interested in context id", //$NON-NLS-1$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, IStatus.INFO, getClass()); + } + } + }); + } + } + } + + /* (non-Javadoc) + * @see org.eclipse.tcf.services.IStreams.StreamsListener#disposed(java.lang.String, java.lang.String) + */ + @Override + public void disposed(String stream_type, String stream_id) { + Assert.isNotNull(stream_type); + Assert.isNotNull(stream_id); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY)) { + CoreBundleActivator.getTraceHandler().trace("StreamListenerProxy: disposed(" + stream_type + ", " + stream_id + ")", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ + 0, ITraceIds.TRACE_STREAMS_LISTENER_PROXY, + IStatus.INFO, getClass()); + } + + // If the delayed created events list is not empty, we have + // to check if one of the delayed create events got disposed + synchronized (delayedCreatedEvents) { + Iterator<StreamListenerProxy.StreamCreatedEvent> iterator = delayedCreatedEvents.iterator(); + while (iterator.hasNext()) { + StreamListenerProxy.StreamCreatedEvent event = iterator.next(); + if (stream_type.equals(event.streamType) && stream_id.equals(event.streamId)) { + // Remove the create event from the list + iterator.remove(); + } + } + } + + for (Object l : listeners.getListeners()) { + ((IStreamsListener)l).disposed(stream_type, stream_id); + } + } +}
\ No newline at end of file diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java index 7cf894d30..4bd8d0201 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java @@ -74,6 +74,7 @@ public class Messages extends NLS { public static String ChannelManager_openChannel_message; public static String ChannelManager_openChannel_reuse_message; + public static String ChannelManager_openChannel_pending_message; public static String ChannelManager_openChannel_new_message; public static String ChannelManager_openChannel_success_message; public static String ChannelManager_openChannel_failed_message; @@ -91,9 +92,12 @@ public class Messages extends NLS { public static String ChannelManager_openChannel_redirect_failed; public static String ChannelManager_openChannel_redirect_invalidChannelState; public static String ChannelManager_createPeer_new_message; + public static String ChannelManager_closeChannel_close_message; public static String ChannelManager_closeChannel_message; public static String ChannelManager_closeChannel_inuse_message; public static String ChannelManager_closeChannel_closed_message; + public static String ChannelManager_closeChannel_pending_message; + public static String ChannelManager_closeChannel_failed_message; public static String ChannelManager_stream_closed_message; public static String ChannelManager_stream_missing_service_message; @@ -120,4 +124,7 @@ public class Messages extends NLS { public static String Extension_error_invalidChannelStateChangeListener; public static String AbstractJob_error_dialogTitle; + + public static String StepperOperationService_stepGroupName_openChannel; + public static String StepperOperationService_stepGroupName_closeChannel; } diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties index f0442d313..0da1b2ceb 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties @@ -15,6 +15,7 @@ InternalChannelListener_onChannelClosed_cause=; Possibly caused by: {0} ChannelManager_openChannel_message=Request to open channel. Target id = {0}, flags = {1} ChannelManager_openChannel_reuse_message=Reusing already opened channel. Target id = {0}, new reference count = {1} +ChannelManager_openChannel_pending_message=Opening channel in progress. Callback ({1}) marked pending. Target id = {0} ChannelManager_openChannel_new_message=Opening new channel. Target id = {0} ChannelManager_openChannel_success_message=Opening channel succeeded. Target id = {0} ChannelManager_openChannel_failed_message=Opening channel failed. Target id = {0}, cause = {1} @@ -33,8 +34,11 @@ ChannelManager_openChannel_redirect_failed=Failed to redirect channel from {0} t ChannelManager_openChannel_redirect_invalidChannelState=Channel not in open state. Cannot redirect channel to {1}. ChannelManager_createPeer_new_message=New target created. Target id = {0}, isTransient = {1} ChannelManager_closeChannel_message=Request to close channel. Target id = {0} +ChannelManager_closeChannel_close_message=Closing channel. Target id = {0} ChannelManager_closeChannel_inuse_message=Channel not closed. Still in use. Target id = {0}, new reference count = {1} ChannelManager_closeChannel_closed_message=Closed channel. Target id = {0} +ChannelManager_closeChannel_pending_message=Closing channel in progress. Target id = {0} +ChannelManager_closeChannel_failed_message=Closing channel failed. Target id = {0}, cause = {1} ChannelManager_stream_closed_message=Channel must be open but is closed. ChannelManager_stream_missing_service_message=Streams service not available. @@ -61,3 +65,7 @@ Extension_error_invalidProtocolStateChangeListener=Failed to instantiate the pro Extension_error_invalidChannelStateChangeListener=Failed to instantiate the channel state change listener from extension point ''{0}''. AbstractJob_error_dialogTitle=Error + +StepperOperationService_stepGroupName_openChannel=Open channel to {0} +StepperOperationService_stepGroupName_closeChannel=Close channel to {0} + |