Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorUwe Stieber2014-07-21 03:49:09 -0400
committerUwe Stieber2014-07-21 03:49:09 -0400
commit6f84c7afe43a1bb6e739abdb2bcf99d6342ec418 (patch)
tree4b0103c15381e85b93ff5228a4f7f092a6856e6e
parent98cd236aa12962c763f4a4c636a377e174cb2a62 (diff)
downloadorg.eclipse.tcf-6f84c7afe43a1bb6e739abdb2bcf99d6342ec418.tar.gz
org.eclipse.tcf-6f84c7afe43a1bb6e739abdb2bcf99d6342ec418.tar.xz
org.eclipse.tcf-6f84c7afe43a1bb6e739abdb2bcf99d6342ec418.zip
Target Explorer: Switch to stepper based channel manager implementation
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java1376
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java727
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java27
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java12
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties12
5 files changed, 314 insertions, 1840 deletions
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java
index ad957936e..1f3c83d3b 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2011, 2014 Wind River Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved.
* This program and the accompanying materials are made available under the terms
* of the Eclipse Public License v1.0 which accompanies this distribution, and is
* available at http://www.eclipse.org/legal/epl-v10.html
@@ -9,8 +9,6 @@
*******************************************************************************/
package org.eclipse.tcf.te.tcf.core.internal.channelmanager;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -21,44 +19,52 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.IStatus;
-import org.eclipse.core.runtime.OperationCanceledException;
import org.eclipse.core.runtime.PlatformObject;
import org.eclipse.osgi.util.NLS;
import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IPeer;
import org.eclipse.tcf.protocol.IToken;
import org.eclipse.tcf.protocol.Protocol;
-import org.eclipse.tcf.services.IDiagnostics;
-import org.eclipse.tcf.services.IPathMap;
import org.eclipse.tcf.services.IStreams;
import org.eclipse.tcf.te.runtime.callback.Callback;
-import org.eclipse.tcf.te.runtime.services.ServiceManager;
+import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
+import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer;
+import org.eclipse.tcf.te.runtime.properties.PropertiesContainer;
+import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepAttributes;
+import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext;
+import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService;
+import org.eclipse.tcf.te.runtime.stepper.job.StepperJob;
+import org.eclipse.tcf.te.runtime.stepper.utils.StepperHelper;
import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator;
import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
-import org.eclipse.tcf.te.tcf.core.interfaces.IPathMapService;
-import org.eclipse.tcf.te.tcf.core.interfaces.IPeerProperties;
+import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes;
import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds;
+import org.eclipse.tcf.te.tcf.core.internal.channelmanager.steps.ShutdownValueAddStep;
import org.eclipse.tcf.te.tcf.core.nls.Messages;
-import org.eclipse.tcf.te.tcf.core.util.persistence.PeerDataHelper;
-import org.eclipse.tcf.te.tcf.core.va.ValueAddManager;
-import org.eclipse.tcf.te.tcf.core.va.interfaces.IValueAdd;
-
/**
- * TCF channel manager implementation.
+ * Channel manager implementation.
*/
-public final class ChannelManager extends PlatformObject implements IChannelManager {
- // The map of reference counters per peer id
- /* default */ final Map<String, AtomicInteger> refCounters = new HashMap<String, AtomicInteger>();
+public class ChannelManager extends PlatformObject implements IChannelManager {
+ // The map of reference counters per channel
+ /* default */ final Map<IChannel, AtomicInteger> refCounters = new HashMap<IChannel, AtomicInteger>();
// The map of channels per peer id
/* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>();
- // The map of channels opened via "forceNew" flag (needed to handle the close channel correctly)
+ // The map of pending open channel callback's per peer id
+ /* default */ final Map<String, List<DoneOpenChannel>> pendingDones = new HashMap<String, List<DoneOpenChannel>>();
+ // The list of channels opened via "forceNew" flag (needed to handle the close channel correctly)
/* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>();
+ // The map of flags used for opening a forced channel per channel
+ /* default */ final Map<IChannel, Map<String, Boolean>> forcedChannelFlags = new HashMap<IChannel, Map<String, Boolean>>();
// The map of stream listener proxies per channel
/* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>();
+ // The map of scheduled "open channel" stepper jobs per peer id
+ /* default */ final Map<String, StepperJob> pendingOpenChannel = new HashMap<String, StepperJob>();
+ // The map of scheduled "close channel" stepper jobs per peer id
+ /* default */ final Map<String, StepperJob> pendingCloseChannel = new HashMap<String, StepperJob>();
/**
- * Constructor.
+ * Constructor
*/
public ChannelManager() {
super();
@@ -69,112 +75,35 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
*/
@Override
public void openChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) {
+ Assert.isNotNull(peer);
+ Assert.isNotNull(done);
+
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, ITraceIds.TRACE_CHANNEL_MANAGER)) {
try {
throw new Throwable();
} catch (Throwable e) {
CoreBundleActivator.getTraceHandler().trace("ChannelManager#openChannel called from:", //$NON-NLS-1$
- 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
e.printStackTrace();
}
}
- DoneOpenChannel innerDone = null;
- boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false;
- if (noPathMap) {
- innerDone = done;
- } else {
- innerDone = new DoneOpenChannel() {
- @Override
- public void doneOpenChannel(final Throwable error, final IChannel channel) {
- // If open channel failed, pass on to the original done
- if (error != null || channel == null || channel.getState() != IChannel.STATE_OPEN) {
- done.doneOpenChannel(error, channel);
- } else {
- // Take care of the path map
- final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class);
- final IPathMap svc = channel.getRemoteService(IPathMap.class);
- if (service != null && svc != null) {
- // Apply the initial path map to the opened channel.
- // This must happen outside the TCF dispatch thread as it may trigger
- // the launch configuration change listeners.
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- service.applyPathMap(peer, true, new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- done.doneOpenChannel(error, channel);
- }
- });
- }
- });
- thread.start();
- } else {
- done.doneOpenChannel(null, channel);
- }
- }
- }
- };
- }
- final DoneOpenChannel finInnerDone = innerDone;
+ // The client done callback must be called within the TCF event dispatch thread
+ final DoneOpenChannel internalDone = new DoneOpenChannel() {
- Runnable runnable = new Runnable() {
@Override
- public void run() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
-
- // Check on the value-add's first
- internalHandleValueAdds(peer, flags, new DoneHandleValueAdds() {
+ public void doneOpenChannel(final Throwable error, final IChannel channel) {
+ Runnable runnable = new Runnable() {
@Override
- public void doneHandleValueAdds(final Throwable error, final IValueAdd[] valueAdds) {
- // If the error is null, continue and open the channel
- if (error == null) {
- // Do we have any value add in the chain?
- if (valueAdds != null && valueAdds.length > 0) {
- // There are value-add's -> chain them now
- internalChainValueAdds(valueAdds, peer, flags, finInnerDone);
- } else {
- // Determine the proxy configuration
- String proxyConfiguration = peer.getAttributes().get(IPeerProperties.PROP_PROXIES);
- IPeer[] proxies = proxyConfiguration != null ? PeerDataHelper.decodePeerList(proxyConfiguration) : null;
- if (proxies != null && proxies.length > 0) {
- // There are proxies -> chain them now
- internalChainProxies(proxies, peer, flags, finInnerDone);
- } else {
- // No value-add's and no proxies -> open a channel to the target peer directly
- internalOpenChannel(peer, flags, finInnerDone);
- }
- }
- } else {
- // Shutdown the value-add's launched
- internalShutdownValueAdds(peer, valueAdds);
- // Fail the channel opening
- finInnerDone.doneOpenChannel(error, null);
- }
+ public void run() {
+ done.doneOpenChannel(error, channel);
}
- });
+ };
+
+ if (Protocol.isDispatchThread()) runnable.run();
+ else Protocol.invokeLater(runnable);
}
};
- if (Protocol.isDispatchThread()) runnable.run();
- else Protocol.invokeLater(runnable);
- }
-
- /**
- * Internal implementation of {@link #openChannel(IPeer, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel)}.
- * <p>
- * Reference counted channels are cached by the channel manager and must be closed via {@link #closeChannel(IChannel)} .
- * <p>
- * Method must be called within the TCF dispatch thread.
- *
- * @param peer The peer. Must not be <code>null</code>.
- * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void internalOpenChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) {
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// The channel instance to return
IChannel channel = null;
@@ -184,133 +113,181 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_message, id, flags),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
}
- // Extract the flags of interest form the given flags map
+ // First thing to determine is if to open a new channel or the shared
+ // channel can be used, if there is a shared channel at all.
boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false;
boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- // If noValueAdd == true -> forceNew has to be true as well
- if (noValueAdd) forceNew = true;
+ boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false;
+ // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well
+ if (noValueAdd || noPathMap) forceNew = true;
final boolean finForceNew = forceNew;
- // Check if there is already a channel opened to this peer
- channel = !forceNew ? channels.get(id) : null;
- if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
- // Increase the reference count
- AtomicInteger counter = refCounters.get(id);
- if (counter == null) {
- counter = new AtomicInteger(0);
- refCounters.put(id, counter);
+ // Query the shared channel if not forced to open a new channel
+ if (!forceNew) channel = channels.get(id);
+
+ // If a shared channel is available, check if the shared channel can be used
+ if (channel != null) {
+ // If the channel is still open, it's all done and the channel can be returned right away
+ if (channel.getState() == IChannel.STATE_OPEN) {
+ // Increase the reference count
+ AtomicInteger counter = refCounters.get(channel);
+ if (counter == null) {
+ counter = new AtomicInteger(0);
+ refCounters.put(channel, counter);
+ }
+ counter.incrementAndGet();
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
+
+ // Invoke the channel open done callback
+ internalDone.doneOpenChannel(null, channel);
}
- counter.incrementAndGet();
+ // If the channel is opening, wait for the channel to become fully opened.
+ // Add the done open channel callback to the list of pending callback's.
+ else if (channel.getState() == IChannel.STATE_OPENING) {
+ List<DoneOpenChannel> dones = pendingDones.get(id);
+ if (dones == null) {
+ dones = new ArrayList<DoneOpenChannel>();
+ pendingDones.put(id, dones);
+ }
+ Assert.isNotNull(dones);
+ dones.add(internalDone);
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
+ }
+ else {
+ // Channel is not in open state -> drop the instance
+ channels.remove(id);
+ refCounters.remove(channel);
+ channel = null;
}
- } else if (channel != null) {
- // Channel is not in open state -> drop the instance
- channel = null;
- channels.remove(id);
- refCounters.remove(id);
}
- // Opens a new channel if necessary
+ // Channel not available -> open a new one
if (channel == null) {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- try {
- channel = peer.openChannel();
-
- if (channel != null) {
- if (!forceNew) channels.put(id, channel);
- if (!forceNew) refCounters.put(id, new AtomicInteger(1));
- if (forceNew) forcedChannels.add(channel);
+ // Check if there is a pending "open channel" stepper job
+ StepperJob job = pendingOpenChannel.get(id);
+ if (job == null) {
+ // No pending "open channel" stepper job -> schedule one and initiate opening the channel
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
- // Register the channel listener
- final IChannel finChannel = channel;
- channel.addChannelListener(new IChannel.IChannelListener() {
+ // Create the data properties container passed to the "open channel" steps
+ final IPropertiesContainer data = new PropertiesContainer();
+ // Set the flags to be passed to the "open channel" steps
+ data.setProperty(IChannelManager.FLAG_NO_VALUE_ADD, noValueAdd);
+ data.setProperty(IChannelManager.FLAG_NO_PATH_MAP, noPathMap);
+ // No recent action history persistence
+ data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true);
- @Override
- public void onChannelOpened() {
- // Remove ourself as listener from the channel
- finChannel.removeChannelListener(this);
+ // Create the callback to be invoked once the "open channel" stepper job is completed
+ final ICallback callback = new Callback() {
+ @Override
+ protected void internalDone(Object caller, IStatus status) {
+ // Check for error
+ if (status.getSeverity() == IStatus.ERROR) {
+ // Extract the failure cause
+ Throwable error = status.getException();
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id),
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error),
0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
}
- // Channel opening succeeded
- done.doneOpenChannel(null, finChannel);
- }
+ // Job is done -> remove it from the list of pending jobs
+ pendingOpenChannel.remove(id);
- @Override
- public void onChannelClosed(Throwable error) {
- // Remove ourself as listener from the channel
- finChannel.removeChannelListener(this);
- // Clean the reference counter and the channel map
- if (!finForceNew) channels.remove(id);
- if (!finForceNew) refCounters.remove(id);
- if (finForceNew) forcedChannels.remove(finChannel);
+ // Invoke the primary "open channel" done callback
+ internalDone.doneOpenChannel(error, null);
+
+ // Invoke pending callback's
+ List<DoneOpenChannel> pending = pendingDones.remove(id);
+ if (pending != null && !pending.isEmpty()) {
+ for (DoneOpenChannel d : pending) {
+ d.doneOpenChannel(error, null);
+ }
+ }
+ } else {
+ // Get the channel
+ IChannel channel = (IChannel)data.getProperty(ITcfStepAttributes.ATTR_CHANNEL);
+ Assert.isNotNull(channel);
+ Assert.isTrue(channel.getState() == IChannel.STATE_OPEN);
+
+ // Store the channel
+ if (!finForceNew) channels.put(id, channel);
+ if (!finForceNew) refCounters.put(channel, new AtomicInteger(1));
+ if (finForceNew) forcedChannels.add(channel);
+ if (finForceNew) forcedChannelFlags.put(channel, flags);
+
+ // Job is done -> remove it from the list of pending jobs
+ pendingOpenChannel.remove(id);
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
}
- // Channel opening failed
- done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel);
- }
+ // Invoke the primary "open channel" done callback
+ internalDone.doneOpenChannel(null, channel);
- @Override
- public void congestionLevel(int level) {
- // ignored
+ // Invoke pending callback's
+ List<DoneOpenChannel> pending = pendingDones.remove(id);
+ if (pending != null && !pending.isEmpty()) {
+ for (DoneOpenChannel d : pending) {
+ d.doneOpenChannel(null, channel);
+ }
+ }
}
- });
- } else {
- // Channel is null? Something went terrible wrong.
- done.doneOpenChannel(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$
- }
- } catch (Throwable e) {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
+ }
+ };
- // Channel opening failed
- done.doneOpenChannel(e, channel);
- }
- } else {
- // Wait for the channel to be fully opened if still in "OPENING" state
- if (channel.getState() == IChannel.STATE_OPENING) {
- final IChannel finChannel = channel;
- channel.addChannelListener(new IChannel.IChannelListener() {
+ // Get the stepper operation service
+ IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.OPEN_CHANNEL);
- @Override
- public void onChannelOpened() {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(null, finChannel);
- }
+ // Schedule the "open channel" stepper job
+ IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.OPEN_CHANNEL);
+ String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.OPEN_CHANNEL);
- @Override
- public void onChannelClosed(Throwable error) {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel);
- }
+ if (stepGroupId != null && stepContext != null) {
+ String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.OPEN_CHANNEL);
+ boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.OPEN_CHANNEL);
- @Override
- public void congestionLevel(int level) {
- }
- });
- }
- else {
- done.doneOpenChannel(null, channel);
+ job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.OPEN_CHANNEL, isCancelable, true); //$NON-NLS-1$
+ job.setJobCallback(callback);
+ job.markStatusHandled();
+ job.schedule();
+ }
+
+ // Remember the "open channel" stepper job until finished
+ if (job != null) {
+ pendingOpenChannel.put(id, job);
+ }
+ } else {
+ // There is a pending "open channel" stepper job -> add the open channel callback to the list
+ List<DoneOpenChannel> dones = pendingDones.get(id);
+ if (dones == null) {
+ dones = new ArrayList<DoneOpenChannel>();
+ pendingDones.put(id, dones);
+ }
+ Assert.isNotNull(dones);
+ dones.add(internalDone);
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
}
}
}
@@ -353,9 +330,9 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
IChannel channel = channels.get(id);
if (channel != null && !(channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
// Channel is not in open state -> drop the instance
- channel = null;
channels.remove(id);
- refCounters.remove(id);
+ refCounters.remove(channel);
+ channel = null;
}
return channel;
@@ -387,50 +364,136 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
*
* @param channel The channel. Must not be <code>null</code>.
*/
- /* default */ void internalCloseChannel(IChannel channel) {
+ /* default */ void internalCloseChannel(final IChannel channel) {
Assert.isNotNull(channel);
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// Get the id of the remote peer
- IPeer peer = channel.getRemotePeer();
- String id = peer.getID();
+ final IPeer peer = channel.getRemotePeer();
+ final String id = peer.getID();
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
}
// Determine if the given channel is a reference counted channel
final boolean isRefCounted = !forcedChannels.contains(channel);
// Get the reference counter (if the channel is a reference counted channel)
- AtomicInteger counter = isRefCounted ? refCounters.get(id) : null;
+ AtomicInteger counter = isRefCounted ? refCounters.get(channel) : null;
// If the counter is null or get 0 after the decrement, close the channel
if (counter == null || counter.decrementAndGet() == 0) {
- channel.close();
+ // Check if there is a pending "close channel" stepper job
+ StepperJob job = pendingCloseChannel.get(id);
+ if (job == null) {
+ // No pending "close channel" stepper job -> schedule one and initiate closing the channel
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_close_message, id),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
- // Get the value-add's for the peer to shutdown (if the reference counter is 0)
- if (counter != null && counter.get() == 0) {
- IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer);
- if (valueAdds != null && valueAdds.length > 0) {
- internalShutdownValueAdds(peer, valueAdds);
+ // Create the data properties container passed to the "close channel" steps
+ final IPropertiesContainer data = new PropertiesContainer();
+ // Set the channel to close
+ data.setProperty(ITcfStepAttributes.ATTR_CHANNEL, channel);
+ // No recent action history persistence
+ data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true);
+
+ // Determine if the value-add's can be shutdown or must stay alive.
+ // In case the channel to close is not reference counted, but this is a reference
+ // counted channel to the same peer, and that channel is still open, the
+ // value-adds must stay alive.
+ if (!isRefCounted) {
+ IChannel shared = channels.get(id);
+ if (shared != null && (shared.getState() == IChannel.STATE_OPEN || shared.getState() == IChannel.STATE_OPENING)) {
+ data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true);
+ }
+ } else {
+ // The channel is reference counted, that means it is a shared channel
+ // and normally it will shutdown the value-adds if closed. However, we
+ // can have not reference counted channels to the same target that is
+ // using value-add's. In this case we also have to skip shutting down
+ // the value-add.
+ for (IChannel c : forcedChannels) {
+ if (id.equals(c.getRemotePeer().getID())) {
+ Map<String, Boolean> flags = forcedChannelFlags.get(c);
+ boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
+ if (!noValueAdd) {
+ data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true);
+ break;
+ }
+ }
+ }
}
- }
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
+ // Create the callback to be invoked once the "close channel" stepper job is completed
+ final ICallback callback = new Callback() {
+ @Override
+ protected void internalDone(Object caller, IStatus status) {
+ // Check for error
+ if (status.getSeverity() == IStatus.ERROR) {
+ // Extract the failure cause
+ Throwable error = status.getException();
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_failed_message, id, error),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
+
+ // Job is done -> remove it from the list of pending jobs
+ pendingCloseChannel.remove(id);
+ } else {
+ // Job is done -> remove it from the list of pending jobs
+ pendingCloseChannel.remove(id);
- // Clean the reference counter and the channel map
- if (isRefCounted) refCounters.remove(id);
- if (isRefCounted) channels.remove(id);
- if (!isRefCounted) forcedChannels.remove(channel);
+ // Clean the reference counter and the channel map
+ if (isRefCounted) channels.remove(id);
+ if (isRefCounted) refCounters.remove(channel);
+ if (!isRefCounted) forcedChannels.remove(channel);
+ if (!isRefCounted) forcedChannelFlags.remove(channel);
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
+ }
+ }
+ };
+
+ // Get the stepper operation service
+ IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.CLOSE_CHANNEL);
+
+ // Schedule the "close channel" stepper job
+ IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.CLOSE_CHANNEL);
+ String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.CLOSE_CHANNEL);
+
+ if (stepGroupId != null && stepContext != null) {
+ String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.CLOSE_CHANNEL);
+ boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.CLOSE_CHANNEL);
+
+ job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.CLOSE_CHANNEL, isCancelable, true); //$NON-NLS-1$
+ job.setJobCallback(callback);
+ job.markStatusHandled();
+ job.schedule();
+ }
+
+ // Remember the "close channel" stepper job until finished
+ if (job != null) {
+ pendingCloseChannel.put(id, job);
+ }
+ } else {
+ // There is a pending "close channel" stepper job
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
+ CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_pending_message, id),
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
+ }
+ }
} else {
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_inuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
+ 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
}
}
@@ -440,6 +503,7 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
IChannel c = iter.next();
if (c.getState() == IChannel.STATE_CLOSED) {
iter.remove();
+ forcedChannelFlags.remove(c);
}
}
}
@@ -480,6 +544,7 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
if (id.equals(c.getRemotePeer().getID())) {
c.close();
iter.remove();
+ forcedChannelFlags.remove(c);
}
}
@@ -487,17 +552,11 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
IChannel channel = internalGetChannel(peer);
if (channel != null) {
// Reset the reference count (will force a channel close)
- refCounters.remove(id);
+ refCounters.remove(channel);
// Close the channel
internalCloseChannel(channel);
}
-
- // Make sure to shutdown all value-add's for the peer
- IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer);
- if (valueAdds != null && valueAdds.length > 0) {
- internalShutdownValueAdds(peer, valueAdds);
- }
}
/* (non-Javadoc)
@@ -534,851 +593,9 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
channels.clear();
for (IChannel channel : openChannels) internalCloseChannel(channel);
-
- internalShutdownAllValueAdds();
- }
-
- /**
- * Shutdown the given value-adds for the given peer.
- *
- * @param peer The peer. Must not be <code>null</code>.
- * @param valueAdds The list of value-adds. Must not be <code>null</code>.
- */
- /* default */ void internalShutdownValueAdds(final IPeer peer, final IValueAdd[] valueAdds) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(peer);
- Assert.isNotNull(valueAdds);
-
- // Get the peer id
- final String id = peer.getID();
-
- if (valueAdds.length > 0) {
- doShutdownValueAdds(id, valueAdds);
- }
- }
-
- /**
- * Shutdown the given value-adds for the given peer id.
- *
- * @param id The peer id. Must not be <code>null</code>.
- * @param valueAdds The list of value-add's. Must not be <code>null</code>.
- */
- /* default */ void doShutdownValueAdds(final String id, final IValueAdd[] valueAdds) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(id);
- Assert.isNotNull(valueAdds);
-
- for (IValueAdd valueAdd : valueAdds) {
- valueAdd.shutdown(id, new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- }
- });
- }
- }
-
- /**
- * Shutdown all value-add's running. Called from {@link #closeAll(boolean)}
- */
- /* default */ void internalShutdownAllValueAdds() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
-
- // Get all value-add's
- IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdds(false);
- for (IValueAdd valueAdd : valueAdds) {
- valueAdd.shutdownAll(new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- }
- });
- }
- }
-
- /**
- * Client call back interface for internalHandleValueAdds(...).
- */
- interface DoneHandleValueAdds {
- /**
- * Called when all the value-adds are launched or the launched failed.
- *
- * @param error The error description if operation failed, <code>null</code> if succeeded.
- * @param valueAdds The list of value-adds or <code>null</code>.
- */
- void doneHandleValueAdds(Throwable error, IValueAdd[] valueAdds);
- }
-
- /**
- * Check on the value-adds for the given peer. Launch the value-adds
- * if necessary.
- *
- * @param peer The peer. Must not be <code>null</code>.
- * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void internalHandleValueAdds(final IPeer peer, final Map<String, Boolean> flags, final DoneHandleValueAdds done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
-
- // Get the peer id
- final String id = peer.getID();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_check, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- // Extract the flags of interest form the given flags map
- boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false;
- boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- // If noValueAdd == true -> forceNew has to be true as well
- if (noValueAdd) forceNew = true;
-
- // Check if there is already a channel opened to this peer
- IChannel channel = !forceNew ? channels.get(id) : null;
- if (noValueAdd || channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
- // Got an existing channel or a channel without value-add decoration
- // got requested -> drop out immediately
- done.doneHandleValueAdds(null, null);
- return;
- }
-
- internalHandleValueAdds(peer, done);
- }
-
- /* default */ final Map<String, List<DoneHandleValueAdds>> inProgress = new HashMap<String, List<DoneHandleValueAdds>>();
-
- /**
- * Check on the value-adds for the given peer. Launch the value-adds
- * if necessary.
- *
- * @param peer The peer. Must not be <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void internalHandleValueAdds(final IPeer peer, final DoneHandleValueAdds done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
-
- // Get the peer id
- final String id = peer.getID();
-
- // If a launch for the same value add is in progress already, attach the new done to
- // the list to call and drop out
- if (inProgress.containsKey(id)) {
- List<DoneHandleValueAdds> dones = inProgress.get(id);
- Assert.isNotNull(dones);
- dones.add(done);
- return;
- }
-
- // Add the done callback to a list of waiting callbacks per peer
- List<DoneHandleValueAdds> dones = new ArrayList<DoneHandleValueAdds>();
- dones.add(done);
- inProgress.put(id, dones);
-
- // The "myDone" callback is invoking the callbacks from the list
- // of waiting callbacks.
- final DoneHandleValueAdds myDone = new DoneHandleValueAdds() {
-
- @Override
- public void doneHandleValueAdds(Throwable error, IValueAdd[] valueAdds) {
- // Get the list of the original done callbacks
- List<DoneHandleValueAdds> dones = inProgress.remove(id);
- for (DoneHandleValueAdds done : dones) {
- done.doneHandleValueAdds(error, valueAdds);
- }
- }
- };
-
- // Do we have applicable value-add contributions
- final IValueAdd[] valueAdds = ValueAddManager.getInstance().getValueAdd(peer);
- if (valueAdds.length == 0) {
- // There are no applicable value-add's -> drop out immediately
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_noneApplicable, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
- myDone.doneHandleValueAdds(null, valueAdds);
- return;
- }
-
- // There are at least applicable value-add contributions
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_numApplicable, Integer.valueOf(valueAdds.length), id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- final List<IValueAdd> available = new ArrayList<IValueAdd>();
-
- final DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() {
- @Override
- public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) {
- myDone.doneHandleValueAdds(error, available.toArray(new IValueAdd[available.size()]));
- }
- };
-
- doLaunchValueAdd(id, valueAdds, 0, available, innerDone);
- }
-
- /**
- * Client call back interface for doLaunchValueAdd(...).
- */
- interface DoneLaunchValueAdd {
- /**
- * Called when a value-add has been chained.
- *
- * @param error The error description if operation failed, <code>null</code> if succeeded.
- * @param available The list of available value-adds.
- */
- void doneLaunchValueAdd(Throwable error, List<IValueAdd> available);
- }
-
- /**
- * Test the value-add at the given index to be alive. Launch the value-add if necessary.
- *
- * @param id The peer id. Must not be <code>null</code>.
- * @param valueAdds The list of value-add's to check. Must not be <code>null</code>.
- * @param i The index.
- * @param available The list of available value-adds. Must not be <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void doLaunchValueAdd(final String id, final IValueAdd[] valueAdds, final int i, final List<IValueAdd> available, final DoneLaunchValueAdd done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(id);
- Assert.isNotNull(valueAdds);
- Assert.isTrue(valueAdds.length > 0);
- Assert.isNotNull(available);
- Assert.isNotNull(done);
-
- // Get the value-add to launch
- final IValueAdd valueAdd = valueAdds[i];
-
- // Check if the value-add to launch is alive
- valueAdd.isAlive(id, new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- boolean alive = ((Boolean)getResult()).booleanValue();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_isAlive, new Object[] { Integer.valueOf(i), valueAdd.getLabel(), Boolean.valueOf(alive), id }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
-
- if (!alive) {
- // Launch the value-add
- valueAdd.launch(id, new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- Throwable error = status.getException();
-
- String message = null;
- if (error != null) {
- message = error.getLocalizedMessage();
- if ((message == null || "".equals(message)) && error.getCause() != null) { //$NON-NLS-1$
- message = error.getCause().getLocalizedMessage();
- }
- }
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_launch, new Object[] { Integer.valueOf(i), valueAdd.getLabel(),
- (error == null ? "success" : "failed"), //$NON-NLS-1$ //$NON-NLS-2$
- (error != null ? message : null),
- id }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
-
- // Print the stack trace of the error too
- if (error != null) {
- StringWriter sw = new StringWriter();
- error.printStackTrace(new PrintWriter(sw, true));
-
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_launch_exception, new Object[] {
- Integer.valueOf(i), valueAdd.getLabel(),
- sw.getBuffer().toString()
- }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
- }
-
- // If we got an error and the value-add is optional,
- // ignore the error and drop the value-add from the chain.
- if (error != null && valueAdd.isOptional()) {
- error = null;
- } else if (error == null) {
- available.add(valueAdd);
- }
-
- // If the value-add failed to launch, no other value-add's are launched
- if (error != null) {
- done.doneLaunchValueAdd(error, available);
- } else {
- // Launch the next one, if there is any
- if (i + 1 < valueAdds.length) {
- DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() {
- @Override
- public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) {
- done.doneLaunchValueAdd(error, available);
- }
- };
- doLaunchValueAdd(id, valueAdds, i + 1, available, innerDone);
- } else {
- // Last value-add in chain launched -> call parent callback
- done.doneLaunchValueAdd(null, available);
- }
- }
- }
- });
- } else {
- // Already alive -> add it to the list of available value-add's
- available.add(valueAdd);
- // Launch the next one, if there is any
- if (i + 1 < valueAdds.length) {
- DoneLaunchValueAdd innerDone = new DoneLaunchValueAdd() {
- @Override
- public void doneLaunchValueAdd(Throwable error, List<IValueAdd> available) {
- done.doneLaunchValueAdd(error, available);
- }
- };
- doLaunchValueAdd(id, valueAdds, i + 1, available, innerDone);
- } else {
- // Last value-add in chain launched -> call parent callback
- done.doneLaunchValueAdd(null, available);
- }
- }
- }
- });
- }
-
- /**
- * Client call back interface for doChainValueAdd(...).
- */
- interface DoneChainValueAdd {
- /**
- * Called when a value-add has been chained.
- *
- * @param error The error description if operation failed, <code>null</code> if succeeded.
- * @param channel The channel object or <code>null</code>.
- */
- void doneChainValueAdd(Throwable error, IChannel channel);
- }
-
- /**
- * Client call back interface for doChainProxies(...).
- */
- interface DoneChainProxies {
- /**
- * Called when a proxies has been chained.
- *
- * @param error The error description if operation failed, <code>null</code> if succeeded.
- * @param channel The channel object or <code>null</code>.
- */
- void doneChainProxies(Throwable error, IChannel channel);
- }
-
- /**
- * Chain the value-adds until the original target peer is reached.
- *
- * @param valueAdds The list of value-add's to chain. Must not be <code>null</code>.
- * @param peer The original target peer. Must not be <code>null</code>.
- * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void internalChainValueAdds(final IValueAdd[] valueAdds, final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(valueAdds);
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
-
- // Get the peer id
- final String id = peer.getID();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_valueAdd_startChaining, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- // Extract the flags of interest form the given flags map
- boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false;
- boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false;
- // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well
- if (noValueAdd || noPathMap) forceNew = true;
-
- final boolean finForceNew = forceNew;
-
- // Check if there is already a channel opened to this peer
- IChannel channel = !forceNew ? channels.get(id) : null;
- if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
- // Increase the reference count
- AtomicInteger counter = refCounters.get(id);
- if (counter == null) {
- counter = new AtomicInteger(0);
- refCounters.put(id, counter);
- }
- counter.incrementAndGet();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
- // Got an existing channel -> drop out immediately if the channel is
- // already fully opened. Otherwise wait for the channel to be fully open.
- if (channel.getState() == IChannel.STATE_OPENING) {
- final IChannel finChannel = channel;
- channel.addChannelListener(new IChannel.IChannelListener() {
- @Override
- public void onChannelOpened() {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(null, finChannel);
- }
- @Override
- public void onChannelClosed(Throwable error) {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel);
- }
- @Override
- public void congestionLevel(int level) {
- }
- });
- }
- else {
- done.doneOpenChannel(null, channel);
- }
- return;
- } else if (channel != null) {
- // Channel is not in open state -> drop the instance
- channels.remove(id);
- refCounters.remove(id);
- }
-
- // No existing channel -> open a new one
- final DoneChainValueAdd chainValueAddDone = new DoneChainValueAdd() {
- @Override
- public void doneChainValueAdd(final Throwable error, final IChannel channel) {
- // Ending up here means that the channel is redirected to the last
- // value-add in the chain, but it is not yet redirected through the
- // proxy configuration.
- String proxyConfiguration = peer.getAttributes().get(IPeerProperties.PROP_PROXIES);
- IPeer[] proxies = proxyConfiguration != null ? PeerDataHelper.decodePeerList(proxyConfiguration) : null;
-
- // Create the done callback
- final DoneChainProxies chainProxiesDone = new DoneChainProxies() {
- @Override
- public void doneChainProxies(final Throwable error, final IChannel channel) {
- // Invoke the outer callback
- done.doneOpenChannel(error, channel);
- }
- };
-
- // Continue the redirect chain by chaining the proxies and connecting to the target
- doChainProxies(id, peer.getAttributes(), proxies, finForceNew, valueAdds.length, channel, chainProxiesDone);
- }
- };
-
- doChainValueAdd(id, forceNew, valueAdds, chainValueAddDone);
}
- /* default */ void doChainValueAdd(final String id, final boolean forceNew, final IValueAdd[] valueAdds, final DoneChainValueAdd done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(id);
- Assert.isNotNull(valueAdds);
- Assert.isNotNull(done);
-
- // The index of the currently processed value-add
- final AtomicInteger index = new AtomicInteger(0);
-
- // Get the value-add to chain
- final AtomicReference<IValueAdd> valueAdd = new AtomicReference<IValueAdd>();
- valueAdd.set(valueAdds[index.get()]);
- Assert.isNotNull(valueAdd.get());
- // Get the next value-add in chain
- final AtomicReference<IValueAdd> nextValueAdd = new AtomicReference<IValueAdd>();
- nextValueAdd.set(index.get() + 1 < valueAdds.length ? valueAdds[index.get() + 1] : null);
-
- // Get the peer for the value-add to chain
- final AtomicReference<IPeer> valueAddPeer = new AtomicReference<IPeer>();
- valueAddPeer.set(valueAdd.get().getPeer(id));
- if (valueAddPeer.get() == null) {
- done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$
- return;
- }
-
- // Get the peer for the next value-add in chain
- final AtomicReference<IPeer> nextValueAddPeer = new AtomicReference<IPeer>();
- nextValueAddPeer.set(nextValueAdd.get() != null ? nextValueAdd.get().getPeer(id) : null);
- if (nextValueAdd.get() != null && nextValueAddPeer.get() == null) {
- done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$
- return;
- }
-
- IChannel channel = null;
- try {
- // Open a channel to the value-add
- channel = valueAddPeer.get().openChannel();
- if (channel != null) {
- if (!forceNew) channels.put(id, channel);
- if (!forceNew) refCounters.put(id, new AtomicInteger(1));
- if (forceNew) forcedChannels.add(channel);
-
- // Create and attach the channel listener to catch open/closed events
- final IChannel finChannel = channel;
- final IChannel.IChannelListener finChannelListener = new IChannel.IChannelListener() {
- @Override
- public void onChannelOpened() {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- if (index.get() == 0) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_succeeded,
- new Object[] { valueAddPeer.get().getID(), Integer.valueOf(index.get()), id }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
-
- } else {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_succeeded,
- new Object[] { valueAddPeer.get().getID(), finChannel.getRemotePeer().getID(), Integer.valueOf(index.get()) }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
- }
-
- // Channel opened. Check if we are done.
- if (nextValueAdd.get() == null) {
- // Remove ourself as channel listener
- finChannel.removeChannelListener(this);
-
- // No other value-add in the chain -> all done
- done.doneChainValueAdd(null, finChannel);
- } else {
- // Process the next value-add in chain
- index.incrementAndGet();
-
- // Update the value-add references
- valueAdd.set(nextValueAdd.get());
- valueAddPeer.set(nextValueAddPeer.get());
-
- nextValueAdd.set(index.get() + 1 < valueAdds.length ? valueAdds[index.get() + 1] : null);
- nextValueAddPeer.set(nextValueAdd.get() != null ? nextValueAdd.get().getPeer(id) : null);
- if (nextValueAdd.get() != null && nextValueAddPeer.get() == null) {
- // Remove ourself as channel listener
- finChannel.removeChannelListener(this);
- // Close the channel
- finChannel.close();
- // Invoke the callback
- done.doneChainValueAdd(new IllegalStateException("Invalid value-add peer."), null); //$NON-NLS-1$
- return;
- }
-
- // Redirect the channel to the next value-add in chain
- finChannel.redirect(valueAddPeer.get().getAttributes());
- }
- }
-
- @Override
- public void onChannelClosed(Throwable error) {
- // Remove ourself as channel listener
- finChannel.removeChannelListener(this);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- if (index.get() == 0) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed,
- new Object[] { valueAddPeer.get().getID(), Integer.valueOf(index.get()), id }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
-
- } else {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_failed, finChannel.getRemotePeer().getID(), valueAddPeer.get().getID()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
- }
-
- // Clean the reference counter and the channel map
- if (!forceNew) channels.remove(id);
- if (!forceNew) refCounters.remove(id);
- if (forceNew) forcedChannels.remove(finChannel);
-
- // Channel redirect failed -> This will break everything
- done.doneChainValueAdd(error, finChannel);
- }
-
- @Override
- public void congestionLevel(int level) {
- }
- };
- channel.addChannelListener(finChannelListener);
- } else {
- // Channel is null? Something went terrible wrong.
- done.doneChainValueAdd(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$
-
- }
- } catch (Throwable e) {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- // Channel opening failed
- done.doneChainValueAdd(e, channel);
- }
- }
-
- /**
- * Chain the proxies until the original target peer is reached.
- *
- * @param proxies The list of proxies to chain. Must not be <code>null</code>.
- * @param peer The original target peer. Must not be <code>null</code>.
- * @param flags Map containing the flags to parameterize the channel opening, or <code>null</code>.
- * @param done The client callback. Must not be <code>null</code>.
- */
- /* default */ void internalChainProxies(final IPeer[] proxies, final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(proxies);
- Assert.isTrue(proxies.length > 0);
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
-
- // Get the peer id
- final String id = peer.getID();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_proxies_startChaining, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- // Extract the flags of interest form the given flags map
- boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false;
- boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false;
- // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well
- if (noValueAdd || noPathMap) forceNew = true;
-
- final boolean finForceNew = forceNew;
-
- // Check if there is already a channel opened to this peer
- IChannel channel = !forceNew ? channels.get(id) : null;
- if (channel != null && (channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
- // Increase the reference count
- AtomicInteger counter = refCounters.get(id);
- if (counter == null) {
- counter = new AtomicInteger(0);
- refCounters.put(id, counter);
- }
- counter.incrementAndGet();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
- // Got an existing channel -> drop out immediately if the channel is
- // already fully opened. Otherwise wait for the channel to be fully open.
- if (channel.getState() == IChannel.STATE_OPENING) {
- final IChannel finChannel = channel;
- channel.addChannelListener(new IChannel.IChannelListener() {
- @Override
- public void onChannelOpened() {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(null, finChannel);
- }
- @Override
- public void onChannelClosed(Throwable error) {
- finChannel.removeChannelListener(this);
- done.doneOpenChannel(error != null ? error : new OperationCanceledException(), finChannel);
- }
- @Override
- public void congestionLevel(int level) {
- }
- });
- }
- else {
- done.doneOpenChannel(null, channel);
- }
- return;
- } else if (channel != null) {
- // Channel is not in open state -> drop the instance
- channels.remove(id);
- refCounters.remove(id);
- }
-
- // No existing channel -> open a new one
-
- // Get the first proxy. This is the one we have to open the channel too.
- IPeer firstProxy = proxies[0];
- Assert.isNotNull(firstProxy);
-
- // Remove the first proxy from the array and build up a new array describing
- // the remaining proxy chain
- final IPeer[] remainingProxies = new IPeer[proxies.length - 1];
- if (remainingProxies.length > 0) System.arraycopy(proxies, 1, remainingProxies, 0, remainingProxies.length);
-
- // Open a channel to the first proxy
- channel = null;
- try {
- channel = firstProxy.openChannel();
- if (channel != null) {
- if (!forceNew) channels.put(id, channel);
- if (!forceNew) refCounters.put(id, new AtomicInteger(1));
- if (forceNew) forcedChannels.add(channel);
-
- // Create and attach the channel listener to catch open/closed events
- final IChannel finChannel = channel;
- final IChannel.IChannelListener finChannelListener = new IChannel.IChannelListener() {
- @Override
- public void onChannelOpened() {
- // Remove ourself as channel listener
- finChannel.removeChannelListener(this);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
-
- // Channel opened to the first proxy. Process the proxy chain and
- // redirect the channel through the proxies until the original target
- // is reached
- final DoneChainProxies chainProxiesDone = new DoneChainProxies() {
- @Override
- public void doneChainProxies(final Throwable error, final IChannel channel) {
- // Invoke the outer callback
- done.doneOpenChannel(error, channel);
- }
- };
-
- // Continue the redirect chain by chaining the proxies and connecting to the target
- doChainProxies(id, peer.getAttributes(), remainingProxies, finForceNew, 0, finChannel, chainProxiesDone);
- }
-
- @Override
- public void onChannelClosed(Throwable error) {
- // Remove ourself as channel listener
- finChannel.removeChannelListener(this);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
-
- // Clean the reference counter and the channel map
- if (!finForceNew) channels.remove(id);
- if (!finForceNew) refCounters.remove(id);
- if (finForceNew) forcedChannels.remove(finChannel);
-
- // Channel open failed -> This will break everything
- done.doneOpenChannel(error, finChannel);
- }
-
- @Override
- public void congestionLevel(int level) {
- }
- };
- channel.addChannelListener(finChannelListener);
- } else {
- // Channel is null? Something went terrible wrong.
- done.doneOpenChannel(new Exception("Unexpected null return value from IPeer#openChannel()!"), null); //$NON-NLS-1$
-
- }
- } catch (Throwable e) {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, e),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, this);
- }
-
- // Channel opening failed
- done.doneOpenChannel(e, channel);
- }
- }
-
- /* default */ void doChainProxies(final String id, final Map<String, String> attrs, final IPeer[] proxies, final boolean forceNew, final int numberOfValueAdds, final IChannel channel, final DoneChainProxies done) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(id);
- Assert.isNotNull(attrs);
- Assert.isNotNull(channel);
- Assert.isNotNull(done);
-
- // The index of the currently processed proxy
- final AtomicInteger index = new AtomicInteger(0);
-
- // Get the proxy to chain
- final AtomicReference<IPeer> proxy = new AtomicReference<IPeer>();
- proxy.set(proxies != null && proxies.length > 0 ? proxies[index.get()] : null);
- // Get the next proxy in chain
- final AtomicReference<IPeer> nextProxy = new AtomicReference<IPeer>();
- nextProxy.set(proxies != null && index.get() + 1 < proxies.length ? proxies[index.get() + 1] : null);
-
- // The channel must be in open or opening state, otherwise we cannot do the redirect
- if (channel.getState() == IChannel.STATE_CLOSED) {
- done.doneChainProxies(new Exception(NLS.bind(Messages.ChannelManager_openChannel_redirect_invalidChannelState, id)), channel);
- return;
- }
-
- // Determine the ID of the last value-add in the chain
- final String lastValueAddID = channel.getRemotePeer().getID();
-
- // Create and attach the channel listener
- channel.addChannelListener(new IChannel.IChannelListener() {
- @Override
- public void onChannelOpened() {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_succeeded,
- new Object[] { lastValueAddID, channel.getRemotePeer().getID(), Integer.valueOf(numberOfValueAdds + index.get()) }),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
-
- // Channel redirect succeeded. Check if we are done.
- if (proxy.get() == null) {
- // Remove ourself as channel listener
- channel.removeChannelListener(this);
-
- // No other proxy is in the chain -> reached the target -> all done
- // HACK * HACK * HACK (is eh nur kurz drin sagt Tobias)
- IDiagnostics svc = channel.getRemoteService(IDiagnostics.class);
- if (svc != null) {
- svc.echo("WR_HostFsSendCommands", new IDiagnostics.DoneEcho() { //$NON-NLS-1$
- @Override
- public void doneEcho(IToken token, Throwable error, String s) {
- done.doneChainProxies(null, channel);
- }
- });
- } else {
- done.doneChainProxies(null, channel);
- }
- } else {
- // Update the proxy reference
- proxy.set(nextProxy.get());
-
- // Process the next proxy in chain
- index.incrementAndGet();
-
- // Determine the next proxy to redirect too
- nextProxy.set(index.get() < proxies.length ? proxies[index.get()] : null);
-
- // Redirect the channel to the next proxy in chain, if available,
- // or directly to the target if no more proxies are configured
- channel.redirect(proxy.get() != null ? proxy.get().getAttributes() : attrs);
- }
- }
-
- @Override
- public void onChannelClosed(Throwable error) {
- // Remove ourself as channel listener
- channel.removeChannelListener(this);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_redirect_failed, lastValueAddID, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager.this);
- }
-
- // Clean the reference counter and the channel map
- if (forceNew) channels.remove(id);
- if (forceNew) refCounters.remove(id);
- if (forceNew) forcedChannels.remove(channel);
-
- // Channel redirect failed -> This will break everything
- done.doneChainProxies(error, channel);
- }
-
- @Override
- public void congestionLevel(int level) {
- }
- });
-
- // If there is no proxy configured, directly redirect to the target
- channel.redirect(proxy.get() != null ? proxy.get().getAttributes() : attrs);
- }
+ // ----- Streams handling -----
/* (non-Javadoc)
* @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream)
@@ -1506,4 +723,5 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
}
}
}
+
}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java
deleted file mode 100644
index f7681eda9..000000000
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/ChannelManager2.java
+++ /dev/null
@@ -1,727 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2014 Wind River Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the terms
- * of the Eclipse Public License v1.0 which accompanies this distribution, and is
- * available at http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Wind River Systems - initial API and implementation
- *******************************************************************************/
-package org.eclipse.tcf.te.tcf.core.internal.channelmanager;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.eclipse.core.runtime.Assert;
-import org.eclipse.core.runtime.IStatus;
-import org.eclipse.core.runtime.PlatformObject;
-import org.eclipse.osgi.util.NLS;
-import org.eclipse.tcf.protocol.IChannel;
-import org.eclipse.tcf.protocol.IPeer;
-import org.eclipse.tcf.protocol.IToken;
-import org.eclipse.tcf.protocol.Protocol;
-import org.eclipse.tcf.services.IStreams;
-import org.eclipse.tcf.te.runtime.callback.Callback;
-import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
-import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer;
-import org.eclipse.tcf.te.runtime.properties.PropertiesContainer;
-import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepAttributes;
-import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext;
-import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepperOperationService;
-import org.eclipse.tcf.te.runtime.stepper.job.StepperJob;
-import org.eclipse.tcf.te.runtime.stepper.utils.StepperHelper;
-import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator;
-import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
-import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes;
-import org.eclipse.tcf.te.tcf.core.interfaces.tracing.ITraceIds;
-import org.eclipse.tcf.te.tcf.core.internal.channelmanager.steps.ShutdownValueAddStep;
-import org.eclipse.tcf.te.tcf.core.nls.Messages;
-
-/**
- * Channel manager implementation.
- */
-public class ChannelManager2 extends PlatformObject implements IChannelManager {
- // The map of reference counters per channel
- /* default */ final Map<IChannel, AtomicInteger> refCounters = new HashMap<IChannel, AtomicInteger>();
- // The map of channels per peer id
- /* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>();
- // The map of pending open channel callback's per peer id
- /* default */ final Map<String, List<DoneOpenChannel>> pendingDones = new HashMap<String, List<DoneOpenChannel>>();
- // The list of channels opened via "forceNew" flag (needed to handle the close channel correctly)
- /* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>();
- // The map of flags used for opening a forced channel per channel
- /* default */ final Map<IChannel, Map<String, Boolean>> forcedChannelFlags = new HashMap<IChannel, Map<String, Boolean>>();
- // The map of stream listener proxies per channel
- /* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>();
- // The map of scheduled "open channel" stepper jobs per peer id
- /* default */ final Map<String, StepperJob> pendingOpenChannel = new HashMap<String, StepperJob>();
- // The map of scheduled "close channel" stepper jobs per peer id
- /* default */ final Map<String, StepperJob> pendingCloseChannel = new HashMap<String, StepperJob>();
-
- /**
- * Constructor
- */
- public ChannelManager2() {
- super();
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#openChannel(org.eclipse.tcf.protocol.IPeer, java.util.Map, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel)
- */
- @Override
- public void openChannel(final IPeer peer, final Map<String, Boolean> flags, final DoneOpenChannel done) {
- Assert.isNotNull(peer);
- Assert.isNotNull(done);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- try {
- throw new Throwable();
- } catch (Throwable e) {
- CoreBundleActivator.getTraceHandler().trace("ChannelManager#openChannel called from:", //$NON-NLS-1$
- 1, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- e.printStackTrace();
- }
- }
-
- // The client done callback must be called within the TCF event dispatch thread
- final DoneOpenChannel internalDone = new DoneOpenChannel() {
-
- @Override
- public void doneOpenChannel(final Throwable error, final IChannel channel) {
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- done.doneOpenChannel(error, channel);
- }
- };
-
- if (Protocol.isDispatchThread()) runnable.run();
- else Protocol.invokeLater(runnable);
- }
- };
-
- // The channel instance to return
- IChannel channel = null;
-
- // Get the peer id
- final String id = peer.getID();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_message, id, flags),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // First thing to determine is if to open a new channel or the shared
- // channel can be used, if there is a shared channel at all.
- boolean forceNew = flags != null && flags.containsKey(IChannelManager.FLAG_FORCE_NEW) ? flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue() : false;
- boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- boolean noPathMap = flags != null && flags.containsKey(IChannelManager.FLAG_NO_PATH_MAP) ? flags.get(IChannelManager.FLAG_NO_PATH_MAP).booleanValue() : false;
- // If noValueAdd == true or noPathMap == true -> forceNew has to be true as well
- if (noValueAdd || noPathMap) forceNew = true;
-
- final boolean finForceNew = forceNew;
-
- // Query the shared channel if not forced to open a new channel
- if (!forceNew) channel = channels.get(id);
-
- // If a shared channel is available, check if the shared channel can be used
- if (channel != null) {
- // If the channel is still open, it's all done and the channel can be returned right away
- if (channel.getState() == IChannel.STATE_OPEN) {
- // Increase the reference count
- AtomicInteger counter = refCounters.get(channel);
- if (counter == null) {
- counter = new AtomicInteger(0);
- refCounters.put(channel, counter);
- }
- counter.incrementAndGet();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_reuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Invoke the channel open done callback
- internalDone.doneOpenChannel(null, channel);
- }
- // If the channel is opening, wait for the channel to become fully opened.
- // Add the done open channel callback to the list of pending callback's.
- else if (channel.getState() == IChannel.STATE_OPENING) {
- List<DoneOpenChannel> dones = pendingDones.get(id);
- if (dones == null) {
- dones = new ArrayList<DoneOpenChannel>();
- pendingDones.put(id, dones);
- }
- Assert.isNotNull(dones);
- dones.add(internalDone);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
- }
- else {
- // Channel is not in open state -> drop the instance
- channels.remove(id);
- refCounters.remove(channel);
- channel = null;
- }
- }
-
- // Channel not available -> open a new one
- if (channel == null) {
- // Check if there is a pending "open channel" stepper job
- StepperJob job = pendingOpenChannel.get(id);
- if (job == null) {
- // No pending "open channel" stepper job -> schedule one and initiate opening the channel
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_new_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Create the data properties container passed to the "open channel" steps
- final IPropertiesContainer data = new PropertiesContainer();
- // Set the flags to be passed to the "open channel" steps
- data.setProperty(IChannelManager.FLAG_NO_VALUE_ADD, noValueAdd);
- data.setProperty(IChannelManager.FLAG_NO_PATH_MAP, noPathMap);
- // No recent action history persistence
- data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true);
-
- // Create the callback to be invoked once the "open channel" stepper job is completed
- final ICallback callback = new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- // Check for error
- if (status.getSeverity() == IStatus.ERROR) {
- // Extract the failure cause
- Throwable error = status.getException();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_failed_message, id, error),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Job is done -> remove it from the list of pending jobs
- pendingOpenChannel.remove(id);
-
- // Invoke the primary "open channel" done callback
- internalDone.doneOpenChannel(error, null);
-
- // Invoke pending callback's
- List<DoneOpenChannel> pending = pendingDones.remove(id);
- if (pending != null && !pending.isEmpty()) {
- for (DoneOpenChannel d : pending) {
- d.doneOpenChannel(error, null);
- }
- }
- } else {
- // Get the channel
- IChannel channel = (IChannel)data.getProperty(ITcfStepAttributes.ATTR_CHANNEL);
- Assert.isNotNull(channel);
- Assert.isTrue(channel.getState() == IChannel.STATE_OPEN);
-
- // Store the channel
- if (!finForceNew) channels.put(id, channel);
- if (!finForceNew) refCounters.put(channel, new AtomicInteger(1));
- if (finForceNew) forcedChannels.add(channel);
- if (finForceNew) forcedChannelFlags.put(channel, flags);
-
- // Job is done -> remove it from the list of pending jobs
- pendingOpenChannel.remove(id);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_success_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Invoke the primary "open channel" done callback
- internalDone.doneOpenChannel(null, channel);
-
- // Invoke pending callback's
- List<DoneOpenChannel> pending = pendingDones.remove(id);
- if (pending != null && !pending.isEmpty()) {
- for (DoneOpenChannel d : pending) {
- d.doneOpenChannel(null, channel);
- }
- }
- }
- }
- };
-
- // Get the stepper operation service
- IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.OPEN_CHANNEL);
-
- // Schedule the "open channel" stepper job
- IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.OPEN_CHANNEL);
- String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.OPEN_CHANNEL);
-
- if (stepGroupId != null && stepContext != null) {
- String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.OPEN_CHANNEL);
- boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.OPEN_CHANNEL);
-
- job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.OPEN_CHANNEL, isCancelable, true); //$NON-NLS-1$
- job.setJobCallback(callback);
- job.markStatusHandled();
- job.schedule();
- }
-
- // Remember the "open channel" stepper job until finished
- if (job != null) {
- pendingOpenChannel.put(id, job);
- }
- } else {
- // There is a pending "open channel" stepper job -> add the open channel callback to the list
- List<DoneOpenChannel> dones = pendingDones.get(id);
- if (dones == null) {
- dones = new ArrayList<DoneOpenChannel>();
- pendingDones.put(id, dones);
- }
- Assert.isNotNull(dones);
- dones.add(internalDone);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_openChannel_pending_message, id, "0x" + Integer.toHexString(internalDone.hashCode())), //$NON-NLS-1$
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#getChannel(org.eclipse.tcf.protocol.IPeer)
- */
- @Override
- public IChannel getChannel(final IPeer peer) {
- final AtomicReference<IChannel> channel = new AtomicReference<IChannel>();
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- channel.set(internalGetChannel(peer));
- }
- };
- if (Protocol.isDispatchThread()) runnable.run();
- else Protocol.invokeAndWait(runnable);
-
- return channel.get();
- }
-
- /**
- * Returns the shared channel instance for the given peer.
- * <p>
- * <b>Note:</b> This method must be invoked at the TCF dispatch thread.
- *
- * @param peer The peer. Must not be <code>null</code>.
- * @return The channel instance or <code>null</code>.
- */
- public IChannel internalGetChannel(IPeer peer) {
- Assert.isNotNull(peer);
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
-
- // Get the peer id
- String id = peer.getID();
-
- // Get the channel
- IChannel channel = channels.get(id);
- if (channel != null && !(channel.getState() == IChannel.STATE_OPEN || channel.getState() == IChannel.STATE_OPENING)) {
- // Channel is not in open state -> drop the instance
- channels.remove(id);
- refCounters.remove(channel);
- channel = null;
- }
-
- return channel;
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeChannel(org.eclipse.tcf.protocol.IChannel)
- */
- @Override
- public void closeChannel(final IChannel channel) {
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- internalCloseChannel(channel);
- }
- };
- if (Protocol.isDispatchThread()) runnable.run();
- else Protocol.invokeLater(runnable);
- }
-
- /**
- * Closes the given channel.
- * <p>
- * If the given channel is a reference counted channel, the channel will be closed if the reference counter
- * reaches 0. For non reference counted channels, the channel is closed immediately.
- * <p>
- * <b>Note:</b> This method must be invoked at the TCF dispatch thread.
- *
- * @param channel The channel. Must not be <code>null</code>.
- */
- /* default */ void internalCloseChannel(final IChannel channel) {
- Assert.isNotNull(channel);
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
-
- // Get the id of the remote peer
- final IPeer peer = channel.getRemotePeer();
- final String id = peer.getID();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Determine if the given channel is a reference counted channel
- final boolean isRefCounted = !forcedChannels.contains(channel);
-
- // Get the reference counter (if the channel is a reference counted channel)
- AtomicInteger counter = isRefCounted ? refCounters.get(channel) : null;
-
- // If the counter is null or get 0 after the decrement, close the channel
- if (counter == null || counter.decrementAndGet() == 0) {
- // Check if there is a pending "close channel" stepper job
- StepperJob job = pendingCloseChannel.get(id);
- if (job == null) {
- // No pending "close channel" stepper job -> schedule one and initiate closing the channel
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_close_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Create the data properties container passed to the "close channel" steps
- final IPropertiesContainer data = new PropertiesContainer();
- // Set the channel to close
- data.setProperty(ITcfStepAttributes.ATTR_CHANNEL, channel);
- // No recent action history persistence
- data.setProperty(IStepAttributes.PROP_SKIP_LAST_RUN_HISTORY, true);
-
- // Determine if the value-add's can be shutdown or must stay alive.
- // In case the channel to close is not reference counted, but this is a reference
- // counted channel to the same peer, and that channel is still open, the
- // value-adds must stay alive.
- if (!isRefCounted) {
- IChannel shared = channels.get(id);
- if (shared != null && (shared.getState() == IChannel.STATE_OPEN || shared.getState() == IChannel.STATE_OPENING)) {
- data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true);
- }
- } else {
- // The channel is reference counted, that means it is a shared channel
- // and normally it will shutdown the value-adds if closed. However, we
- // can have not reference counted channels to the same target that is
- // using value-add's. In this case we also have to skip shutting down
- // the value-add.
- for (IChannel c : forcedChannels) {
- if (id.equals(c.getRemotePeer().getID())) {
- Map<String, Boolean> flags = forcedChannelFlags.get(c);
- boolean noValueAdd = flags != null && flags.containsKey(IChannelManager.FLAG_NO_VALUE_ADD) ? flags.get(IChannelManager.FLAG_NO_VALUE_ADD).booleanValue() : false;
- if (!noValueAdd) {
- data.setProperty(ShutdownValueAddStep.PROP_SKIP_SHUTDOWN_STEP, true);
- break;
- }
- }
- }
- }
-
- // Create the callback to be invoked once the "close channel" stepper job is completed
- final ICallback callback = new Callback() {
- @Override
- protected void internalDone(Object caller, IStatus status) {
- // Check for error
- if (status.getSeverity() == IStatus.ERROR) {
- // Extract the failure cause
- Throwable error = status.getException();
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_failed_message, id, error),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
-
- // Job is done -> remove it from the list of pending jobs
- pendingCloseChannel.remove(id);
- } else {
- // Job is done -> remove it from the list of pending jobs
- pendingCloseChannel.remove(id);
-
- // Clean the reference counter and the channel map
- if (isRefCounted) channels.remove(id);
- if (isRefCounted) refCounters.remove(channel);
- if (!isRefCounted) forcedChannels.remove(channel);
- if (!isRefCounted) forcedChannelFlags.remove(channel);
-
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_closed_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
- }
- }
- };
-
- // Get the stepper operation service
- IStepperOperationService stepperOperationService = StepperHelper.getService(peer, StepperOperationService.CLOSE_CHANNEL);
-
- // Schedule the "close channel" stepper job
- IStepContext stepContext = stepperOperationService.getStepContext(peer, StepperOperationService.CLOSE_CHANNEL);
- String stepGroupId = stepperOperationService.getStepGroupId(peer, StepperOperationService.CLOSE_CHANNEL);
-
- if (stepGroupId != null && stepContext != null) {
- String name = stepperOperationService.getStepGroupName(peer, StepperOperationService.CLOSE_CHANNEL);
- boolean isCancelable = stepperOperationService.isCancelable(peer, StepperOperationService.CLOSE_CHANNEL);
-
- job = new StepperJob(name != null ? name : "", stepContext, data, stepGroupId, StepperOperationService.CLOSE_CHANNEL, isCancelable, true); //$NON-NLS-1$
- job.setJobCallback(callback);
- job.markStatusHandled();
- job.schedule();
- }
-
- // Remember the "close channel" stepper job until finished
- if (job != null) {
- pendingCloseChannel.put(id, job);
- }
- } else {
- // There is a pending "close channel" stepper job
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_pending_message, id),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
- }
- } else {
- if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_CHANNEL_MANAGER)) {
- CoreBundleActivator.getTraceHandler().trace(NLS.bind(Messages.ChannelManager_closeChannel_inuse_message, id, counter.toString()),
- 0, ITraceIds.TRACE_CHANNEL_MANAGER, IStatus.INFO, ChannelManager2.this);
- }
- }
-
- // Clean up the list of forced channels. Remove all channels already been closed.
- ListIterator<IChannel> iter = forcedChannels.listIterator();
- while (iter.hasNext()) {
- IChannel c = iter.next();
- if (c.getState() == IChannel.STATE_CLOSED) {
- iter.remove();
- forcedChannelFlags.remove(c);
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#shutdown(org.eclipse.tcf.protocol.IPeer)
- */
- @Override
- public void shutdown(final IPeer peer) {
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- internalShutdown(peer);
- }
- };
- if (Protocol.isDispatchThread()) runnable.run();
- else Protocol.invokeLater(runnable);
- }
-
- /**
- * Shutdown the communication to the given peer, no matter of the current
- * reference count. A possible associated value-add is shutdown as well.
- *
- * @param peer The peer. Must not be <code>null</code>.
- */
- /* default */ void internalShutdown(IPeer peer) {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- Assert.isNotNull(peer);
-
- // Get the peer id
- String id = peer.getID();
-
- // First, close all channels that are not reference counted
- ListIterator<IChannel> iter = forcedChannels.listIterator();
- while (iter.hasNext()) {
- IChannel c = iter.next();
- if (id.equals(c.getRemotePeer().getID())) {
- c.close();
- iter.remove();
- forcedChannelFlags.remove(c);
- }
- }
-
- // Get the channel
- IChannel channel = internalGetChannel(peer);
- if (channel != null) {
- // Reset the reference count (will force a channel close)
- refCounters.remove(channel);
-
- // Close the channel
- internalCloseChannel(channel);
- }
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#closeAll(boolean)
- */
- @Override
- public void closeAll(boolean wait) {
- if (wait) Assert.isTrue(!Protocol.isDispatchThread());
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- internalCloseAll();
- }
- };
-
- if (Protocol.isDispatchThread()) runnable.run();
- else if (wait) Protocol.invokeAndWait(runnable);
- else Protocol.invokeLater(runnable);
- }
-
- /**
- * Close all open channel, no matter of the current reference count.
- * <p>
- * <b>Note:</b> This method must be invoked at the TCF dispatch thread.
- */
- /* default */ void internalCloseAll() {
- Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
-
- IChannel[] openChannels = channels.values().toArray(new IChannel[channels.values().size()]);
-
- refCounters.clear();
- channels.clear();
-
- for (IChannel channel : openChannels) internalCloseChannel(channel);
- }
-
- // ----- Streams handling -----
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream)
- */
- @Override
- public void subscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneSubscribeStream done) {
- Assert.isNotNull(channel);
- Assert.isNotNull(streamType);
- Assert.isNotNull(listener);
- Assert.isNotNull(done);
-
- if (channel.getState() != IChannel.STATE_OPEN) {
- done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message));
- return;
- }
-
- StreamListenerProxy proxy = null;
-
- // Get all the streams listener proxy instance for the given channel
- List<StreamListenerProxy> proxies = streamProxies.get(channel);
- // Loop the proxies and find the one for the given stream type
- if (proxies != null) {
- for (StreamListenerProxy candidate : proxies) {
- if (streamType.equals(candidate.getStreamType())) {
- proxy = candidate;
- break;
- }
- }
- }
-
- // If the proxy already exist, add the listener to the proxy and return immediately
- if (proxy != null) {
- proxy.addListener(listener);
- done.doneSubscribeStream(null);
- } else {
- // No proxy yet -> subscribe to the stream type for real and register the proxy
- proxy = new StreamListenerProxy(channel, streamType);
- if (proxies == null) {
- proxies = new ArrayList<StreamListenerProxy>();
- streamProxies.put(channel, proxies);
- }
- proxies.add(proxy);
- proxy.addListener(listener);
-
- IStreams service = channel.getRemoteService(IStreams.class);
- if (service != null) {
- final StreamListenerProxy finProxy = proxy;
- final List<StreamListenerProxy> finProxies = proxies;
-
- // Subscribe to the stream type
- service.subscribe(streamType, proxy, new IStreams.DoneSubscribe() {
- @Override
- public void doneSubscribe(IToken token, Exception error) {
- if (error != null) {
- finProxy.removeListener(listener);
- if (finProxy.isEmpty()) finProxies.remove(finProxy);
- if (finProxies.isEmpty()) streamProxies.remove(channel);
- } else {
- finProxy.addListener(listener);
- }
- done.doneSubscribeStream(error);
- }
- });
- } else {
- proxy.removeListener(listener);
- if (proxy.isEmpty()) proxies.remove(proxy);
- if (proxies.isEmpty()) streamProxies.remove(channel);
- done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message));
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#unsubscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.IStreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneUnsubscribeStream)
- */
- @Override
- public void unsubscribeStream(final IChannel channel, final String streamType, final IStreamsListener listener, final DoneUnsubscribeStream done) {
- Assert.isNotNull(channel);
- Assert.isNotNull(streamType);
- Assert.isNotNull(listener);
- Assert.isNotNull(done);
-
- if (channel.getState() != IChannel.STATE_OPEN) {
- done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message));
- return;
- }
-
- StreamListenerProxy proxy = null;
-
- // Get all the streams listener proxy instance for the given channel
- List<StreamListenerProxy> proxies = streamProxies.get(channel);
- // Loop the proxies and find the one for the given stream type
- if (proxies != null) {
- for (StreamListenerProxy candidate : proxies) {
- if (streamType.equals(candidate.getStreamType())) {
- proxy = candidate;
- break;
- }
- }
- }
-
- if (proxy != null) {
- // Remove the listener from the proxy
- proxy.removeListener(listener);
- // Are there remaining proxied listeners for this stream type?
- if (proxy.isEmpty()) {
- // Unregister the stream type
- IStreams service = channel.getRemoteService(IStreams.class);
- if (service != null) {
- final StreamListenerProxy finProxy = proxy;
- final List<StreamListenerProxy> finProxies = proxies;
-
- // Unsubscribe
- service.unsubscribe(streamType, proxy, new IStreams.DoneUnsubscribe() {
- @Override
- public void doneUnsubscribe(IToken token, Exception error) {
- finProxies.remove(finProxy);
- if (finProxies.isEmpty()) streamProxies.remove(channel);
- done.doneUnsubscribeStream(error);
- }
- });
- } else {
- done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message));
- }
- }
- }
- }
-
-}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java
index 5d3f94a2c..7a8fdf9a6 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/channelmanager/steps/ApplyPathMapsStep.java
@@ -25,6 +25,7 @@ import org.eclipse.tcf.te.runtime.stepper.StepperAttributeUtil;
import org.eclipse.tcf.te.runtime.stepper.interfaces.IFullQualifiedId;
import org.eclipse.tcf.te.runtime.stepper.interfaces.IStepContext;
import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator;
+import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
import org.eclipse.tcf.te.tcf.core.interfaces.IPathMapService;
import org.eclipse.tcf.te.tcf.core.interfaces.steps.ITcfStepAttributes;
import org.eclipse.tcf.te.tcf.core.steps.AbstractPeerStep;
@@ -66,16 +67,22 @@ public class ApplyPathMapsStep extends AbstractPeerStep {
final IPeer peer = getActivePeerContext(context, data, fullQualifiedId);
Assert.isNotNull(peer);
- // Apply the initial path map to the opened channel.
- //
- // This must happen outside the TCF dispatch thread as it may trigger
- // the launch configuration change listeners.
- Assert.isTrue(!Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class);
- if (service != null) {
- // Pass in the channel for direct use. IChannelManager.getChannel(peer)
- // does return null while still executing the "open channel" step group.
- service.applyPathMap(channel, true, callback);
+ final boolean applyPathMaps = !StepperAttributeUtil.getBooleanProperty(IChannelManager.FLAG_NO_PATH_MAP, fullQualifiedId, data);
+
+ if (applyPathMaps) {
+ // Apply the initial path map to the opened channel.
+ //
+ // This must happen outside the TCF dispatch thread as it may trigger
+ // the launch configuration change listeners.
+ Assert.isTrue(!Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
+ final IPathMapService service = ServiceManager.getInstance().getService(peer, IPathMapService.class);
+ if (service != null) {
+ // Pass in the channel for direct use. IChannelManager.getChannel(peer)
+ // does return null while still executing the "open channel" step group.
+ service.applyPathMap(channel, true, callback);
+ } else {
+ callback(data, fullQualifiedId, callback, Status.OK_STATUS, null);
+ }
} else {
callback(data, fullQualifiedId, callback, Status.OK_STATUS, null);
}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
index 5b774472e..ea33b1e18 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
@@ -78,19 +78,7 @@ public class Messages extends NLS {
public static String ChannelManager_openChannel_new_message;
public static String ChannelManager_openChannel_success_message;
public static String ChannelManager_openChannel_failed_message;
- public static String ChannelManager_openChannel_valueAdd_check;
- public static String ChannelManager_openChannel_valueAdd_noneApplicable;
- public static String ChannelManager_openChannel_valueAdd_numApplicable;
- public static String ChannelManager_openChannel_valueAdd_isAlive;
- public static String ChannelManager_openChannel_valueAdd_launch;
- public static String ChannelManager_openChannel_valueAdd_launch_exception;
- public static String ChannelManager_openChannel_valueAdd_startChaining;
- public static String ChannelManager_openChannel_proxies_startChaining;
- public static String ChannelManager_openChannel_succeeded;
public static String ChannelManager_openChannel_failed;
- public static String ChannelManager_openChannel_redirect_succeeded;
- public static String ChannelManager_openChannel_redirect_failed;
- public static String ChannelManager_openChannel_redirect_invalidChannelState;
public static String ChannelManager_closeChannel_close_message;
public static String ChannelManager_closeChannel_message;
public static String ChannelManager_closeChannel_inuse_message;
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
index 4586af976..cb22aee2b 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
@@ -19,19 +19,7 @@ ChannelManager_openChannel_pending_message=Opening channel in progress. Callback
ChannelManager_openChannel_new_message=Opening new channel. Target id = {0}
ChannelManager_openChannel_success_message=Opening channel succeeded. Target id = {0}
ChannelManager_openChannel_failed_message=Opening channel failed. Target id = {0}, cause = {1}
-ChannelManager_openChannel_valueAdd_check=Checking for value add status. Target id = {0}
-ChannelManager_openChannel_valueAdd_noneApplicable=No applicable value-add contributions found. Target id = {0}
-ChannelManager_openChannel_valueAdd_numApplicable={0} applicable value-add contributions found. Target id = {1}
-ChannelManager_openChannel_valueAdd_isAlive=Value-add #{0} ({1}): isAlive = {2}. Target id = {3}
-ChannelManager_openChannel_valueAdd_launch=Value-add #{0} ({1}): launch {2} (error = ''{3}''). Target id = {4}
-ChannelManager_openChannel_valueAdd_launch_exception=Value-add #{0} ({1}): Launch exception was:\n{2}
-ChannelManager_openChannel_valueAdd_startChaining=Start chaining of value-add's. Target id = {0}
-ChannelManager_openChannel_proxies_startChaining=Start chaining of proxies. Target id = {0}
-ChannelManager_openChannel_succeeded=Successfully opened channel to {0}. Redirect Level = {1}. Target id = {2}
ChannelManager_openChannel_failed=Failed to open channel to {0}. Redirect Level = {1}. Target id = {2}
-ChannelManager_openChannel_redirect_succeeded=Successfully redirected channel from {0} to {1}. Redirect Level = {2}
-ChannelManager_openChannel_redirect_failed=Failed to redirect channel from {0} to {1}.
-ChannelManager_openChannel_redirect_invalidChannelState=Channel not in open state. Cannot redirect channel to {1}.
ChannelManager_closeChannel_message=Request to close channel. Target id = {0}
ChannelManager_closeChannel_close_message=Closing channel. Target id = {0}
ChannelManager_closeChannel_inuse_message=Channel not closed. Still in use. Target id = {0}, new reference count = {1}

Back to the top