diff options
5 files changed, 314 insertions, 1840 deletions
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java index ad957936e..1f3c83d3b 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2011, 2014 Wind River Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved. * This program and the accompanying materials are made available under the terms * of the Eclipse Public License v1.0 which accompanies this distribution, and is * available at http://www.eclipse.org/legal/epl-v10.html @@ -9,8 +9,6 @@ *******************************************************************************/ package org.eclipse.tcf.te.tcf.core.internal.channelmanager; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -21,44 +19,52 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.core.runtime.Assert; import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.OperationCanceledException; import org.eclipse.core.runtime.PlatformObject; import org.eclipse.osgi.util.NLS; import org.eclipse.tcf.protocol.IChannel; import org.eclipse.tcf.protocol.IPeer; import org.eclipse.tcf.protocol.IToken; import org.eclipse.tcf.protocol.Protocol; -import org.eclipse.tcf.services.IDiagnostics; -import org.eclipse.tcf.services.IPathMap; import org.eclipse.tcf.services.IStreams; import org.eclipse.tcf.te.runtime.callback.Callback; -import org.eclipse.tcf.te.runtime.services.ServiceManager; +import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback; +import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer; +import org.eclipse.tcf.te.runtime.properties.PropertiesContainer; +import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepAttributes; +import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext; +import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService; +import org.eclipse.tcf.te.runtime.stepper.job.StepperJob; +import org.eclipse.tcf.te.runtime.stepper.utils.StepperHelper; import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; -import org.eclipse.tcf.te.tcf.core.interfaces.IPathMapService; -import org.eclipse.tcf.te.tcf.core.interfaces.IPeerProperties; +import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes; import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds; +import org.eclipse.tcf.te.tcf.core.internal.channelmanager.steps.ShutdownValueAddStep; import org.eclipse.tcf.te.tcf.core.nls.Messages; -import org.eclipse.tcf.te.tcf.core.util.persistence.PeerDataHelper; -import org.eclipse.tcf.te.tcf.core.va.ValueAddManager; -import org.eclipse.tcf.te.tcf.core.va.interfaces.IValueAdd; - /** - * TCF channel manager implementation. + * Channel manager implementation. */ -public final class ChannelManager extends PlatformObject implements IChannelManager { - // The map of reference counters per peer id - /* default */ final Map<String, AtomicInteger> refCounters = new HashMap<String, AtomicInteger>(); +public class ChannelManager extends PlatformObject implements IChannelManager { + // The map of reference counters per channel + /* default */ final Map<IChannel, AtomicInteger> refCounters = new HashMap<IChannel, AtomicInteger>(); // The map of channels per peer id /* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>(); - // The map of channels opened via "forceNew" flag (needed to handle the close channel correctly) + // The map of pending open channel callback's per peer id + /* default */ final Map<String, List<DoneOpenChannel>> pendingDones = new HashMap<String, List<DoneOpenChannel>>(); + // The list of channels opened via "forceNew" flag (needed to handle the close channel correctly) /* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>(); + // The map of flags used for opening a forced channel per channel + /* default */ final Map<IChannel, Map<String, Boolean>> forcedChannelFlags = new HashMap<IChannel, Map<String, Boolean>>(); // The map of stream listener proxies per channel /* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>(); + // The map of scheduled "open channel" stepper jobs per peer id + /* default */ final Map<String, StepperJob> pendingOpenChannel = new HashMap<String, StepperJob>(); + // The map of scheduled "close channel" stepper jobs per peer id + /* default */ final Map<String, StepperJob> pendingCloseChannel = new HashMap<String, StepperJob>(); /** - * Constructor. + * Constructor */ public ChannelManager() { super(); @@ -69,112 +75,35 @@ public final class ChannelManager extends PlatformObject implements IChannelMana */ @Override public void openChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { + Assert.isNotNull(peer); + Assert.isNotNull(done); + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, ITraceIds.TRACE_CHANNEL_MANAGER)) { try { throw new Throwable(); } catch (Throwable e) { CoreBundleActivator.getTraceHandler().trace("ChannelManager#openChannel called from:", //$NON-NLS-1$ - 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); e.printStackTrace(); } } - DoneOpenChannel innerDone = null; - boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; - if (noPathMap) { - innerDone = done; - } else { - innerDone = new DoneOpenChannel() { - @Override - public void doneOpenChannel(final Throwable error, final IChannel channel) { - // If open channel failed, pass on to the original done - if (error != null || channel == null || channel.getState() != IChannel.STATE_OPEN) { - done.doneOpenChannel(error, channel); - } else { - // Take care of the path map - final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class); - final IPathMap svc = channel.getRemoteService(IPathMap.class); - if (service != null && svc != null) { - // Apply the initial path map to the opened channel. - // This must happen outside the TCF dispatch thread as it may trigger - // the launch configuration change listeners. - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - service.applyPathMap(peer, true, new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - done.doneOpenChannel(error, channel); - } - }); - } - }); - thread.start(); - } else { - done.doneOpenChannel(null, channel); - } - } - } - }; - } - final DoneOpenChannel finInnerDone = innerDone; + // The client done callback must be called within the TCF event dispatch thread + final DoneOpenChannel internalDone = new DoneOpenChannel() { - Runnable runnable = new Runnable() { @Override - public void run() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - - // Check on the value-add's first - internalHandleValueAdds(peer, flags, new DoneHandleValueAdds() { + public void doneOpenChannel(final Throwable error, final IChannel channel) { + Runnable runnable = new Runnable() { @Override - public void doneHandleValueAdds(final Throwable error, final IValueAdd[] valueAdds) { - // If the error is null, continue and open the channel - if (error == null) { - // Do we have any value add in the chain? - if (valueAdds != null && valueAdds.length > 0) { - // There are value-add's -> chain them now - internalChainValueAdds(valueAdds, peer, flags, finInnerDone); - } else { - // Determine the proxy configuration - String proxyConfiguration = peer.getAttributes().get(IPeerProperties.PROP_PROXIES); - IPeer[] proxies = proxyConfiguration != null ? PeerDataHelper.decodePeerList(proxyConfiguration) : null; - if (proxies != null && proxies.length > 0) { - // There are proxies -> chain them now - internalChainProxies(proxies, peer, flags, finInnerDone); - } else { - // No value-add's and no proxies -> open a channel to the target peer directly - internalOpenChannel(peer, flags, finInnerDone); - } - } - } else { - // Shutdown the value-add's launched - internalShutdownValueAdds(peer, valueAdds); - // Fail the channel opening - finInnerDone.doneOpenChannel(error, null); - } + public void run() { + done.doneOpenChannel(error, channel); } - }); + }; + + if (Protocol.isDispatchThread()) runnable.run(); + else Protocol.invokeLater(runnable); } }; - if (Protocol.isDispatchThread()) runnable.run(); - else Protocol.invokeLater(runnable); - } - - /** - * Internal implementation of {@link #openChannel(IPeer, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel)}. - * <p> - * Reference counted channels are cached by the channel manager and must be closed via {@link #closeChannel(IChannel)} . - * <p> - * Method must be called within the TCF dispatch thread. - * - * @param peer The peer. Must not be <code>null</code>. - * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void internalOpenChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { - Assert.isNotNull(peer); - Assert.isNotNull(done); - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ // The channel instance to return IChannel channel = null; @@ -184,133 +113,181 @@ public final class ChannelManager extends PlatformObject implements IChannelMana if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_message, id, flags), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); } - // Extract the flags of interest form the given flags map + // First thing to determine is if to open a new channel or the shared + // channel can be used, if there is a shared channel at all. boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - // If noValueAdd == true -> forceNew has to be true as well - if (noValueAdd) forceNew = true; + boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; + // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well + if (noValueAdd || noPathMap) forceNew = true; final boolean finForceNew = forceNew; - // Check if there is already a channel opened to this peer - channel = !forceNew ? channels.get(id) : null; - if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { - // Increase the reference count - AtomicInteger counter = refCounters.get(id); - if (counter == null) { - counter = new AtomicInteger(0); - refCounters.put(id, counter); + // Query the shared channel if not forced to open a new channel + if (!forceNew) channel = channels.get(id); + + // If a shared channel is available, check if the shared channel can be used + if (channel != null) { + // If the channel is still open, it's all done and the channel can be returned right away + if (channel.getState() == IChannel.STATE_OPEN) { + // Increase the reference count + AtomicInteger counter = refCounters.get(channel); + if (counter == null) { + counter = new AtomicInteger(0); + refCounters.put(channel, counter); + } + counter.incrementAndGet(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } + + // Invoke the channel open done callback + internalDone.doneOpenChannel(null, channel); } - counter.incrementAndGet(); + // If the channel is opening, wait for the channel to become fully opened. + // Add the done open channel callback to the list of pending callback's. + else if (channel.getState() == IChannel.STATE_OPENING) { + List<DoneOpenChannel> dones = pendingDones.get(id); + if (dones == null) { + dones = new ArrayList<DoneOpenChannel>(); + pendingDones.put(id, dones); + } + Assert.isNotNull(dones); + dones.add(internalDone); - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$ + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } + } + else { + // Channel is not in open state -> drop the instance + channels.remove(id); + refCounters.remove(channel); + channel = null; } - } else if (channel != null) { - // Channel is not in open state -> drop the instance - channel = null; - channels.remove(id); - refCounters.remove(id); } - // Opens a new channel if necessary + // Channel not available -> open a new one if (channel == null) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - try { - channel = peer.openChannel(); - - if (channel != null) { - if (!forceNew) channels.put(id, channel); - if (!forceNew) refCounters.put(id, new AtomicInteger(1)); - if (forceNew) forcedChannels.add(channel); + // Check if there is a pending "open channel" stepper job + StepperJob job = pendingOpenChannel.get(id); + if (job == null) { + // No pending "open channel" stepper job -> schedule one and initiate opening the channel + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } - // Register the channel listener - final IChannel finChannel = channel; - channel.addChannelListener(new IChannel.IChannelListener() { + // Create the data properties container passed to the "open channel" steps + final IPropertiesContainer data = new PropertiesContainer(); + // Set the flags to be passed to the "open channel" steps + data.setProperty(IChannelManager.FLAG_NO_VALUE_ADD, noValueAdd); + data.setProperty(IChannelManager.FLAG_NO_PATH_MAP, noPathMap); + // No recent action history persistence + data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true); - @Override - public void onChannelOpened() { - // Remove ourself as listener from the channel - finChannel.removeChannelListener(this); + // Create the callback to be invoked once the "open channel" stepper job is completed + final ICallback callback = new Callback() { + @Override + protected void internalDone(Object caller, IStatus status) { + // Check for error + if (status.getSeverity() == IStatus.ERROR) { + // Extract the failure cause + Throwable error = status.getException(); if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id), + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error), 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); } - // Channel opening succeeded - done.doneOpenChannel(null, finChannel); - } + // Job is done -> remove it from the list of pending jobs + pendingOpenChannel.remove(id); - @Override - public void onChannelClosed(Throwable error) { - // Remove ourself as listener from the channel - finChannel.removeChannelListener(this); - // Clean the reference counter and the channel map - if (!finForceNew) channels.remove(id); - if (!finForceNew) refCounters.remove(id); - if (finForceNew) forcedChannels.remove(finChannel); + // Invoke the primary "open channel" done callback + internalDone.doneOpenChannel(error, null); + + // Invoke pending callback's + List<DoneOpenChannel> pending = pendingDones.remove(id); + if (pending != null && !pending.isEmpty()) { + for (DoneOpenChannel d : pending) { + d.doneOpenChannel(error, null); + } + } + } else { + // Get the channel + IChannel channel = (IChannel)data.getProperty(ITcfStepAttributes.ATTR_CHANNEL); + Assert.isNotNull(channel); + Assert.isTrue(channel.getState() == IChannel.STATE_OPEN); + + // Store the channel + if (!finForceNew) channels.put(id, channel); + if (!finForceNew) refCounters.put(channel, new AtomicInteger(1)); + if (finForceNew) forcedChannels.add(channel); + if (finForceNew) forcedChannelFlags.put(channel, flags); + + // Job is done -> remove it from the list of pending jobs + pendingOpenChannel.remove(id); if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); } - // Channel opening failed - done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel); - } + // Invoke the primary "open channel" done callback + internalDone.doneOpenChannel(null, channel); - @Override - public void congestionLevel(int level) { - // ignored + // Invoke pending callback's + List<DoneOpenChannel> pending = pendingDones.remove(id); + if (pending != null && !pending.isEmpty()) { + for (DoneOpenChannel d : pending) { + d.doneOpenChannel(null, channel); + } + } } - }); - } else { - // Channel is null? Something went terrible wrong. - done.doneOpenChannel(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$ - } - } catch (Throwable e) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } + } + }; - // Channel opening failed - done.doneOpenChannel(e, channel); - } - } else { - // Wait for the channel to be fully opened if still in "OPENING" state - if (channel.getState() == IChannel.STATE_OPENING) { - final IChannel finChannel = channel; - channel.addChannelListener(new IChannel.IChannelListener() { + // Get the stepper operation service + IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.OPEN_CHANNEL); - @Override - public void onChannelOpened() { - finChannel.removeChannelListener(this); - done.doneOpenChannel(null, finChannel); - } + // Schedule the "open channel" stepper job + IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.OPEN_CHANNEL); + String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.OPEN_CHANNEL); - @Override - public void onChannelClosed(Throwable error) { - finChannel.removeChannelListener(this); - done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel); - } + if (stepGroupId != null && stepContext != null) { + String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.OPEN_CHANNEL); + boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.OPEN_CHANNEL); - @Override - public void congestionLevel(int level) { - } - }); - } - else { - done.doneOpenChannel(null, channel); + job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.OPEN_CHANNEL, isCancelable, true); //$NON-NLS-1$ + job.setJobCallback(callback); + job.markStatusHandled(); + job.schedule(); + } + + // Remember the "open channel" stepper job until finished + if (job != null) { + pendingOpenChannel.put(id, job); + } + } else { + // There is a pending "open channel" stepper job -> add the open channel callback to the list + List<DoneOpenChannel> dones = pendingDones.get(id); + if (dones == null) { + dones = new ArrayList<DoneOpenChannel>(); + pendingDones.put(id, dones); + } + Assert.isNotNull(dones); + dones.add(internalDone); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$ + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } } } } @@ -353,9 +330,9 @@ public final class ChannelManager extends PlatformObject implements IChannelMana IChannel channel = channels.get(id); if (channel != null && !(channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { // Channel is not in open state -> drop the instance - channel = null; channels.remove(id); - refCounters.remove(id); + refCounters.remove(channel); + channel = null; } return channel; @@ -387,50 +364,136 @@ public final class ChannelManager extends PlatformObject implements IChannelMana * * @param channel The channel. Must not be <code>null</code>. */ - /* default */ void internalCloseChannel(IChannel channel) { + /* default */ void internalCloseChannel(final IChannel channel) { Assert.isNotNull(channel); Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ // Get the id of the remote peer - IPeer peer = channel.getRemotePeer(); - String id = peer.getID(); + final IPeer peer = channel.getRemotePeer(); + final String id = peer.getID(); if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); } // Determine if the given channel is a reference counted channel final boolean isRefCounted = !forcedChannels.contains(channel); // Get the reference counter (if the channel is a reference counted channel) - AtomicInteger counter = isRefCounted ? refCounters.get(id) : null; + AtomicInteger counter = isRefCounted ? refCounters.get(channel) : null; // If the counter is null or get 0 after the decrement, close the channel if (counter == null || counter.decrementAndGet() == 0) { - channel.close(); + // Check if there is a pending "close channel" stepper job + StepperJob job = pendingCloseChannel.get(id); + if (job == null) { + // No pending "close channel" stepper job -> schedule one and initiate closing the channel + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_close_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } - // Get the value-add's for the peer to shutdown (if the reference counter is 0) - if (counter != null && counter.get() == 0) { - IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer); - if (valueAdds != null && valueAdds.length > 0) { - internalShutdownValueAdds(peer, valueAdds); + // Create the data properties container passed to the "close channel" steps + final IPropertiesContainer data = new PropertiesContainer(); + // Set the channel to close + data.setProperty(ITcfStepAttributes.ATTR_CHANNEL, channel); + // No recent action history persistence + data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true); + + // Determine if the value-add's can be shutdown or must stay alive. + // In case the channel to close is not reference counted, but this is a reference + // counted channel to the same peer, and that channel is still open, the + // value-adds must stay alive. + if (!isRefCounted) { + IChannel shared = channels.get(id); + if (shared != null && (shared.getState() == IChannel.STATE_OPEN || shared.getState() == IChannel.STATE_OPENING)) { + data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true); + } + } else { + // The channel is reference counted, that means it is a shared channel + // and normally it will shutdown the value-adds if closed. However, we + // can have not reference counted channels to the same target that is + // using value-add's. In this case we also have to skip shutting down + // the value-add. + for (IChannel c : forcedChannels) { + if (id.equals(c.getRemotePeer().getID())) { + Map<String, Boolean> flags = forcedChannelFlags.get(c); + boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; + if (!noValueAdd) { + data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true); + break; + } + } + } } - } - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } + // Create the callback to be invoked once the "close channel" stepper job is completed + final ICallback callback = new Callback() { + @Override + protected void internalDone(Object caller, IStatus status) { + // Check for error + if (status.getSeverity() == IStatus.ERROR) { + // Extract the failure cause + Throwable error = status.getException(); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_failed_message, id, error), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } + + // Job is done -> remove it from the list of pending jobs + pendingCloseChannel.remove(id); + } else { + // Job is done -> remove it from the list of pending jobs + pendingCloseChannel.remove(id); - // Clean the reference counter and the channel map - if (isRefCounted) refCounters.remove(id); - if (isRefCounted) channels.remove(id); - if (!isRefCounted) forcedChannels.remove(channel); + // Clean the reference counter and the channel map + if (isRefCounted) channels.remove(id); + if (isRefCounted) refCounters.remove(channel); + if (!isRefCounted) forcedChannels.remove(channel); + if (!isRefCounted) forcedChannelFlags.remove(channel); + + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } + } + } + }; + + // Get the stepper operation service + IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.CLOSE_CHANNEL); + + // Schedule the "close channel" stepper job + IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.CLOSE_CHANNEL); + String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.CLOSE_CHANNEL); + + if (stepGroupId != null && stepContext != null) { + String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.CLOSE_CHANNEL); + boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.CLOSE_CHANNEL); + + job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.CLOSE_CHANNEL, isCancelable, true); //$NON-NLS-1$ + job.setJobCallback(callback); + job.markStatusHandled(); + job.schedule(); + } + + // Remember the "close channel" stepper job until finished + if (job != null) { + pendingCloseChannel.put(id, job); + } + } else { + // There is a pending "close channel" stepper job + if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { + CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_pending_message, id), + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); + } + } } else { if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_inuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); + 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); } } @@ -440,6 +503,7 @@ public final class ChannelManager extends PlatformObject implements IChannelMana IChannel c = iter.next(); if (c.getState() == IChannel.STATE_CLOSED) { iter.remove(); + forcedChannelFlags.remove(c); } } } @@ -480,6 +544,7 @@ public final class ChannelManager extends PlatformObject implements IChannelMana if (id.equals(c.getRemotePeer().getID())) { c.close(); iter.remove(); + forcedChannelFlags.remove(c); } } @@ -487,17 +552,11 @@ public final class ChannelManager extends PlatformObject implements IChannelMana IChannel channel = internalGetChannel(peer); if (channel != null) { // Reset the reference count (will force a channel close) - refCounters.remove(id); + refCounters.remove(channel); // Close the channel internalCloseChannel(channel); } - - // Make sure to shutdown all value-add's for the peer - IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer); - if (valueAdds != null && valueAdds.length > 0) { - internalShutdownValueAdds(peer, valueAdds); - } } /* (non-Javadoc) @@ -534,851 +593,9 @@ public final class ChannelManager extends PlatformObject implements IChannelMana channels.clear(); for (IChannel channel : openChannels) internalCloseChannel(channel); - - internalShutdownAllValueAdds(); - } - - /** - * Shutdown the given value-adds for the given peer. - * - * @param peer The peer. Must not be <code>null</code>. - * @param valueAdds The list of value-adds. Must not be <code>null</code>. - */ - /* default */ void internalShutdownValueAdds(final IPeer peer, final IValueAdd[] valueAdds) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(peer); - Assert.isNotNull(valueAdds); - - // Get the peer id - final String id = peer.getID(); - - if (valueAdds.length > 0) { - doShutdownValueAdds(id, valueAdds); - } - } - - /** - * Shutdown the given value-adds for the given peer id. - * - * @param id The peer id. Must not be <code>null</code>. - * @param valueAdds The list of value-add's. Must not be <code>null</code>. - */ - /* default */ void doShutdownValueAdds(final String id, final IValueAdd[] valueAdds) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(id); - Assert.isNotNull(valueAdds); - - for (IValueAdd valueAdd : valueAdds) { - valueAdd.shutdown(id, new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - } - }); - } - } - - /** - * Shutdown all value-add's running. Called from {@link #closeAll(boolean)} - */ - /* default */ void internalShutdownAllValueAdds() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - - // Get all value-add's - IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdds(false); - for (IValueAdd valueAdd : valueAdds) { - valueAdd.shutdownAll(new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - } - }); - } - } - - /** - * Client call back interface for internalHandleValueAdds(...). - */ - interface DoneHandleValueAdds { - /** - * Called when all the value-adds are launched or the launched failed. - * - * @param error The error description if operation failed, <code>null</code> if succeeded. - * @param valueAdds The list of value-adds or <code>null</code>. - */ - void doneHandleValueAdds(Throwable error, IValueAdd[] valueAdds); - } - - /** - * Check on the value-adds for the given peer. Launch the value-adds - * if necessary. - * - * @param peer The peer. Must not be <code>null</code>. - * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void internalHandleValueAdds(final IPeer peer, final Map<String, Boolean> flags, final DoneHandleValueAdds done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(peer); - Assert.isNotNull(done); - - // Get the peer id - final String id = peer.getID(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_check, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - // Extract the flags of interest form the given flags map - boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; - boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - // If noValueAdd == true -> forceNew has to be true as well - if (noValueAdd) forceNew = true; - - // Check if there is already a channel opened to this peer - IChannel channel = !forceNew ? channels.get(id) : null; - if (noValueAdd || channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { - // Got an existing channel or a channel without value-add decoration - // got requested -> drop out immediately - done.doneHandleValueAdds(null, null); - return; - } - - internalHandleValueAdds(peer, done); - } - - /* default */ final Map<String, List<DoneHandleValueAdds>> inProgress = new HashMap<String, List<DoneHandleValueAdds>>(); - - /** - * Check on the value-adds for the given peer. Launch the value-adds - * if necessary. - * - * @param peer The peer. Must not be <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void internalHandleValueAdds(final IPeer peer, final DoneHandleValueAdds done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(peer); - Assert.isNotNull(done); - - // Get the peer id - final String id = peer.getID(); - - // If a launch for the same value add is in progress already, attach the new done to - // the list to call and drop out - if (inProgress.containsKey(id)) { - List<DoneHandleValueAdds> dones = inProgress.get(id); - Assert.isNotNull(dones); - dones.add(done); - return; - } - - // Add the done callback to a list of waiting callbacks per peer - List<DoneHandleValueAdds> dones = new ArrayList<DoneHandleValueAdds>(); - dones.add(done); - inProgress.put(id, dones); - - // The "myDone" callback is invoking the callbacks from the list - // of waiting callbacks. - final DoneHandleValueAdds myDone = new DoneHandleValueAdds() { - - @Override - public void doneHandleValueAdds(Throwable error, IValueAdd[] valueAdds) { - // Get the list of the original done callbacks - List<DoneHandleValueAdds> dones = inProgress.remove(id); - for (DoneHandleValueAdds done : dones) { - done.doneHandleValueAdds(error, valueAdds); - } - } - }; - - // Do we have applicable value-add contributions - final IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer); - if (valueAdds.length == 0) { - // There are no applicable value-add's -> drop out immediately - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_noneApplicable, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - myDone.doneHandleValueAdds(null, valueAdds); - return; - } - - // There are at least applicable value-add contributions - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_numApplicable, Integer.valueOf(valueAdds.length), id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - final List<IValueAdd> available = new ArrayList<IValueAdd>(); - - final DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() { - @Override - public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) { - myDone.doneHandleValueAdds(error, available.toArray(new IValueAdd[available.size()])); - } - }; - - doLaunchValueAdd(id, valueAdds, 0, available, innerDone); - } - - /** - * Client call back interface for doLaunchValueAdd(...). - */ - interface DoneLaunchValueAdd { - /** - * Called when a value-add has been chained. - * - * @param error The error description if operation failed, <code>null</code> if succeeded. - * @param available The list of available value-adds. - */ - void doneLaunchValueAdd(Throwable error, List<IValueAdd> available); - } - - /** - * Test the value-add at the given index to be alive. Launch the value-add if necessary. - * - * @param id The peer id. Must not be <code>null</code>. - * @param valueAdds The list of value-add's to check. Must not be <code>null</code>. - * @param i The index. - * @param available The list of available value-adds. Must not be <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void doLaunchValueAdd(final String id, final IValueAdd[] valueAdds, final int i, final List<IValueAdd> available, final DoneLaunchValueAdd done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(id); - Assert.isNotNull(valueAdds); - Assert.isTrue(valueAdds.length > 0); - Assert.isNotNull(available); - Assert.isNotNull(done); - - // Get the value-add to launch - final IValueAdd valueAdd = valueAdds[i]; - - // Check if the value-add to launch is alive - valueAdd.isAlive(id, new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - boolean alive = ((Boolean)getResult()).booleanValue(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_isAlive, new Object[] { Integer.valueOf(i), valueAdd.getLabel(), Boolean.valueOf(alive), id }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - - if (!alive) { - // Launch the value-add - valueAdd.launch(id, new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - Throwable error = status.getException(); - - String message = null; - if (error != null) { - message = error.getLocalizedMessage(); - if ((message == null || "".equals(message)) && error.getCause() != null) { //$NON-NLS-1$ - message = error.getCause().getLocalizedMessage(); - } - } - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_launch, new Object[] { Integer.valueOf(i), valueAdd.getLabel(), - (error == null ? "success" : "failed"), //$NON-NLS-1$ //$NON-NLS-2$ - (error != null ? message : null), - id }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - - // Print the stack trace of the error too - if (error != null) { - StringWriter sw = new StringWriter(); - error.printStackTrace(new PrintWriter(sw, true)); - - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_launch_exception, new Object[] { - Integer.valueOf(i), valueAdd.getLabel(), - sw.getBuffer().toString() - }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - } - - // If we got an error and the value-add is optional, - // ignore the error and drop the value-add from the chain. - if (error != null && valueAdd.isOptional()) { - error = null; - } else if (error == null) { - available.add(valueAdd); - } - - // If the value-add failed to launch, no other value-add's are launched - if (error != null) { - done.doneLaunchValueAdd(error, available); - } else { - // Launch the next one, if there is any - if (i + 1 < valueAdds.length) { - DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() { - @Override - public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) { - done.doneLaunchValueAdd(error, available); - } - }; - doLaunchValueAdd(id, valueAdds, i + 1, available, innerDone); - } else { - // Last value-add in chain launched -> call parent callback - done.doneLaunchValueAdd(null, available); - } - } - } - }); - } else { - // Already alive -> add it to the list of available value-add's - available.add(valueAdd); - // Launch the next one, if there is any - if (i + 1 < valueAdds.length) { - DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() { - @Override - public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) { - done.doneLaunchValueAdd(error, available); - } - }; - doLaunchValueAdd(id, valueAdds, i + 1, available, innerDone); - } else { - // Last value-add in chain launched -> call parent callback - done.doneLaunchValueAdd(null, available); - } - } - } - }); - } - - /** - * Client call back interface for doChainValueAdd(...). - */ - interface DoneChainValueAdd { - /** - * Called when a value-add has been chained. - * - * @param error The error description if operation failed, <code>null</code> if succeeded. - * @param channel The channel object or <code>null</code>. - */ - void doneChainValueAdd(Throwable error, IChannel channel); - } - - /** - * Client call back interface for doChainProxies(...). - */ - interface DoneChainProxies { - /** - * Called when a proxies has been chained. - * - * @param error The error description if operation failed, <code>null</code> if succeeded. - * @param channel The channel object or <code>null</code>. - */ - void doneChainProxies(Throwable error, IChannel channel); - } - - /** - * Chain the value-adds until the original target peer is reached. - * - * @param valueAdds The list of value-add's to chain. Must not be <code>null</code>. - * @param peer The original target peer. Must not be <code>null</code>. - * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void internalChainValueAdds(final IValueAdd[] valueAdds, final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(valueAdds); - Assert.isNotNull(peer); - Assert.isNotNull(done); - - // Get the peer id - final String id = peer.getID(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_startChaining, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - // Extract the flags of interest form the given flags map - boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; - boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; - // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well - if (noValueAdd || noPathMap) forceNew = true; - - final boolean finForceNew = forceNew; - - // Check if there is already a channel opened to this peer - IChannel channel = !forceNew ? channels.get(id) : null; - if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { - // Increase the reference count - AtomicInteger counter = refCounters.get(id); - if (counter == null) { - counter = new AtomicInteger(0); - refCounters.put(id, counter); - } - counter.incrementAndGet(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - // Got an existing channel -> drop out immediately if the channel is - // already fully opened. Otherwise wait for the channel to be fully open. - if (channel.getState() == IChannel.STATE_OPENING) { - final IChannel finChannel = channel; - channel.addChannelListener(new IChannel.IChannelListener() { - @Override - public void onChannelOpened() { - finChannel.removeChannelListener(this); - done.doneOpenChannel(null, finChannel); - } - @Override - public void onChannelClosed(Throwable error) { - finChannel.removeChannelListener(this); - done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel); - } - @Override - public void congestionLevel(int level) { - } - }); - } - else { - done.doneOpenChannel(null, channel); - } - return; - } else if (channel != null) { - // Channel is not in open state -> drop the instance - channels.remove(id); - refCounters.remove(id); - } - - // No existing channel -> open a new one - final DoneChainValueAdd chainValueAddDone = new DoneChainValueAdd() { - @Override - public void doneChainValueAdd(final Throwable error, final IChannel channel) { - // Ending up here means that the channel is redirected to the last - // value-add in the chain, but it is not yet redirected through the - // proxy configuration. - String proxyConfiguration = peer.getAttributes().get(IPeerProperties.PROP_PROXIES); - IPeer[] proxies = proxyConfiguration != null ? PeerDataHelper.decodePeerList(proxyConfiguration) : null; - - // Create the done callback - final DoneChainProxies chainProxiesDone = new DoneChainProxies() { - @Override - public void doneChainProxies(final Throwable error, final IChannel channel) { - // Invoke the outer callback - done.doneOpenChannel(error, channel); - } - }; - - // Continue the redirect chain by chaining the proxies and connecting to the target - doChainProxies(id, peer.getAttributes(), proxies, finForceNew, valueAdds.length, channel, chainProxiesDone); - } - }; - - doChainValueAdd(id, forceNew, valueAdds, chainValueAddDone); } - /* default */ void doChainValueAdd(final String id, final boolean forceNew, final IValueAdd[] valueAdds, final DoneChainValueAdd done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(id); - Assert.isNotNull(valueAdds); - Assert.isNotNull(done); - - // The index of the currently processed value-add - final AtomicInteger index = new AtomicInteger(0); - - // Get the value-add to chain - final AtomicReference<IValueAdd> valueAdd = new AtomicReference<IValueAdd>(); - valueAdd.set(valueAdds[index.get()]); - Assert.isNotNull(valueAdd.get()); - // Get the next value-add in chain - final AtomicReference<IValueAdd> nextValueAdd = new AtomicReference<IValueAdd>(); - nextValueAdd.set(index.get() + 1 < valueAdds.length ? valueAdds[index.get() + 1] : null); - - // Get the peer for the value-add to chain - final AtomicReference<IPeer> valueAddPeer = new AtomicReference<IPeer>(); - valueAddPeer.set(valueAdd.get().getPeer(id)); - if (valueAddPeer.get() == null) { - done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$ - return; - } - - // Get the peer for the next value-add in chain - final AtomicReference<IPeer> nextValueAddPeer = new AtomicReference<IPeer>(); - nextValueAddPeer.set(nextValueAdd.get() != null ? nextValueAdd.get().getPeer(id) : null); - if (nextValueAdd.get() != null && nextValueAddPeer.get() == null) { - done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$ - return; - } - - IChannel channel = null; - try { - // Open a channel to the value-add - channel = valueAddPeer.get().openChannel(); - if (channel != null) { - if (!forceNew) channels.put(id, channel); - if (!forceNew) refCounters.put(id, new AtomicInteger(1)); - if (forceNew) forcedChannels.add(channel); - - // Create and attach the channel listener to catch open/closed events - final IChannel finChannel = channel; - final IChannel.IChannelListener finChannelListener = new IChannel.IChannelListener() { - @Override - public void onChannelOpened() { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - if (index.get() == 0) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_succeeded, - new Object[] { valueAddPeer.get().getID(), Integer.valueOf(index.get()), id }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - - } else { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_succeeded, - new Object[] { valueAddPeer.get().getID(), finChannel.getRemotePeer().getID(), Integer.valueOf(index.get()) }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - } - - // Channel opened. Check if we are done. - if (nextValueAdd.get() == null) { - // Remove ourself as channel listener - finChannel.removeChannelListener(this); - - // No other value-add in the chain -> all done - done.doneChainValueAdd(null, finChannel); - } else { - // Process the next value-add in chain - index.incrementAndGet(); - - // Update the value-add references - valueAdd.set(nextValueAdd.get()); - valueAddPeer.set(nextValueAddPeer.get()); - - nextValueAdd.set(index.get() + 1 < valueAdds.length ? valueAdds[index.get() + 1] : null); - nextValueAddPeer.set(nextValueAdd.get() != null ? nextValueAdd.get().getPeer(id) : null); - if (nextValueAdd.get() != null && nextValueAddPeer.get() == null) { - // Remove ourself as channel listener - finChannel.removeChannelListener(this); - // Close the channel - finChannel.close(); - // Invoke the callback - done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$ - return; - } - - // Redirect the channel to the next value-add in chain - finChannel.redirect(valueAddPeer.get().getAttributes()); - } - } - - @Override - public void onChannelClosed(Throwable error) { - // Remove ourself as channel listener - finChannel.removeChannelListener(this); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - if (index.get() == 0) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed, - new Object[] { valueAddPeer.get().getID(), Integer.valueOf(index.get()), id }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - - } else { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_failed, finChannel.getRemotePeer().getID(), valueAddPeer.get().getID()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - } - - // Clean the reference counter and the channel map - if (!forceNew) channels.remove(id); - if (!forceNew) refCounters.remove(id); - if (forceNew) forcedChannels.remove(finChannel); - - // Channel redirect failed -> This will break everything - done.doneChainValueAdd(error, finChannel); - } - - @Override - public void congestionLevel(int level) { - } - }; - channel.addChannelListener(finChannelListener); - } else { - // Channel is null? Something went terrible wrong. - done.doneChainValueAdd(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$ - - } - } catch (Throwable e) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - // Channel opening failed - done.doneChainValueAdd(e, channel); - } - } - - /** - * Chain the proxies until the original target peer is reached. - * - * @param proxies The list of proxies to chain. Must not be <code>null</code>. - * @param peer The original target peer. Must not be <code>null</code>. - * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>. - * @param done The client callback. Must not be <code>null</code>. - */ - /* default */ void internalChainProxies(final IPeer[] proxies, final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(proxies); - Assert.isTrue(proxies.length > 0); - Assert.isNotNull(peer); - Assert.isNotNull(done); - - // Get the peer id - final String id = peer.getID(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_proxies_startChaining, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - // Extract the flags of interest form the given flags map - boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; - boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; - // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well - if (noValueAdd || noPathMap) forceNew = true; - - final boolean finForceNew = forceNew; - - // Check if there is already a channel opened to this peer - IChannel channel = !forceNew ? channels.get(id) : null; - if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { - // Increase the reference count - AtomicInteger counter = refCounters.get(id); - if (counter == null) { - counter = new AtomicInteger(0); - refCounters.put(id, counter); - } - counter.incrementAndGet(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - // Got an existing channel -> drop out immediately if the channel is - // already fully opened. Otherwise wait for the channel to be fully open. - if (channel.getState() == IChannel.STATE_OPENING) { - final IChannel finChannel = channel; - channel.addChannelListener(new IChannel.IChannelListener() { - @Override - public void onChannelOpened() { - finChannel.removeChannelListener(this); - done.doneOpenChannel(null, finChannel); - } - @Override - public void onChannelClosed(Throwable error) { - finChannel.removeChannelListener(this); - done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel); - } - @Override - public void congestionLevel(int level) { - } - }); - } - else { - done.doneOpenChannel(null, channel); - } - return; - } else if (channel != null) { - // Channel is not in open state -> drop the instance - channels.remove(id); - refCounters.remove(id); - } - - // No existing channel -> open a new one - - // Get the first proxy. This is the one we have to open the channel too. - IPeer firstProxy = proxies[0]; - Assert.isNotNull(firstProxy); - - // Remove the first proxy from the array and build up a new array describing - // the remaining proxy chain - final IPeer[] remainingProxies = new IPeer[proxies.length - 1]; - if (remainingProxies.length > 0) System.arraycopy(proxies, 1, remainingProxies, 0, remainingProxies.length); - - // Open a channel to the first proxy - channel = null; - try { - channel = firstProxy.openChannel(); - if (channel != null) { - if (!forceNew) channels.put(id, channel); - if (!forceNew) refCounters.put(id, new AtomicInteger(1)); - if (forceNew) forcedChannels.add(channel); - - // Create and attach the channel listener to catch open/closed events - final IChannel finChannel = channel; - final IChannel.IChannelListener finChannelListener = new IChannel.IChannelListener() { - @Override - public void onChannelOpened() { - // Remove ourself as channel listener - finChannel.removeChannelListener(this); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - - // Channel opened to the first proxy. Process the proxy chain and - // redirect the channel through the proxies until the original target - // is reached - final DoneChainProxies chainProxiesDone = new DoneChainProxies() { - @Override - public void doneChainProxies(final Throwable error, final IChannel channel) { - // Invoke the outer callback - done.doneOpenChannel(error, channel); - } - }; - - // Continue the redirect chain by chaining the proxies and connecting to the target - doChainProxies(id, peer.getAttributes(), remainingProxies, finForceNew, 0, finChannel, chainProxiesDone); - } - - @Override - public void onChannelClosed(Throwable error) { - // Remove ourself as channel listener - finChannel.removeChannelListener(this); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - - // Clean the reference counter and the channel map - if (!finForceNew) channels.remove(id); - if (!finForceNew) refCounters.remove(id); - if (finForceNew) forcedChannels.remove(finChannel); - - // Channel open failed -> This will break everything - done.doneOpenChannel(error, finChannel); - } - - @Override - public void congestionLevel(int level) { - } - }; - channel.addChannelListener(finChannelListener); - } else { - // Channel is null? Something went terrible wrong. - done.doneOpenChannel(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$ - - } - } catch (Throwable e) { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this); - } - - // Channel opening failed - done.doneOpenChannel(e, channel); - } - } - - /* default */ void doChainProxies(final String id, final Map<String, String> attrs, final IPeer[] proxies, final boolean forceNew, final int numberOfValueAdds, final IChannel channel, final DoneChainProxies done) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(id); - Assert.isNotNull(attrs); - Assert.isNotNull(channel); - Assert.isNotNull(done); - - // The index of the currently processed proxy - final AtomicInteger index = new AtomicInteger(0); - - // Get the proxy to chain - final AtomicReference<IPeer> proxy = new AtomicReference<IPeer>(); - proxy.set(proxies != null && proxies.length > 0 ? proxies[index.get()] : null); - // Get the next proxy in chain - final AtomicReference<IPeer> nextProxy = new AtomicReference<IPeer>(); - nextProxy.set(proxies != null && index.get() + 1 < proxies.length ? proxies[index.get() + 1] : null); - - // The channel must be in open or opening state, otherwise we cannot do the redirect - if (channel.getState() == IChannel.STATE_CLOSED) { - done.doneChainProxies(new Exception(NLS.bind(Messages.ChannelManager_openChannel_redirect_invalidChannelState, id)), channel); - return; - } - - // Determine the ID of the last value-add in the chain - final String lastValueAddID = channel.getRemotePeer().getID(); - - // Create and attach the channel listener - channel.addChannelListener(new IChannel.IChannelListener() { - @Override - public void onChannelOpened() { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_succeeded, - new Object[] { lastValueAddID, channel.getRemotePeer().getID(), Integer.valueOf(numberOfValueAdds + index.get()) }), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - - // Channel redirect succeeded. Check if we are done. - if (proxy.get() == null) { - // Remove ourself as channel listener - channel.removeChannelListener(this); - - // No other proxy is in the chain -> reached the target -> all done - // HACK * HACK * HACK (is eh nur kurz drin sagt Tobias) - IDiagnostics svc = channel.getRemoteService(IDiagnostics.class); - if (svc != null) { - svc.echo("WR_HostFsSendCommands", new IDiagnostics.DoneEcho() { //$NON-NLS-1$ - @Override - public void doneEcho(IToken token, Throwable error, String s) { - done.doneChainProxies(null, channel); - } - }); - } else { - done.doneChainProxies(null, channel); - } - } else { - // Update the proxy reference - proxy.set(nextProxy.get()); - - // Process the next proxy in chain - index.incrementAndGet(); - - // Determine the next proxy to redirect too - nextProxy.set(index.get() < proxies.length ? proxies[index.get()] : null); - - // Redirect the channel to the next proxy in chain, if available, - // or directly to the target if no more proxies are configured - channel.redirect(proxy.get() != null ? proxy.get().getAttributes() : attrs); - } - } - - @Override - public void onChannelClosed(Throwable error) { - // Remove ourself as channel listener - channel.removeChannelListener(this); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_failed, lastValueAddID, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this); - } - - // Clean the reference counter and the channel map - if (forceNew) channels.remove(id); - if (forceNew) refCounters.remove(id); - if (forceNew) forcedChannels.remove(channel); - - // Channel redirect failed -> This will break everything - done.doneChainProxies(error, channel); - } - - @Override - public void congestionLevel(int level) { - } - }); - - // If there is no proxy configured, directly redirect to the target - channel.redirect(proxy.get() != null ? proxy.get().getAttributes() : attrs); - } + // ----- Streams handling ----- /* (non-Javadoc) * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream) @@ -1506,4 +723,5 @@ public final class ChannelManager extends PlatformObject implements IChannelMana } } } + } diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java deleted file mode 100644 index f7681eda9..000000000 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java +++ /dev/null @@ -1,727 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the terms - * of the Eclipse Public License v1.0 which accompanies this distribution, and is - * available at http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Wind River Systems - initial API and implementation - *******************************************************************************/ -package org.eclipse.tcf.te.tcf.core.internal.channelmanager; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.eclipse.core.runtime.Assert; -import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.PlatformObject; -import org.eclipse.osgi.util.NLS; -import org.eclipse.tcf.protocol.IChannel; -import org.eclipse.tcf.protocol.IPeer; -import org.eclipse.tcf.protocol.IToken; -import org.eclipse.tcf.protocol.Protocol; -import org.eclipse.tcf.services.IStreams; -import org.eclipse.tcf.te.runtime.callback.Callback; -import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback; -import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer; -import org.eclipse.tcf.te.runtime.properties.PropertiesContainer; -import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepAttributes; -import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext; -import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService; -import org.eclipse.tcf.te.runtime.stepper.job.StepperJob; -import org.eclipse.tcf.te.runtime.stepper.utils.StepperHelper; -import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; -import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; -import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes; -import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds; -import org.eclipse.tcf.te.tcf.core.internal.channelmanager.steps.ShutdownValueAddStep; -import org.eclipse.tcf.te.tcf.core.nls.Messages; - -/** - * Channel manager implementation. - */ -public class ChannelManager2 extends PlatformObject implements IChannelManager { - // The map of reference counters per channel - /* default */ final Map<IChannel, AtomicInteger> refCounters = new HashMap<IChannel, AtomicInteger>(); - // The map of channels per peer id - /* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>(); - // The map of pending open channel callback's per peer id - /* default */ final Map<String, List<DoneOpenChannel>> pendingDones = new HashMap<String, List<DoneOpenChannel>>(); - // The list of channels opened via "forceNew" flag (needed to handle the close channel correctly) - /* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>(); - // The map of flags used for opening a forced channel per channel - /* default */ final Map<IChannel, Map<String, Boolean>> forcedChannelFlags = new HashMap<IChannel, Map<String, Boolean>>(); - // The map of stream listener proxies per channel - /* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>(); - // The map of scheduled "open channel" stepper jobs per peer id - /* default */ final Map<String, StepperJob> pendingOpenChannel = new HashMap<String, StepperJob>(); - // The map of scheduled "close channel" stepper jobs per peer id - /* default */ final Map<String, StepperJob> pendingCloseChannel = new HashMap<String, StepperJob>(); - - /** - * Constructor - */ - public ChannelManager2() { - super(); - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#openChannel(org.eclipse.tcf.protocol.IPeer, java.util.Map, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel) - */ - @Override - public void openChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) { - Assert.isNotNull(peer); - Assert.isNotNull(done); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, ITraceIds.TRACE_CHANNEL_MANAGER)) { - try { - throw new Throwable(); - } catch (Throwable e) { - CoreBundleActivator.getTraceHandler().trace("ChannelManager#openChannel called from:", //$NON-NLS-1$ - 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - e.printStackTrace(); - } - } - - // The client done callback must be called within the TCF event dispatch thread - final DoneOpenChannel internalDone = new DoneOpenChannel() { - - @Override - public void doneOpenChannel(final Throwable error, final IChannel channel) { - Runnable runnable = new Runnable() { - @Override - public void run() { - done.doneOpenChannel(error, channel); - } - }; - - if (Protocol.isDispatchThread()) runnable.run(); - else Protocol.invokeLater(runnable); - } - }; - - // The channel instance to return - IChannel channel = null; - - // Get the peer id - final String id = peer.getID(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_message, id, flags), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // First thing to determine is if to open a new channel or the shared - // channel can be used, if there is a shared channel at all. - boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false; - boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false; - // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well - if (noValueAdd || noPathMap) forceNew = true; - - final boolean finForceNew = forceNew; - - // Query the shared channel if not forced to open a new channel - if (!forceNew) channel = channels.get(id); - - // If a shared channel is available, check if the shared channel can be used - if (channel != null) { - // If the channel is still open, it's all done and the channel can be returned right away - if (channel.getState() == IChannel.STATE_OPEN) { - // Increase the reference count - AtomicInteger counter = refCounters.get(channel); - if (counter == null) { - counter = new AtomicInteger(0); - refCounters.put(channel, counter); - } - counter.incrementAndGet(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Invoke the channel open done callback - internalDone.doneOpenChannel(null, channel); - } - // If the channel is opening, wait for the channel to become fully opened. - // Add the done open channel callback to the list of pending callback's. - else if (channel.getState() == IChannel.STATE_OPENING) { - List<DoneOpenChannel> dones = pendingDones.get(id); - if (dones == null) { - dones = new ArrayList<DoneOpenChannel>(); - pendingDones.put(id, dones); - } - Assert.isNotNull(dones); - dones.add(internalDone); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$ - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - } - else { - // Channel is not in open state -> drop the instance - channels.remove(id); - refCounters.remove(channel); - channel = null; - } - } - - // Channel not available -> open a new one - if (channel == null) { - // Check if there is a pending "open channel" stepper job - StepperJob job = pendingOpenChannel.get(id); - if (job == null) { - // No pending "open channel" stepper job -> schedule one and initiate opening the channel - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Create the data properties container passed to the "open channel" steps - final IPropertiesContainer data = new PropertiesContainer(); - // Set the flags to be passed to the "open channel" steps - data.setProperty(IChannelManager.FLAG_NO_VALUE_ADD, noValueAdd); - data.setProperty(IChannelManager.FLAG_NO_PATH_MAP, noPathMap); - // No recent action history persistence - data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true); - - // Create the callback to be invoked once the "open channel" stepper job is completed - final ICallback callback = new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - // Check for error - if (status.getSeverity() == IStatus.ERROR) { - // Extract the failure cause - Throwable error = status.getException(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Job is done -> remove it from the list of pending jobs - pendingOpenChannel.remove(id); - - // Invoke the primary "open channel" done callback - internalDone.doneOpenChannel(error, null); - - // Invoke pending callback's - List<DoneOpenChannel> pending = pendingDones.remove(id); - if (pending != null && !pending.isEmpty()) { - for (DoneOpenChannel d : pending) { - d.doneOpenChannel(error, null); - } - } - } else { - // Get the channel - IChannel channel = (IChannel)data.getProperty(ITcfStepAttributes.ATTR_CHANNEL); - Assert.isNotNull(channel); - Assert.isTrue(channel.getState() == IChannel.STATE_OPEN); - - // Store the channel - if (!finForceNew) channels.put(id, channel); - if (!finForceNew) refCounters.put(channel, new AtomicInteger(1)); - if (finForceNew) forcedChannels.add(channel); - if (finForceNew) forcedChannelFlags.put(channel, flags); - - // Job is done -> remove it from the list of pending jobs - pendingOpenChannel.remove(id); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Invoke the primary "open channel" done callback - internalDone.doneOpenChannel(null, channel); - - // Invoke pending callback's - List<DoneOpenChannel> pending = pendingDones.remove(id); - if (pending != null && !pending.isEmpty()) { - for (DoneOpenChannel d : pending) { - d.doneOpenChannel(null, channel); - } - } - } - } - }; - - // Get the stepper operation service - IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.OPEN_CHANNEL); - - // Schedule the "open channel" stepper job - IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.OPEN_CHANNEL); - String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.OPEN_CHANNEL); - - if (stepGroupId != null && stepContext != null) { - String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.OPEN_CHANNEL); - boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.OPEN_CHANNEL); - - job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.OPEN_CHANNEL, isCancelable, true); //$NON-NLS-1$ - job.setJobCallback(callback); - job.markStatusHandled(); - job.schedule(); - } - - // Remember the "open channel" stepper job until finished - if (job != null) { - pendingOpenChannel.put(id, job); - } - } else { - // There is a pending "open channel" stepper job -> add the open channel callback to the list - List<DoneOpenChannel> dones = pendingDones.get(id); - if (dones == null) { - dones = new ArrayList<DoneOpenChannel>(); - pendingDones.put(id, dones); - } - Assert.isNotNull(dones); - dones.add(internalDone); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$ - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - } - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#getChannel(org.eclipse.tcf.protocol.IPeer) - */ - @Override - public IChannel getChannel(final IPeer peer) { - final AtomicReference<IChannel> channel = new AtomicReference<IChannel>(); - Runnable runnable = new Runnable() { - @Override - public void run() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - channel.set(internalGetChannel(peer)); - } - }; - if (Protocol.isDispatchThread()) runnable.run(); - else Protocol.invokeAndWait(runnable); - - return channel.get(); - } - - /** - * Returns the shared channel instance for the given peer. - * <p> - * <b>Note:</b> This method must be invoked at the TCF dispatch thread. - * - * @param peer The peer. Must not be <code>null</code>. - * @return The channel instance or <code>null</code>. - */ - public IChannel internalGetChannel(IPeer peer) { - Assert.isNotNull(peer); - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - - // Get the peer id - String id = peer.getID(); - - // Get the channel - IChannel channel = channels.get(id); - if (channel != null && !(channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) { - // Channel is not in open state -> drop the instance - channels.remove(id); - refCounters.remove(channel); - channel = null; - } - - return channel; - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeChannel(org.eclipse.tcf.protocol.IChannel) - */ - @Override - public void closeChannel(final IChannel channel) { - Runnable runnable = new Runnable() { - @Override - public void run() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - internalCloseChannel(channel); - } - }; - if (Protocol.isDispatchThread()) runnable.run(); - else Protocol.invokeLater(runnable); - } - - /** - * Closes the given channel. - * <p> - * If the given channel is a reference counted channel, the channel will be closed if the reference counter - * reaches 0. For non reference counted channels, the channel is closed immediately. - * <p> - * <b>Note:</b> This method must be invoked at the TCF dispatch thread. - * - * @param channel The channel. Must not be <code>null</code>. - */ - /* default */ void internalCloseChannel(final IChannel channel) { - Assert.isNotNull(channel); - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - - // Get the id of the remote peer - final IPeer peer = channel.getRemotePeer(); - final String id = peer.getID(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Determine if the given channel is a reference counted channel - final boolean isRefCounted = !forcedChannels.contains(channel); - - // Get the reference counter (if the channel is a reference counted channel) - AtomicInteger counter = isRefCounted ? refCounters.get(channel) : null; - - // If the counter is null or get 0 after the decrement, close the channel - if (counter == null || counter.decrementAndGet() == 0) { - // Check if there is a pending "close channel" stepper job - StepperJob job = pendingCloseChannel.get(id); - if (job == null) { - // No pending "close channel" stepper job -> schedule one and initiate closing the channel - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_close_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Create the data properties container passed to the "close channel" steps - final IPropertiesContainer data = new PropertiesContainer(); - // Set the channel to close - data.setProperty(ITcfStepAttributes.ATTR_CHANNEL, channel); - // No recent action history persistence - data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true); - - // Determine if the value-add's can be shutdown or must stay alive. - // In case the channel to close is not reference counted, but this is a reference - // counted channel to the same peer, and that channel is still open, the - // value-adds must stay alive. - if (!isRefCounted) { - IChannel shared = channels.get(id); - if (shared != null && (shared.getState() == IChannel.STATE_OPEN || shared.getState() == IChannel.STATE_OPENING)) { - data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true); - } - } else { - // The channel is reference counted, that means it is a shared channel - // and normally it will shutdown the value-adds if closed. However, we - // can have not reference counted channels to the same target that is - // using value-add's. In this case we also have to skip shutting down - // the value-add. - for (IChannel c : forcedChannels) { - if (id.equals(c.getRemotePeer().getID())) { - Map<String, Boolean> flags = forcedChannelFlags.get(c); - boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false; - if (!noValueAdd) { - data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true); - break; - } - } - } - } - - // Create the callback to be invoked once the "close channel" stepper job is completed - final ICallback callback = new Callback() { - @Override - protected void internalDone(Object caller, IStatus status) { - // Check for error - if (status.getSeverity() == IStatus.ERROR) { - // Extract the failure cause - Throwable error = status.getException(); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_failed_message, id, error), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - - // Job is done -> remove it from the list of pending jobs - pendingCloseChannel.remove(id); - } else { - // Job is done -> remove it from the list of pending jobs - pendingCloseChannel.remove(id); - - // Clean the reference counter and the channel map - if (isRefCounted) channels.remove(id); - if (isRefCounted) refCounters.remove(channel); - if (!isRefCounted) forcedChannels.remove(channel); - if (!isRefCounted) forcedChannelFlags.remove(channel); - - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - } - } - }; - - // Get the stepper operation service - IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.CLOSE_CHANNEL); - - // Schedule the "close channel" stepper job - IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.CLOSE_CHANNEL); - String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.CLOSE_CHANNEL); - - if (stepGroupId != null && stepContext != null) { - String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.CLOSE_CHANNEL); - boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.CLOSE_CHANNEL); - - job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.CLOSE_CHANNEL, isCancelable, true); //$NON-NLS-1$ - job.setJobCallback(callback); - job.markStatusHandled(); - job.schedule(); - } - - // Remember the "close channel" stepper job until finished - if (job != null) { - pendingCloseChannel.put(id, job); - } - } else { - // There is a pending "close channel" stepper job - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_pending_message, id), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - } - } else { - if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) { - CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_inuse_message, id, counter.toString()), - 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this); - } - } - - // Clean up the list of forced channels. Remove all channels already been closed. - ListIterator<IChannel> iter = forcedChannels.listIterator(); - while (iter.hasNext()) { - IChannel c = iter.next(); - if (c.getState() == IChannel.STATE_CLOSED) { - iter.remove(); - forcedChannelFlags.remove(c); - } - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#shutdown(org.eclipse.tcf.protocol.IPeer) - */ - @Override - public void shutdown(final IPeer peer) { - Runnable runnable = new Runnable() { - @Override - public void run() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - internalShutdown(peer); - } - }; - if (Protocol.isDispatchThread()) runnable.run(); - else Protocol.invokeLater(runnable); - } - - /** - * Shutdown the communication to the given peer, no matter of the current - * reference count. A possible associated value-add is shutdown as well. - * - * @param peer The peer. Must not be <code>null</code>. - */ - /* default */ void internalShutdown(IPeer peer) { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - Assert.isNotNull(peer); - - // Get the peer id - String id = peer.getID(); - - // First, close all channels that are not reference counted - ListIterator<IChannel> iter = forcedChannels.listIterator(); - while (iter.hasNext()) { - IChannel c = iter.next(); - if (id.equals(c.getRemotePeer().getID())) { - c.close(); - iter.remove(); - forcedChannelFlags.remove(c); - } - } - - // Get the channel - IChannel channel = internalGetChannel(peer); - if (channel != null) { - // Reset the reference count (will force a channel close) - refCounters.remove(channel); - - // Close the channel - internalCloseChannel(channel); - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeAll(boolean) - */ - @Override - public void closeAll(boolean wait) { - if (wait) Assert.isTrue(!Protocol.isDispatchThread()); - - Runnable runnable = new Runnable() { - @Override - public void run() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - internalCloseAll(); - } - }; - - if (Protocol.isDispatchThread()) runnable.run(); - else if (wait) Protocol.invokeAndWait(runnable); - else Protocol.invokeLater(runnable); - } - - /** - * Close all open channel, no matter of the current reference count. - * <p> - * <b>Note:</b> This method must be invoked at the TCF dispatch thread. - */ - /* default */ void internalCloseAll() { - Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - - IChannel[] openChannels = channels.values().toArray(new IChannel[channels.values().size()]); - - refCounters.clear(); - channels.clear(); - - for (IChannel channel : openChannels) internalCloseChannel(channel); - } - - // ----- Streams handling ----- - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream) - */ - @Override - public void subscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneSubscribeStream done) { - Assert.isNotNull(channel); - Assert.isNotNull(streamType); - Assert.isNotNull(listener); - Assert.isNotNull(done); - - if (channel.getState() != IChannel.STATE_OPEN) { - done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message)); - return; - } - - StreamListenerProxy proxy = null; - - // Get all the streams listener proxy instance for the given channel - List<StreamListenerProxy> proxies = streamProxies.get(channel); - // Loop the proxies and find the one for the given stream type - if (proxies != null) { - for (StreamListenerProxy candidate : proxies) { - if (streamType.equals(candidate.getStreamType())) { - proxy = candidate; - break; - } - } - } - - // If the proxy already exist, add the listener to the proxy and return immediately - if (proxy != null) { - proxy.addListener(listener); - done.doneSubscribeStream(null); - } else { - // No proxy yet -> subscribe to the stream type for real and register the proxy - proxy = new StreamListenerProxy(channel, streamType); - if (proxies == null) { - proxies = new ArrayList<StreamListenerProxy>(); - streamProxies.put(channel, proxies); - } - proxies.add(proxy); - proxy.addListener(listener); - - IStreams service = channel.getRemoteService(IStreams.class); - if (service != null) { - final StreamListenerProxy finProxy = proxy; - final List<StreamListenerProxy> finProxies = proxies; - - // Subscribe to the stream type - service.subscribe(streamType, proxy, new IStreams.DoneSubscribe() { - @Override - public void doneSubscribe(IToken token, Exception error) { - if (error != null) { - finProxy.removeListener(listener); - if (finProxy.isEmpty()) finProxies.remove(finProxy); - if (finProxies.isEmpty()) streamProxies.remove(channel); - } else { - finProxy.addListener(listener); - } - done.doneSubscribeStream(error); - } - }); - } else { - proxy.removeListener(listener); - if (proxy.isEmpty()) proxies.remove(proxy); - if (proxies.isEmpty()) streamProxies.remove(channel); - done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message)); - } - } - } - - /* (non-Javadoc) - * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#unsubscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneUnsubscribeStream) - */ - @Override - public void unsubscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneUnsubscribeStream done) { - Assert.isNotNull(channel); - Assert.isNotNull(streamType); - Assert.isNotNull(listener); - Assert.isNotNull(done); - - if (channel.getState() != IChannel.STATE_OPEN) { - done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message)); - return; - } - - StreamListenerProxy proxy = null; - - // Get all the streams listener proxy instance for the given channel - List<StreamListenerProxy> proxies = streamProxies.get(channel); - // Loop the proxies and find the one for the given stream type - if (proxies != null) { - for (StreamListenerProxy candidate : proxies) { - if (streamType.equals(candidate.getStreamType())) { - proxy = candidate; - break; - } - } - } - - if (proxy != null) { - // Remove the listener from the proxy - proxy.removeListener(listener); - // Are there remaining proxied listeners for this stream type? - if (proxy.isEmpty()) { - // Unregister the stream type - IStreams service = channel.getRemoteService(IStreams.class); - if (service != null) { - final StreamListenerProxy finProxy = proxy; - final List<StreamListenerProxy> finProxies = proxies; - - // Unsubscribe - service.unsubscribe(streamType, proxy, new IStreams.DoneUnsubscribe() { - @Override - public void doneUnsubscribe(IToken token, Exception error) { - finProxies.remove(finProxy); - if (finProxies.isEmpty()) streamProxies.remove(channel); - done.doneUnsubscribeStream(error); - } - }); - } else { - done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message)); - } - } - } - } - -} diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java index 5d3f94a2c..7a8fdf9a6 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java @@ -25,6 +25,7 @@ import org.eclipse.tcf.te.runtime.stepper.StepperAttributeUtil; import org.eclipse.tcf.te.runtime.stepper.interfaces.IFullQualifiedId; import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext; import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator; +import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager; import org.eclipse.tcf.te.tcf.core.interfaces.IPathMapService; import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes; import org.eclipse.tcf.te.tcf.core.steps.AbstractPeerStep; @@ -66,16 +67,22 @@ public class ApplyPathMapsStep extends AbstractPeerStep { final IPeer peer = getActivePeerContext(context, data, fullQualifiedId); Assert.isNotNull(peer); - // Apply the initial path map to the opened channel. - // - // This must happen outside the TCF dispatch thread as it may trigger - // the launch configuration change listeners. - Assert.isTrue(!Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ - final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class); - if (service != null) { - // Pass in the channel for direct use. IChannelManager.getChannel(peer) - // does return null while still executing the "open channel" step group. - service.applyPathMap(channel, true, callback); + final boolean applyPathMaps = !StepperAttributeUtil.getBooleanProperty(IChannelManager.FLAG_NO_PATH_MAP, fullQualifiedId, data); + + if (applyPathMaps) { + // Apply the initial path map to the opened channel. + // + // This must happen outside the TCF dispatch thread as it may trigger + // the launch configuration change listeners. + Assert.isTrue(!Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$ + final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class); + if (service != null) { + // Pass in the channel for direct use. IChannelManager.getChannel(peer) + // does return null while still executing the "open channel" step group. + service.applyPathMap(channel, true, callback); + } else { + callback(data, fullQualifiedId, callback, Status.OK_STATUS, null); + } } else { callback(data, fullQualifiedId, callback, Status.OK_STATUS, null); } diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java index 5b774472e..ea33b1e18 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java @@ -78,19 +78,7 @@ public class Messages extends NLS { public static String ChannelManager_openChannel_new_message; public static String ChannelManager_openChannel_success_message; public static String ChannelManager_openChannel_failed_message; - public static String ChannelManager_openChannel_valueAdd_check; - public static String ChannelManager_openChannel_valueAdd_noneApplicable; - public static String ChannelManager_openChannel_valueAdd_numApplicable; - public static String ChannelManager_openChannel_valueAdd_isAlive; - public static String ChannelManager_openChannel_valueAdd_launch; - public static String ChannelManager_openChannel_valueAdd_launch_exception; - public static String ChannelManager_openChannel_valueAdd_startChaining; - public static String ChannelManager_openChannel_proxies_startChaining; - public static String ChannelManager_openChannel_succeeded; public static String ChannelManager_openChannel_failed; - public static String ChannelManager_openChannel_redirect_succeeded; - public static String ChannelManager_openChannel_redirect_failed; - public static String ChannelManager_openChannel_redirect_invalidChannelState; public static String ChannelManager_closeChannel_close_message; public static String ChannelManager_closeChannel_message; public static String ChannelManager_closeChannel_inuse_message; diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties index 4586af976..cb22aee2b 100644 --- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties +++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties @@ -19,19 +19,7 @@ ChannelManager_openChannel_pending_message=Opening channel in progress. Callback ChannelManager_openChannel_new_message=Opening new channel. Target id = {0} ChannelManager_openChannel_success_message=Opening channel succeeded. Target id = {0} ChannelManager_openChannel_failed_message=Opening channel failed. Target id = {0}, cause = {1} -ChannelManager_openChannel_valueAdd_check=Checking for value add status. Target id = {0} -ChannelManager_openChannel_valueAdd_noneApplicable=No applicable value-add contributions found. Target id = {0} -ChannelManager_openChannel_valueAdd_numApplicable={0} applicable value-add contributions found. Target id = {1} -ChannelManager_openChannel_valueAdd_isAlive=Value-add #{0} ({1}): isAlive = {2}. Target id = {3} -ChannelManager_openChannel_valueAdd_launch=Value-add #{0} ({1}): launch {2} (error = ''{3}''). Target id = {4} -ChannelManager_openChannel_valueAdd_launch_exception=Value-add #{0} ({1}): Launch exception was:\n{2} -ChannelManager_openChannel_valueAdd_startChaining=Start chaining of value-add's. Target id = {0} -ChannelManager_openChannel_proxies_startChaining=Start chaining of proxies. Target id = {0} -ChannelManager_openChannel_succeeded=Successfully opened channel to {0}. Redirect Level = {1}. Target id = {2} ChannelManager_openChannel_failed=Failed to open channel to {0}. Redirect Level = {1}. Target id = {2} -ChannelManager_openChannel_redirect_succeeded=Successfully redirected channel from {0} to {1}. Redirect Level = {2} -ChannelManager_openChannel_redirect_failed=Failed to redirect channel from {0} to {1}. -ChannelManager_openChannel_redirect_invalidChannelState=Channel not in open state. Cannot redirect channel to {1}. ChannelManager_closeChannel_message=Request to close channel. Target id = {0} ChannelManager_closeChannel_close_message=Closing channel. Target id = {0} ChannelManager_closeChannel_inuse_message=Channel not closed. Still in use. Target id = {0}, new reference count = {1} |