Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorUwe Stieber2014-04-22 13:11:17 -0400
committerUwe Stieber2014-04-23 07:25:55 -0400
commit244b70627ea39844583a70af64bbffaa77e89b30 (patch)
treebcb092fb3d18c30463d4e89c43d990a99978d89f
parent79624d4bedf7d19d2463102b50bbc557b369720e (diff)
downloadorg.eclipse.tcf-244b70627ea39844583a70af64bbffaa77e89b30.tar.gz
org.eclipse.tcf-244b70627ea39844583a70af64bbffaa77e89b30.tar.xz
org.eclipse.tcf-244b70627ea39844583a70af64bbffaa77e89b30.zip
Target Explorer: Fix streams handling for ProcessLauncher and TerminalsLauncher
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.runtime.services/src/org/eclipse/tcf/te/runtime/services/interfaces/constants/ITerminalsConnectorConstants.java78
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java51
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java239
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java2
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties2
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/interfaces/launcher/IProcessLauncher.java8
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessLauncher.java55
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessStreamsListener.java131
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/interfaces/launcher/ITerminalsLauncher.java8
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsLauncher.java41
-rw-r--r--target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsStreamsListener.java49
11 files changed, 559 insertions, 105 deletions
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.runtime.services/src/org/eclipse/tcf/te/runtime/services/interfaces/constants/ITerminalsConnectorConstants.java b/target_explorer/plugins/org.eclipse.tcf.te.runtime.services/src/org/eclipse/tcf/te/runtime/services/interfaces/constants/ITerminalsConnectorConstants.java
index dd804d6e5..2c4f41647 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.runtime.services/src/org/eclipse/tcf/te/runtime/services/interfaces/constants/ITerminalsConnectorConstants.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.runtime.services/src/org/eclipse/tcf/te/runtime/services/interfaces/constants/ITerminalsConnectorConstants.java
@@ -18,59 +18,59 @@ public interface ITerminalsConnectorConstants {
/**
* Property: The unique id of the terminals view to open.
*/
- public static final String PROP_ID = "id"; //$NON-NLS-1$
+ public static final String PROP_ID = "terminal.id"; //$NON-NLS-1$
/**
* Property: The unique secondary id of the terminals view to open.
*/
- public static final String PROP_SECONDARY_ID = "secondaryId"; //$NON-NLS-1$
+ public static final String PROP_SECONDARY_ID = "terminal.secondaryId"; //$NON-NLS-1$
/**
* Property: The title of the terminal tab to open.
*/
- public static final String PROP_TITLE = "title"; //$NON-NLS-1$
+ public static final String PROP_TITLE = "terminal.title"; //$NON-NLS-1$
/**
* Property: The encoding of the terminal tab to open.
*/
- public static final String PROP_ENCODING = "encoding"; //$NON-NLS-1$
+ public static final String PROP_ENCODING = "terminal.encoding"; //$NON-NLS-1$
/**
* Property: Custom data object to associate with the terminal tab.
*/
- public static final String PROP_DATA = "data"; //$NON-NLS-1$
+ public static final String PROP_DATA = "terminal.data"; //$NON-NLS-1$
/**
* Property: External selection to associate with the terminal tab.
*/
- public static final String PROP_SELECTION = "selection"; //$NON-NLS-1$
+ public static final String PROP_SELECTION = "terminal.selection"; //$NON-NLS-1$
/**
* Property: Flag to force a new terminal tab.
*/
- public static final String PROP_FORCE_NEW = "forceNew"; //$NON-NLS-1$
+ public static final String PROP_FORCE_NEW = "terminal.forceNew"; //$NON-NLS-1$
/**
* Property: Flag to signal if the terminal tab shall have a disconnect button or not.
*/
- public static final String PROP_HAS_DISCONNECT_BUTTON = "hasDisconnectButton"; //$NON-NLS-1$
+ public static final String PROP_HAS_DISCONNECT_BUTTON = "terminal.hasDisconnectButton"; //$NON-NLS-1$
/**
* Property: Terminals launcher delegate id.
*/
- public static final String PROP_DELEGATE_ID = "delegateId"; //$NON-NLS-1$
+ public static final String PROP_DELEGATE_ID = "terminal.delegateId"; //$NON-NLS-1$
/**
* Property: Terminals connector type id.
*/
- public static final String PROP_CONNECTOR_TYPE_ID = "connector.type.id"; //$NON-NLS-1$
+ public static final String PROP_CONNECTOR_TYPE_ID = "terminal.connector.type.id"; //$NON-NLS-1$
/**
* Property: Specific terminal connector type id. Allows clients to
* override the specifically used terminal connector
* implementation for a given type.
*/
- public static final String PROP_TERMINAL_CONNECTOR_ID = "tm.terminal.connector.id"; //$NON-NLS-1$
+ public static final String PROP_TERMINAL_CONNECTOR_ID = "terminal.tm.terminal.connector.id"; //$NON-NLS-1$
// ***** Generic terminals connector properties *****
@@ -79,13 +79,13 @@ public interface ITerminalsConnectorConstants {
* connector implementation may interpret this value differently. If not
* set, the terminal connector may use a default value.
*/
- public static final String PROP_TIMEOUT = "timeout"; //$NON-NLS-1$
+ public static final String PROP_TIMEOUT = "terminal.timeout"; //$NON-NLS-1$
/**
* Property: Flag to control if a local echo is needed from the terminal widget.
* <p>Typical for process and streams terminals.
*/
- public static final String PROP_LOCAL_ECHO = "localEcho"; //$NON-NLS-1$
+ public static final String PROP_LOCAL_ECHO = "terminal.localEcho"; //$NON-NLS-1$
/**
* Property: Data flag to tell the terminal to not reconnect when hitting enter
@@ -93,25 +93,25 @@ public interface ITerminalsConnectorConstants {
* The flag can be set by adding an IPropertiesContainer with the set
* flag as PROP_DATA.
*/
- public static final String PROP_DATA_NO_RECONNECT = "data.noReconnect"; //$NON-NLS-1$
+ public static final String PROP_DATA_NO_RECONNECT = "terminal.data.noReconnect"; //$NON-NLS-1$
/**
* Property: The line separator used by the terminal input.
* <p>Typical for process and streams terminals.
*/
- public static final String PROP_LINE_SEPARATOR = "lineSeparator"; //$NON-NLS-1$
+ public static final String PROP_LINE_SEPARATOR = "terminal.lineSeparator"; //$NON-NLS-1$
/**
* Property: The list of stdout listeners to attach to the corresponding stream monitor.
* <p>Typical for process and streams terminals.
*/
- public static final String PROP_STDOUT_LISTENERS = "stdoutListeners"; //$NON-NLS-1$
+ public static final String PROP_STDOUT_LISTENERS = "terminal.stdoutListeners"; //$NON-NLS-1$
/**
* Property: The list of stderr listeners to attach to the corresponding stream monitor.
* <p>Typical for process and streams terminals.
*/
- public static final String PROP_STDERR_LISTENERS = "stderrListeners"; //$NON-NLS-1$
+ public static final String PROP_STDERR_LISTENERS = "terminal.stderrListeners"; //$NON-NLS-1$
// ***** IP based terminals connector properties *****
@@ -119,19 +119,19 @@ public interface ITerminalsConnectorConstants {
* Property: Host name or IP address the terminal server is running.
* <p>Typical for telnet or ssh terminals.
*/
- public static final String PROP_IP_HOST = "ip.host"; //$NON-NLS-1$
+ public static final String PROP_IP_HOST = "terminal.ip.host"; //$NON-NLS-1$
/**
* Property: Port at which the terminal server is providing the console input and output.
* <p>Typical for telnet or ssh terminals.
*/
- public static final String PROP_IP_PORT = "ip.port"; //$NON-NLS-1$
+ public static final String PROP_IP_PORT = "terminal.ip.port"; //$NON-NLS-1$
/**
* Property: An offset to add to the specified port number.
* <p>Typical for telnet or ssh terminals.
*/
- public static final String PROP_IP_PORT_OFFSET = "ip.port.offset"; //$NON-NLS-1$
+ public static final String PROP_IP_PORT_OFFSET = "terminal.ip.port.offset"; //$NON-NLS-1$
// ***** Process based terminals connector properties *****
@@ -139,43 +139,43 @@ public interface ITerminalsConnectorConstants {
* Property: Process image path.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_PATH = "process.path"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_PATH = "terminal.process.path"; //$NON-NLS-1$
/**
* Property: Process arguments.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_ARGS = "process.args"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_ARGS = "terminal.process.args"; //$NON-NLS-1$
/**
* Property: Process arguments.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_WORKING_DIR = "process.working_dir"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_WORKING_DIR = "terminal.process.working_dir"; //$NON-NLS-1$
/**
* Property: Process environment.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_ENVIRONMENT = "process.environment"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_ENVIRONMENT = "terminal.process.environment"; //$NON-NLS-1$
/**
* Property: Flag to merge process environment with native environment.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_MERGE_ENVIRONMENT = "process.environment.merge"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_MERGE_ENVIRONMENT = "terminal.process.environment.merge"; //$NON-NLS-1$
/**
* Property: Runtime process instance.
* <p>Typical for process terminals.
*/
- public static final String PROP_PROCESS_OBJ = "process"; //$NON-NLS-1$
+ public static final String PROP_PROCESS_OBJ = "terminal.process"; //$NON-NLS-1$
/**
* Property: Runtime process PTY instance.
* <p>Typical for process terminals.
*/
- public static final String PROP_PTY_OBJ = "pty"; //$NON-NLS-1$
+ public static final String PROP_PTY_OBJ = "terminal.pty"; //$NON-NLS-1$
// ***** Streams based terminals connector properties *****
@@ -183,66 +183,66 @@ public interface ITerminalsConnectorConstants {
* Property: Stdin streams instance.
* <p>Typical for streams terminals.
*/
- public static final String PROP_STREAMS_STDIN = "streams.stdin"; //$NON-NLS-1$
+ public static final String PROP_STREAMS_STDIN = "terminal.streams.stdin"; //$NON-NLS-1$
/**
* Property: Stdout streams instance.
* <p>Typical for streams terminals.
*/
- public static final String PROP_STREAMS_STDOUT = "streams.stdout"; //$NON-NLS-1$
+ public static final String PROP_STREAMS_STDOUT = "terminal.streams.stdout"; //$NON-NLS-1$
/**
* Property: Stderr streams instance.
* <p>Typical for streams terminals.
*/
- public static final String PROP_STREAMS_STDERR = "streams.stderr"; //$NON-NLS-1$
+ public static final String PROP_STREAMS_STDERR = "terminal.streams.stderr"; //$NON-NLS-1$
// ***** Ssh specific properties *****
/**
* Property: ssh keep alive value.
*/
- public static final String PROP_SSH_KEEP_ALIVE = "ssh.keep_alive"; //$NON-NLS-1$
+ public static final String PROP_SSH_KEEP_ALIVE = "terminal.ssh.keep_alive"; //$NON-NLS-1$
/**
* Property: Ssh password.
*/
- public static final String PROP_SSH_PASSWORD = "ssh.password"; //$NON-NLS-1$
+ public static final String PROP_SSH_PASSWORD = "terminal.ssh.password"; //$NON-NLS-1$
/**
* Property: Ssh user.
*/
- public static final String PROP_SSH_USER = "ssh.user"; //$NON-NLS-1$
+ public static final String PROP_SSH_USER = "terminal.ssh.user"; //$NON-NLS-1$
// ***** Serial specific properties *****
/**
* The serial device name.
*/
- public static final String PROP_SERIAL_DEVICE = "serial.device"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_DEVICE = "terminal.serial.device"; //$NON-NLS-1$
/**
* The baud rate.
*/
- public static final String PROP_SERIAL_BAUD_RATE = "serial.baudrate"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_BAUD_RATE = "terminal.serial.baudrate"; //$NON-NLS-1$
/**
* The data bits
*/
- public static final String PROP_SERIAL_DATA_BITS = "serial.databits"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_DATA_BITS = "terminal.serial.databits"; //$NON-NLS-1$
/**
* The parity
*/
- public static final String PROP_SERIAL_PARITY = "serial.parity"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_PARITY = "terminal.serial.parity"; //$NON-NLS-1$
/**
* The stop bits
*/
- public static final String PROP_SERIAL_STOP_BITS = "serial.stopbits"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_STOP_BITS = "terminal.serial.stopbits"; //$NON-NLS-1$
/**
* The flow control
*/
- public static final String PROP_SERIAL_FLOW_CONTROL = "serial.flowcontrol"; //$NON-NLS-1$
+ public static final String PROP_SERIAL_FLOW_CONTROL = "terminal.serial.flowcontrol"; //$NON-NLS-1$
}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java
index b1c446974..9a31534e0 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/interfaces/IChannelManager.java
@@ -15,6 +15,7 @@ import org.eclipse.core.runtime.IAdaptable;
import org.eclipse.core.runtime.OperationCanceledException;
import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IPeer;
+import org.eclipse.tcf.services.IStreams;
/**
* TCF channel manager public API declaration.
@@ -29,7 +30,7 @@ public interface IChannelManager extends IAdaptable {
* If not present in the flags map passed in to open channel, the default value is
* <code>false</code>.
*/
- public static final String FLAG_FORCE_NEW = "forceNew"; //$NON-NLS-1$
+ public static final String FLAG_FORCE_NEW = "channel.forceNew"; //$NON-NLS-1$
/**
* If set to <code>true</code>, a new and not reference counted channel is opened,
@@ -42,7 +43,7 @@ public interface IChannelManager extends IAdaptable {
* If not present in the flags map passed in to open channel, the default value is
* <code>false</code>.
*/
- public static final String FLAG_NO_VALUE_ADD = "noValueAdd"; //$NON-NLS-1$
+ public static final String FLAG_NO_VALUE_ADD = "channel.noValueAdd"; //$NON-NLS-1$
/**
* If set to <code>true</code>, a new and not reference counted channel is opened,
@@ -54,7 +55,7 @@ public interface IChannelManager extends IAdaptable {
* If not present in the flags map passed in to open channel, the default value is
* <code>false</code>.
*/
- public static final String FLAG_NO_PATH_MAP = "noPathMap"; //$NON-NLS-1$
+ public static final String FLAG_NO_PATH_MAP = "channel.noPathMap"; //$NON-NLS-1$
/**
* Client call back interface for openChannel(...).
@@ -143,4 +144,48 @@ public interface IChannelManager extends IAdaptable {
* the method will return immediately.
*/
public void closeAll(boolean wait);
+
+ /**
+ * Client call back interface for subscribeStream(...).
+ */
+ interface DoneSubscribeStream {
+ /**
+ * Called when subscribing to a stream type is done.
+ *
+ * @param error The error description if operation failed, <code>null</code> if succeeded.
+ */
+ void doneSubscribeStream(Throwable error);
+ }
+
+ /**
+ * Subscribe to the given stream type if not yet subscribed and register the given streams listener.
+ *
+ * @param channel The channel. Must not be <code>null</code>.
+ * @param streamType The stream source type. Must not be <code>null</code>.
+ * @param listener The streams listener. Must not be <code>null</code>.
+ * @param done The client callback. Must not be <code>null</code>.
+ */
+ public void subscribeStream(IChannel channel, String streamType, IStreams.StreamsListener listener, DoneSubscribeStream done);
+
+ /**
+ * Client call back interface for unsubscribeStream(...).
+ */
+ interface DoneUnsubscribeStream {
+ /**
+ * Called when unsubscribing a stream type is done.
+ *
+ * @param error The error description if operation failed, <code>null</code> if succeeded.
+ */
+ void doneUnsubscribeStream(Throwable error);
+ }
+
+ /**
+ * Unsubscribe from the given stream type if subscribed and unregister the given streams listener.
+ *
+ * @param channel The channel. Must not be <code>null</code>.
+ * @param streamType The stream source type. Must not be <code>null</code>.
+ * @param listener The streams listener. Must not be <code>null</code>.
+ * @param done The client callback. Must not be <code>null</code>.
+ */
+ public void unsubscribeStream(IChannel channel, String streamType, IStreams.StreamsListener listener, DoneUnsubscribeStream done);
}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java
index 2612f79cb..7540d9448 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/internal/ChannelManager.java
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.ListenerList;
import org.eclipse.core.runtime.OperationCanceledException;
import org.eclipse.core.runtime.PlatformObject;
import org.eclipse.osgi.util.NLS;
@@ -29,7 +30,10 @@ import org.eclipse.tcf.protocol.IToken;
import org.eclipse.tcf.protocol.Protocol;
import org.eclipse.tcf.services.IPathMap;
import org.eclipse.tcf.services.IPathMap.PathMapRule;
+import org.eclipse.tcf.services.IStreams;
+import org.eclipse.tcf.services.IStreams.StreamsListener;
import org.eclipse.tcf.te.runtime.callback.Callback;
+import org.eclipse.tcf.te.runtime.interfaces.IDisposable;
import org.eclipse.tcf.te.runtime.services.ServiceManager;
import org.eclipse.tcf.te.tcf.core.activator.CoreBundleActivator;
import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
@@ -51,6 +55,8 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
/* default */ final Map<String, IChannel> channels = new HashMap<String, IChannel>();
// The map of channels opened via "forceNew" flag (needed to handle the close channel correctly)
/* default */ final List<IChannel> forcedChannels = new ArrayList<IChannel>();
+ // The map of stream listener proxies per channel
+ /* default */ final Map<IChannel, List<StreamListenerProxy>> streamProxies = new HashMap<IChannel, List<StreamListenerProxy>>();
/**
* Constructor.
@@ -1161,4 +1167,237 @@ public final class ChannelManager extends PlatformObject implements IChannelMana
done.doneChainValueAdd(e, channel);
}
}
+
+ /**
+ * Private stream listener proxy implementation.
+ */
+ private final class StreamListenerProxy implements IStreams.StreamsListener {
+ // The stream type the proxy is registered for
+ private final String streamType;
+ // The list of proxied stream listeners
+ /* default */ ListenerList listeners = new ListenerList();
+
+ /**
+ * Constructor
+ *
+ * @param The channel. Must not be <code>null</code>.
+ */
+ public StreamListenerProxy(final IChannel channel, final String streamType) {
+ Assert.isNotNull(channel);
+ Assert.isNotNull(streamType);
+
+ channel.addChannelListener(new IChannel.IChannelListener() {
+ @Override
+ public void onChannelOpened() {}
+
+ @Override
+ public void onChannelClosed(Throwable error) {
+ // Channel is closed, remove ourself
+ channel.removeChannelListener(this);
+ // Dispose all registered streams listener
+ Object[] candidates = listeners.getListeners();
+ listeners.clear();
+ for (Object listener : candidates) {
+ if (listener instanceof IDisposable) {
+ ((IDisposable)listener).dispose();
+ }
+ }
+ }
+
+ @Override
+ public void congestionLevel(int level) {
+ }
+ });
+
+ // Remember the stream type
+ this.streamType = streamType;
+ }
+
+ /**
+ * Returns the stream type the proxy is registered for.
+ *
+ * @return The stream type.
+ */
+ public String getStreamType() {
+ return streamType;
+ }
+
+ /**
+ * Adds the given streams listener to the list of proxied listeners.
+ *
+ * @param listener The streams listener. Must not be <code>null</code>.
+ */
+ public void addListener(IStreams.StreamsListener listener) {
+ Assert.isNotNull(listener);
+ listeners.add(listener);
+ }
+
+ /**
+ * Removes the given streams listener from the list of proxied listeners.
+ *
+ * @param listener The streams listener. Must not be <code>null</code>.
+ */
+ public void removeListener(IStreams.StreamsListener listener) {
+ Assert.isNotNull(listener);
+ listeners.remove(listener);
+ }
+
+ /**
+ * Returns if the proxied listeners list is empty or not.
+ *
+ * @return <code>True</code> if the list is empty, <code>false</code> otherwise.
+ */
+ public boolean isEmpty() {
+ return listeners.isEmpty();
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String)
+ */
+ @Override
+ public void created(String stream_type, String stream_id, String context_id) {
+ for (Object l : listeners.getListeners()) {
+ ((IStreams.StreamsListener)l).created(stream_type, stream_id, context_id);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tcf.services.IStreams.StreamsListener#disposed(java.lang.String, java.lang.String)
+ */
+ @Override
+ public void disposed(String stream_type, String stream_id) {
+ for (Object l : listeners.getListeners()) {
+ ((IStreams.StreamsListener)l).disposed(stream_type, stream_id);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#subscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream)
+ */
+ @Override
+ public void subscribeStream(final IChannel channel, final String streamType, final StreamsListener listener, final DoneSubscribeStream done) {
+ Assert.isNotNull(channel);
+ Assert.isNotNull(streamType);
+ Assert.isNotNull(listener);
+ Assert.isNotNull(done);
+
+ if (channel.getState() != IChannel.STATE_OPEN) {
+ done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message));
+ return;
+ }
+
+ StreamListenerProxy proxy = null;
+
+ // Get all the streams listener proxy instance for the given channel
+ List<StreamListenerProxy> proxies = streamProxies.get(channel);
+ // Loop the proxies and find the one for the given stream type
+ if (proxies != null) {
+ for (StreamListenerProxy candidate : proxies) {
+ if (streamType.equals(candidate.getStreamType())) {
+ proxy = candidate;
+ break;
+ }
+ }
+ }
+
+ // If the proxy already exist, add the listener to the proxy and return immediately
+ if (proxy != null) {
+ proxy.addListener(listener);
+ done.doneSubscribeStream(null);
+ } else {
+ // No proxy yet -> subscribe to the stream type for real and register the proxy
+ proxy = new StreamListenerProxy(channel, streamType);
+ if (proxies == null) {
+ proxies = new ArrayList<StreamListenerProxy>();
+ streamProxies.put(channel, proxies);
+ }
+ proxies.add(proxy);
+ proxy.addListener(listener);
+
+ IStreams service = channel.getRemoteService(IStreams.class);
+ if (service != null) {
+ final StreamListenerProxy finProxy = proxy;
+ final List<StreamListenerProxy> finProxies = proxies;
+
+ // Subscribe to the stream type
+ service.subscribe(streamType, proxy, new IStreams.DoneSubscribe() {
+ @Override
+ public void doneSubscribe(IToken token, Exception error) {
+ if (error != null) {
+ finProxy.removeListener(listener);
+ if (finProxy.isEmpty()) finProxies.remove(finProxy);
+ if (finProxies.isEmpty()) streamProxies.remove(channel);
+ } else {
+ finProxy.addListener(listener);
+ }
+ done.doneSubscribeStream(error);
+ }
+ });
+ } else {
+ proxy.removeListener(listener);
+ if (proxy.isEmpty()) proxies.remove(proxy);
+ if (proxies.isEmpty()) streamProxies.remove(channel);
+ done.doneSubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message));
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager#unsubscribeStream(org.eclipse.tcf.protocol.IChannel, java.lang.String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneUnsubscribeStream)
+ */
+ @Override
+ public void unsubscribeStream(final IChannel channel, final String streamType, final StreamsListener listener, final DoneUnsubscribeStream done) {
+ Assert.isNotNull(channel);
+ Assert.isNotNull(streamType);
+ Assert.isNotNull(listener);
+ Assert.isNotNull(done);
+
+ if (channel.getState() != IChannel.STATE_OPEN) {
+ done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_closed_message));
+ return;
+ }
+
+ StreamListenerProxy proxy = null;
+
+ // Get all the streams listener proxy instance for the given channel
+ List<StreamListenerProxy> proxies = streamProxies.get(channel);
+ // Loop the proxies and find the one for the given stream type
+ if (proxies != null) {
+ for (StreamListenerProxy candidate : proxies) {
+ if (streamType.equals(candidate.getStreamType())) {
+ proxy = candidate;
+ break;
+ }
+ }
+ }
+
+ if (proxy != null) {
+ // Remove the listener from the proxy
+ proxy.removeListener(listener);
+ // Are there remaining proxied listeners for this stream type?
+ if (proxy.isEmpty()) {
+ // Unregister the stream type
+ IStreams service = channel.getRemoteService(IStreams.class);
+ if (service != null) {
+ final StreamListenerProxy finProxy = proxy;
+ final List<StreamListenerProxy> finProxies = proxies;
+
+ // Unsubscribe
+ service.unsubscribe(streamType, proxy, new IStreams.DoneUnsubscribe() {
+ @Override
+ public void doneUnsubscribe(IToken token, Exception error) {
+ finProxies.remove(finProxy);
+ if (finProxies.isEmpty()) streamProxies.remove(channel);
+ done.doneUnsubscribeStream(error);
+ }
+ });
+ } else {
+ done.doneUnsubscribeStream(new Exception(Messages.ChannelManager_stream_missing_service_message));
+ }
+ }
+ }
+ }
+
+
}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
index d30ea41bf..2f0a7b418 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.java
@@ -89,6 +89,8 @@ public class Messages extends NLS {
public static String ChannelManager_closeChannel_message;
public static String ChannelManager_closeChannel_inuse_message;
public static String ChannelManager_closeChannel_closed_message;
+ public static String ChannelManager_stream_closed_message;
+ public static String ChannelManager_stream_missing_service_message;
public static String AbstractExternalValueAdd_error_invalidLocation;
public static String AbstractExternalValueAdd_output;
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
index 14a1a4991..381ac934e 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.core/src/org/eclipse/tcf/te/tcf/core/nls/Messages.properties
@@ -30,6 +30,8 @@ ChannelManager_createPeer_new_message=New target created. Target id = {0}, isTra
ChannelManager_closeChannel_message=Request to close channel. Target id = {0}
ChannelManager_closeChannel_inuse_message=Channel not closed. Still in use. Target id = {0}, new reference count = {1}
ChannelManager_closeChannel_closed_message=Closed channel. Target id = {0}
+ChannelManager_stream_closed_message=Channel must be open but is closed.
+ChannelManager_stream_missing_service_message=Streams service not available.
AbstractExternalValueAdd_error_invalidLocation={0} executable image does not exist or is not readable.\n\nLocation: {1}
AbstractExternalValueAdd_output=Output read from value-add: {0}. Target id = {1}
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/interfaces/launcher/IProcessLauncher.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/interfaces/launcher/IProcessLauncher.java
index 1e0d41981..f92ed4e6c 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/interfaces/launcher/IProcessLauncher.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/interfaces/launcher/IProcessLauncher.java
@@ -12,6 +12,7 @@ package org.eclipse.tcf.te.tcf.processes.core.interfaces.launcher;
import java.util.Map;
import org.eclipse.core.runtime.IAdaptable;
+import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IPeer;
import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer;
@@ -123,6 +124,13 @@ public interface IProcessLauncher extends IAdaptable {
public static String PROP_PROCESSESV1_PARAMS = "processesV1.params"; //$NON-NLS-1$
/**
+ * Property denoting the communication channel to use.
+ * <p>
+ * The property type is {@link IChannel}.
+ */
+ public static String PROP_CHANNEL = "process.channel"; //$NON-NLS-1$
+
+ /**
* Launch a remote process defined by the given launch properties at the target specified by the
* given peer.
* <p>
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessLauncher.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessLauncher.java
index a271dd978..a5848daa5 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessLauncher.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessLauncher.java
@@ -77,6 +77,10 @@ import org.eclipse.tcf.te.tcf.processes.core.nls.Messages;
public class ProcessLauncher extends PlatformObject implements IProcessLauncher {
// The channel instance
/* default */ IChannel channel = null;
+ // Flag to signal if the channel needs to be closed on disposed
+ /* default */ boolean closeChannelOnDispose = false;
+ // Flag to signal if the channel is a private or shared channel
+ /* default */ boolean sharedChannel = false;
// The process properties instance
/* default */ IPropertiesContainer properties;
@@ -103,6 +107,9 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
// The active token.
IToken activeToken = null;
+ /**
+ * Message ID for error message in case the process launch failed.
+ */
public static final String PROCESS_LAUNCH_FAILED_MESSAGE = "processLaunchFailedMessage"; //$NON-NLS-1$
/**
@@ -120,7 +127,6 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
this.streamsProxy = streamsProxy;
}
-
/* (non-Javadoc)
* @see org.eclipse.tcf.te.tcf.processes.core.interfaces.launcher.IProcessLauncher#dispose()
*/
@@ -144,7 +150,7 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
protected void internalDone(Object caller, IStatus status) {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// Close the channel as all disposal is done
- if (finChannel != null) {
+ if (finChannel != null && closeChannelOnDispose) {
Tcf.getChannelManager().closeChannel(finChannel);
}
}
@@ -317,7 +323,7 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
IStatus status = new Status(IStatus.WARNING, CoreBundleActivator.getUniqueIdentifier(), message, error);
Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status);
- // Dispose the launcher directly
+ // Dispose the launcher
dispose();
}
}
@@ -354,9 +360,19 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
// Remember the process properties
this.properties = properties;
+ // Check if we get the channel to use passed in by the launch properties
+ if (properties.containsKey(IProcessLauncher.PROP_CHANNEL)) {
+ IChannel c = (IChannel) properties.getProperty(IProcessLauncher.PROP_CHANNEL);
+ if (c != null && c.getState() == IChannel.STATE_OPEN) {
+ channel = c;
+ closeChannelOnDispose = false;
+ sharedChannel = true;
+ }
+ }
+
// Open a dedicated channel to the given peer
- Map<String, Boolean> flags = new HashMap<String, Boolean>();
- flags.put(IChannelManager.FLAG_FORCE_NEW, Boolean.TRUE);
+ final Map<String, Boolean> flags = new HashMap<String, Boolean>();
+ flags.put(IChannelManager.FLAG_FORCE_NEW, properties.containsKey(IChannelManager.FLAG_FORCE_NEW) ? Boolean.valueOf(properties.getBooleanProperty(IChannelManager.FLAG_FORCE_NEW)) : Boolean.TRUE);
if (channel != null && channel.getState() == IChannel.STATE_OPEN) {
Protocol.invokeLater(new Runnable() {
@Override
@@ -374,11 +390,14 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
public void doneOpenChannel(Throwable error, IChannel channel) {
if (error == null) {
ProcessLauncher.this.channel = channel;
+ ProcessLauncher.this.closeChannelOnDispose = true;
+ ProcessLauncher.this.sharedChannel = !flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue();
+
onChannelOpenDone(peer);
} else {
IStatus status = new Status(IStatus.ERROR, CoreBundleActivator.getUniqueIdentifier(),
- NLS.bind(Messages.ProcessLauncher_error_channelConnectFailed, peer.getID(), error.getLocalizedMessage()),
- error);
+ NLS.bind(Messages.ProcessLauncher_error_channelConnectFailed, peer.getID(), error.getLocalizedMessage()),
+ error);
invokeCallback(status, null);
}
}
@@ -490,9 +509,11 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
streamsListener = createStreamsListener();
// If available, we need to subscribe to the streams.
if (streamsListener != null) {
- getSvcStreams().subscribe(getSvcProcesses() instanceof IProcessesV1 ? IProcessesV1.NAME : IProcesses.NAME, streamsListener, new IStreams.DoneSubscribe() {
+ // Subscribe the streams service
+ Tcf.getChannelManager().subscribeStream(channel, getSvcProcesses() instanceof IProcessesV1 ? IProcessesV1.NAME : IProcesses.NAME, streamsListener, new IChannelManager.DoneSubscribeStream() {
+
@Override
- public void doneSubscribe(IToken token, Exception error) {
+ public void doneSubscribeStream(Throwable error) {
// In case the subscribe to the stream fails, we pass on
// the error to the user and stop the launch
if (error != null) {
@@ -524,7 +545,7 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
/**
* Initialize and attach the output console and/or the output file.
* <p>
- * Called from {@link IStreams#subscribe(String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.services.IStreams.DoneSubscribe)}.
+ * Called from {@link IChannelManager#subscribeStream(IChannel, String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream)}
*/
protected void onSubscribeStreamsDone() {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
@@ -647,6 +668,11 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
collector.initDone();
}
+ /**
+ * Returns the message template for the process launch failed error message.
+ *
+ * @return The message template.
+ */
protected String getProcessLaunchFailedMessageTemplate() {
if (properties != null && properties.containsKey(PROCESS_LAUNCH_FAILED_MESSAGE)) {
return properties.getStringProperty(PROCESS_LAUNCH_FAILED_MESSAGE);
@@ -1031,6 +1057,15 @@ public class ProcessLauncher extends PlatformObject implements IProcessLauncher
}
/**
+ * Returns if the channel is a private or shared channel.
+ *
+ * @return <code>True</code> if the channel a shared channel, <code>false</code> otherwise.
+ */
+ public final boolean isSharedChannel() {
+ return sharedChannel;
+ }
+
+ /**
* Returns the process properties container.
*
* @return The process properties container or <code>null</code> if none.
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessStreamsListener.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessStreamsListener.java
index c1881f5ab..f5058ce73 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessStreamsListener.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.processes.core/src/org/eclipse/tcf/te/tcf/processes/core/launcher/ProcessStreamsListener.java
@@ -29,8 +29,11 @@ import org.eclipse.tcf.services.IProcessesV1;
import org.eclipse.tcf.services.IStreams;
import org.eclipse.tcf.te.runtime.callback.AsyncCallbackCollector;
import org.eclipse.tcf.te.runtime.callback.Callback;
+import org.eclipse.tcf.te.runtime.interfaces.IDisposable;
import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
+import org.eclipse.tcf.te.tcf.core.Tcf;
import org.eclipse.tcf.te.tcf.core.async.CallbackInvocationDelegate;
+import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
import org.eclipse.tcf.te.tcf.core.streams.StreamsDataProvider;
import org.eclipse.tcf.te.tcf.core.streams.StreamsDataReceiver;
import org.eclipse.tcf.te.tcf.core.util.ExceptionUtils;
@@ -43,9 +46,11 @@ import org.eclipse.tcf.util.TCFTask;
/**
* Remote process streams listener implementation.
*/
-public class ProcessStreamsListener implements IStreams.StreamsListener, IProcessContextAwareListener {
+public class ProcessStreamsListener implements IStreams.StreamsListener, IProcessContextAwareListener, IDisposable {
// The channel instance
/* default */ IChannel channel;
+ // Flag to signal if the channel is a private or shared channel
+ /* default */ boolean sharedChannel;
// The streams service instance
/* default */ IStreams svcStreams;
// The processes service name
@@ -140,7 +145,7 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
*/
protected class StreamReaderRunnable implements Runnable {
// The associated stream id
- private final String streamId;
+ /* default */ final String streamId;
// The associated stream type id
private final String streamTypeId;
// The list of receivers applicable for the associated stream type id
@@ -236,6 +241,8 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
// Store the callback instance
this.callback = callback;
+ // Mark the runnable as stopped
+ stopped = true;
}
/**
@@ -282,7 +289,9 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
notifyReceiver(new String(streamData.data), receivers);
}
// If the end of the stream have been reached --> break out
- if (streamData.eos) break;
+ if (streamData.eos) {
+ break;
+ }
}
} catch (Exception e) {
// An error occurred -> Dump to the error log
@@ -302,18 +311,25 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
// Disconnect from the stream
if (svcStreams != null) {
- svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ Runnable runnable = new Runnable() {
@Override
- @SuppressWarnings("synthetic-access")
- public void doneDisconnect(IToken token, Exception error) {
- synchronized (this) {
- // Mark the runnable definitely stopped
- stopped = true;
- // Disconnect is done, ignore any error, invoke the callback
- if (callback != null) callback.done(this, Status.OK_STATUS);
- }
+ public void run() {
+ svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ @SuppressWarnings("synthetic-access")
+ public void doneDisconnect(IToken token, Exception error) {
+ synchronized (this) {
+ // Mark the runnable definitely stopped
+ stopped = true;
+ // Disconnect is done, ignore any error, invoke the callback
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ }
+ }
+ });
}
- });
+ };
+
+ Protocol.invokeLater(runnable);
} else {
synchronized (this) {
// Mark the runnable definitely stopped
@@ -405,9 +421,9 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
* runnable will be executed within a thread and is responsible to read the
* incoming data from the registered providers and forward them to the associated stream.
*/
- protected class ProcessStreamWriterRunnable implements Runnable {
+ protected class StreamWriterRunnable implements Runnable {
// The associated stream id
- private final String streamId;
+ /* default */ final String streamId;
// The associated stream type id
private final String streamTypeId;
// The data provider applicable for the associated stream type id
@@ -427,7 +443,7 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
* @param streamTypeId The associated stream type id. Must not be <code>null</code>.
* @param provider The data provider. Must not be <code>null</code> and must be applicable for the stream type.
*/
- public ProcessStreamWriterRunnable(String streamId, String streamTypeId, StreamsDataProvider provider) {
+ public StreamWriterRunnable(String streamId, String streamTypeId, StreamsDataProvider provider) {
Assert.isNotNull(streamId);
Assert.isNotNull(streamTypeId);
Assert.isNotNull(provider);
@@ -555,18 +571,30 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
// Disconnect from the stream
if (svcStreams != null) {
- svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ Runnable runnable = new Runnable() {
@Override
- @SuppressWarnings("synthetic-access")
- public void doneDisconnect(IToken token, Exception error) {
- synchronized (this) {
- // Mark the runnable definitely stopped
- stopped = true;
- }
- // Disconnect is done, ignore any error, invoke the callback
- if (getCallback() != null) getCallback().done(this, Status.OK_STATUS);
+ public void run() {
+ svcStreams.eos(streamId, new IStreams.DoneEOS() {
+ @Override
+ public void doneEOS(IToken token, Exception error) {
+ svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ @SuppressWarnings("synthetic-access")
+ public void doneDisconnect(IToken token, Exception error) {
+ synchronized (this) {
+ // Mark the runnable definitely stopped
+ stopped = true;
+ }
+ // Disconnect is done, ignore any error, invoke the callback
+ if (getCallback() != null) getCallback().done(this, Status.OK_STATUS);
+ }
+ });
+ }
+ });
}
- });
+ };
+
+ Protocol.invokeLater(runnable);
} else {
synchronized (this) {
// Mark the runnable definitely stopped
@@ -626,10 +654,19 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
public ProcessStreamsListener(ProcessLauncher parent) {
Assert.isNotNull(parent);
this.channel = parent.getChannel();
+ this.sharedChannel = parent.isSharedChannel();
this.svcStreams = parent.getSvcStreams();
this.svcProcessesName = parent.getSvcProcesses() instanceof IProcessesV1 ? IProcessesV1.NAME : IProcesses.NAME;
}
+ /* (non-Javadoc)
+ * @see org.eclipse.tcf.te.runtime.interfaces.IDisposable#dispose()
+ */
+ @Override
+ public void dispose() {
+ dispose(null);
+ }
+
/**
* Dispose the streams listener instance.
*
@@ -655,9 +692,9 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
protected void internalDone(final Object caller, final IStatus status) {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// Unsubscribe the streams listener from the service
- svcStreams.unsubscribe(svcProcessesName, finStreamsListener, new IStreams.DoneUnsubscribe() {
+ Tcf.getChannelManager().unsubscribeStream(channel, svcProcessesName, finStreamsListener, new IChannelManager.DoneUnsubscribeStream() {
@Override
- public void doneUnsubscribe(IToken token, Exception error) {
+ public void doneUnsubscribeStream(Throwable error) {
// Loop all registered listeners and close them
for (StreamsDataReceiver receiver : finDataReceivers) receiver.dispose();
// Call the original outer callback
@@ -673,6 +710,9 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
if (runnable instanceof StreamReaderRunnable) {
((StreamReaderRunnable)runnable).stop(new AsyncCallbackCollector.SimpleCollectorCallback(collector));
}
+ if (runnable instanceof StreamWriterRunnable) {
+ ((StreamWriterRunnable)runnable).stop(new AsyncCallbackCollector.SimpleCollectorCallback(collector));
+ }
}
runnables.clear();
}
@@ -737,14 +777,26 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
IStatus.INFO, getClass());
}
- // Loop all delayed create events and look for the streams for our context
+ // Loop all delayed create events
synchronized (delayedCreatedEvents) {
Iterator<StreamCreatedEvent> iterator = delayedCreatedEvents.iterator();
while (iterator.hasNext()) {
- StreamCreatedEvent event = iterator.next();
+ final StreamCreatedEvent event = iterator.next();
+ // If the created event matches the process context id, re-dispatch the created event
if (context.getID().equals(event.contextId) || event.contextId == null) {
- // Re-dispatch the event
created(event.streamType, event.streamId, event.contextId);
+ } else if (!sharedChannel) {
+ // Disconnect from streams not matching the process context id
+ svcStreams.disconnect(event.streamId, new IStreams.DoneDisconnect() {
+ @Override
+ public void doneDisconnect(IToken token, Exception error) {
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Remote process stream disconnected (different context): streamId='" + event.streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+ }
+ });
}
}
// Clear all events
@@ -764,7 +816,7 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
* @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String)
*/
@Override
- public void created(String streamType, String streamId, String contextId) {
+ public void created(final String streamType, final String streamId, final String contextId) {
// We ignore any other stream type than the associated process service name
if (!svcProcessesName.equals(streamType)) return;
@@ -790,7 +842,7 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
// Data provider set?
if (dataProvider != null) {
// Create the stdin stream writer runnable
- ProcessStreamWriterRunnable runnable = new ProcessStreamWriterRunnable(streamId, IProcesses.PROP_STDIN_ID, dataProvider);
+ StreamWriterRunnable runnable = new StreamWriterRunnable(streamId, IProcesses.PROP_STDIN_ID, dataProvider);
// Add to the list of created runnable's
synchronized (runnables) { runnables.add(runnable); }
// And create and start the thread
@@ -822,6 +874,19 @@ public class ProcessStreamsListener implements IStreams.StreamsListener, IProces
thread.start();
}
}
+ } else if (context != null && !context.getID().equals(contextId) && !sharedChannel) {
+ // Streams created event received for a context which is not the
+ // one we are interested in. Send a disconnect for those streams.
+ svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ public void doneDisconnect(IToken token, Exception error) {
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Remote process stream disconnected (different context): streamId='" + streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+ }
+ });
} else if (context == null) {
// Context not set yet --> add to the delayed list
StreamCreatedEvent event = new StreamCreatedEvent(streamType, streamId, contextId);
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/interfaces/launcher/ITerminalsLauncher.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/interfaces/launcher/ITerminalsLauncher.java
index 7d5fc33b3..ab19a81eb 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/interfaces/launcher/ITerminalsLauncher.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/interfaces/launcher/ITerminalsLauncher.java
@@ -12,6 +12,7 @@ package org.eclipse.tcf.te.tcf.terminals.core.interfaces.launcher;
import java.util.Map;
import org.eclipse.core.runtime.IAdaptable;
+import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IPeer;
import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
import org.eclipse.tcf.te.runtime.interfaces.properties.IPropertiesContainer;
@@ -59,6 +60,13 @@ public interface ITerminalsLauncher extends IAdaptable {
public static String PROP_CONNECTION_NAME = "connection.name"; //$NON-NLS-1$
/**
+ * Property denoting the communication channel to use.
+ * <p>
+ * The property type is {@link IChannel}.
+ */
+ public static String PROP_CHANNEL = "terminal.channel"; //$NON-NLS-1$
+
+ /**
* Launch a remote terminal defined by the given launch properties at the target specified by the
* given peer.
*
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsLauncher.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsLauncher.java
index 39f5f8b9f..32c24b2a3 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsLauncher.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsLauncher.java
@@ -71,6 +71,10 @@ import org.eclipse.tcf.te.tcf.terminals.core.nls.Messages;
public class TerminalsLauncher extends PlatformObject implements ITerminalsLauncher {
// The channel instance
/* default */ IChannel channel;
+ // Flag to signal if the channel needs to be closed on disposed
+ /* default */ boolean closeChannelOnDispose = false;
+ // Flag to signal if the channel is a private or shared channel
+ /* default */ boolean sharedChannel = false;
// The terminals properties instance
private IPropertiesContainer properties;
@@ -121,7 +125,9 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
protected void internalDone(Object caller, IStatus status) {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// Close the channel as all disposal is done
- if (finChannel != null) Tcf.getChannelManager().closeChannel(finChannel);
+ if (finChannel != null && closeChannelOnDispose) {
+ Tcf.getChannelManager().closeChannel(finChannel);
+ }
}
}, new CallbackInvocationDelegate());
@@ -248,9 +254,19 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
// Remember the terminal properties
this.properties = properties;
+ // Check if we get the channel to use passed in by the launch properties
+ if (properties.containsKey(ITerminalsLauncher.PROP_CHANNEL)) {
+ IChannel c = (IChannel) properties.getProperty(ITerminalsLauncher.PROP_CHANNEL);
+ if (c != null && c.getState() == IChannel.STATE_OPEN) {
+ channel = c;
+ closeChannelOnDispose = false;
+ sharedChannel = true;
+ }
+ }
+
// Open a dedicated channel to the given peer
- Map<String, Boolean> flags = new HashMap<String, Boolean>();
- flags.put(IChannelManager.FLAG_FORCE_NEW, Boolean.TRUE);
+ final Map<String, Boolean> flags = new HashMap<String, Boolean>();
+ flags.put(IChannelManager.FLAG_FORCE_NEW, properties.containsKey(IChannelManager.FLAG_FORCE_NEW) ? Boolean.valueOf(properties.getBooleanProperty(IChannelManager.FLAG_FORCE_NEW)) : Boolean.TRUE);
Tcf.getChannelManager().openChannel(peer, flags, new IChannelManager.DoneOpenChannel() {
/* (non-Javadoc)
* @see org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneOpenChannel#doneOpenChannel(java.lang.Throwable, org.eclipse.tcf.protocol.IChannel)
@@ -259,6 +275,8 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
public void doneOpenChannel(Throwable error, IChannel channel) {
if (error == null) {
TerminalsLauncher.this.channel = channel;
+ TerminalsLauncher.this.closeChannelOnDispose = true;
+ TerminalsLauncher.this.sharedChannel = !flags.get(IChannelManager.FLAG_FORCE_NEW).booleanValue();
// Attach a channel listener so we can dispose ourself if the channel
// is closed from the remote side.
@@ -350,15 +368,15 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
streamsListener = createStreamsListener();
// If available, we need to subscribe to the streams.
if (streamsListener != null) {
- getSvcStreams().subscribe(ITerminals.NAME, streamsListener, new IStreams.DoneSubscribe() {
+ Tcf.getChannelManager().subscribeStream(channel, ITerminals.NAME, streamsListener, new IChannelManager.DoneSubscribeStream() {
@Override
- public void doneSubscribe(IToken token, Exception error) {
+ public void doneSubscribeStream(Throwable error) {
// In case the subscribe to the stream fails, we pass on
// the error to the user and stop the launch
if (error != null) {
// Construct the error message to show to the user
String message = NLS.bind(Messages.TerminalsLauncher_error_terminalLaunchFailed,
- properties.getStringProperty(ITerminalsLauncher.PROP_CONNECTION_NAME));
+ properties.getStringProperty(ITerminalsLauncher.PROP_CONNECTION_NAME));
message += NLS.bind(Messages.TerminalsLauncher_error_possibleCause, Messages.TerminalsLauncher_cause_subscribeFailed);
// Construct the status object
@@ -379,7 +397,7 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
/**
* Initialize and attach the output console and/or the output file.
* <p>
- * Called from {@link IStreams#subscribe(String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.services.IStreams.DoneSubscribe)}.
+ * Called from {@link IChannelManager#subscribeStream(IChannel, String, org.eclipse.tcf.services.IStreams.StreamsListener, org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager.DoneSubscribeStream)}
*/
protected void onSubscribeStreamsDone() {
// Get the properties container
@@ -703,6 +721,15 @@ public class TerminalsLauncher extends PlatformObject implements ITerminalsLaunc
}
/**
+ * Returns if the channel is a private or shared channel.
+ *
+ * @return <code>True</code> if the channel a shared channel, <code>false</code> otherwise.
+ */
+ public final boolean isSharedChannel() {
+ return sharedChannel;
+ }
+
+ /**
* Returns the terminals properties container.
*
* @return The terminals properties container or <code>null</code> if none.
diff --git a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsStreamsListener.java b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsStreamsListener.java
index bd84f7593..46b691cb4 100644
--- a/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsStreamsListener.java
+++ b/target_explorer/plugins/org.eclipse.tcf.te.tcf.terminals.core/src/org/eclipse/tcf/te/tcf/terminals/core/launcher/TerminalsStreamsListener.java
@@ -29,7 +29,9 @@ import org.eclipse.tcf.services.ITerminals.TerminalContext;
import org.eclipse.tcf.te.runtime.callback.AsyncCallbackCollector;
import org.eclipse.tcf.te.runtime.callback.Callback;
import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
+import org.eclipse.tcf.te.tcf.core.Tcf;
import org.eclipse.tcf.te.tcf.core.async.CallbackInvocationDelegate;
+import org.eclipse.tcf.te.tcf.core.interfaces.IChannelManager;
import org.eclipse.tcf.te.tcf.core.streams.StreamsDataProvider;
import org.eclipse.tcf.te.tcf.core.streams.StreamsDataReceiver;
import org.eclipse.tcf.te.tcf.core.util.ExceptionUtils;
@@ -44,7 +46,7 @@ import org.eclipse.tcf.util.TCFTask;
*/
public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerminalsContextAwareListener {
// The parent terminals launcher instance
- private final TerminalsLauncher parent;
+ /* default */ final TerminalsLauncher parent;
// The remote terminal context
private ITerminals.TerminalContext context;
// The list of registered stream data receivers
@@ -371,9 +373,8 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
protected final void disconnect(final IStreams service, final String streamId) {
Assert.isNotNull(service);
Assert.isNotNull(streamId);
- Assert.isTrue(!Protocol.isDispatchThread());
- Protocol.invokeAndWait(new Runnable() {
+ Protocol.invokeLater(new Runnable() {
@Override
public void run() {
service.disconnect(streamId, new IStreams.DoneDisconnect() {
@@ -639,9 +640,8 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
protected final void disconnect(final IStreams service, final String streamId) {
Assert.isNotNull(service);
Assert.isNotNull(streamId);
- Assert.isTrue(!Protocol.isDispatchThread());
- Protocol.invokeAndWait(new Runnable() {
+ Protocol.invokeLater(new Runnable() {
@Override
public void run() {
// Write EOS first
@@ -711,12 +711,10 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
@Override
protected void internalDone(final Object caller, final IStatus status) {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
- // Get the service instance from the parent
- IStreams svcStreams = getParent().getSvcStreams();
// Unsubscribe the streams listener from the service
- svcStreams.unsubscribe(ITerminals.NAME, finStreamsListener, new IStreams.DoneUnsubscribe() {
+ Tcf.getChannelManager().unsubscribeStream(parent.getChannel(), ITerminals.NAME, finStreamsListener, new IChannelManager.DoneUnsubscribeStream() {
@Override
- public void doneUnsubscribe(IToken token, Exception error) {
+ public void doneUnsubscribeStream(Throwable error) {
// Loop all registered listeners and close them
for (StreamsDataReceiver receiver : finDataReceivers) receiver.dispose();
// Call the original outer callback
@@ -796,14 +794,26 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
IStatus.INFO, getClass());
}
- // Loop all delayed create events and look for the streams for our context
+ // Loop all delayed create events
synchronized (delayedCreatedEvents) {
Iterator<StreamCreatedEvent> iterator = delayedCreatedEvents.iterator();
while (iterator.hasNext()) {
- StreamCreatedEvent event = iterator.next();
+ final StreamCreatedEvent event = iterator.next();
+ // If the created event matches the process context id, re-dispatch the created event
if (context.getID().equals(event.contextId) || context.getProcessID().equals(event.contextId) || event.contextId == null) {
- // Re-dispatch the event
created(event.streamType, event.streamId, event.contextId);
+ } else if (!parent.isSharedChannel()) {
+ // Disconnect from streams not matching the process context id
+ parent.getSvcStreams().disconnect(event.streamId, new IStreams.DoneDisconnect() {
+ @Override
+ public void doneDisconnect(IToken token, Exception error) {
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Remote terminals stream disconnected (different context): streamId='" + event.streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+ }
+ });
}
}
// Clear all events
@@ -823,7 +833,7 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
* @see org.eclipse.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String)
*/
@Override
- public void created(String streamType, String streamId, String contextId) {
+ public void created(final String streamType, final String streamId, final String contextId) {
// We ignore any other stream type than ITerminals.NAME
if (!ITerminals.NAME.equals(streamType)) return;
@@ -886,6 +896,19 @@ public class TerminalsStreamsListener implements IStreams.StreamsListener, ITerm
thread.start();
}
}
+ } else if (context != null && !context.getID().equals(contextId) && !context.getProcessID().equals(contextId) && !parent.isSharedChannel()) {
+ // Streams created event received for a context which is not the
+ // one we are interested in. Send a disconnect for those streams.
+ parent.getSvcStreams().disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ public void doneDisconnect(IToken token, Exception error) {
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Remote terminals stream disconnected (different context): streamId='" + streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+ }
+ });
} else if (context == null) {
// Context not set yet --> add to the delayed list
StreamCreatedEvent event = new StreamCreatedEvent(streamType, streamId, contextId);

Back to the top