Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'target_explorer/plugins/org.eclipse.tm.te.tcf.processes.core/src/org/eclipse/tm/te/tcf/processes/core/launcher/ProcessStreamsListener.java')
-rw-r--r--target_explorer/plugins/org.eclipse.tm.te.tcf.processes.core/src/org/eclipse/tm/te/tcf/processes/core/launcher/ProcessStreamsListener.java880
1 files changed, 880 insertions, 0 deletions
diff --git a/target_explorer/plugins/org.eclipse.tm.te.tcf.processes.core/src/org/eclipse/tm/te/tcf/processes/core/launcher/ProcessStreamsListener.java b/target_explorer/plugins/org.eclipse.tm.te.tcf.processes.core/src/org/eclipse/tm/te/tcf/processes/core/launcher/ProcessStreamsListener.java
new file mode 100644
index 000000000..8624f34c2
--- /dev/null
+++ b/target_explorer/plugins/org.eclipse.tm.te.tcf.processes.core/src/org/eclipse/tm/te/tcf/processes/core/launcher/ProcessStreamsListener.java
@@ -0,0 +1,880 @@
+/*******************************************************************************
+ * Copyright (c) 2011 Wind River Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the terms
+ * of the Eclipse Public License v1.0 which accompanies this distribution, and is
+ * available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.te.tcf.processes.core.launcher;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+
+import org.eclipse.core.runtime.Assert;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Platform;
+import org.eclipse.core.runtime.Status;
+import org.eclipse.osgi.util.NLS;
+import org.eclipse.tm.tcf.protocol.IToken;
+import org.eclipse.tm.tcf.protocol.Protocol;
+import org.eclipse.tm.tcf.services.IProcesses;
+import org.eclipse.tm.tcf.services.IStreams;
+import org.eclipse.tm.tcf.services.IStreams.StreamsListener;
+import org.eclipse.tm.tcf.util.TCFTask;
+import org.eclipse.tm.te.core.async.AsyncCallbackCollector;
+import org.eclipse.tm.te.runtime.callback.Callback;
+import org.eclipse.tm.te.runtime.interfaces.callback.ICallback;
+import org.eclipse.tm.te.tcf.core.utils.ExceptionUtils;
+import org.eclipse.tm.te.tcf.processes.core.activator.CoreBundleActivator;
+import org.eclipse.tm.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener;
+import org.eclipse.tm.te.tcf.processes.core.internal.tracing.ITraceIds;
+import org.eclipse.tm.te.tcf.processes.core.nls.Messages;
+
+/**
+ * Remote process streams listener implementation.
+ */
+public class ProcessStreamsListener implements StreamsListener, IProcessContextAwareListener {
+ // The parent process launcher instance
+ private final ProcessLauncher parent;
+ // The remote process context
+ private IProcesses.ProcessContext context;
+ // The list of registered stream data receivers
+ private final List<ProcessStreamsDataReceiver> dataReceiver = new ArrayList<ProcessStreamsDataReceiver>();
+ // The stream data provider
+ private ProcessStreamsDataProvider dataProvider;
+ // The list of delayed stream created events
+ private final List<StreamCreatedEvent> delayedCreatedEvents = new ArrayList<StreamCreatedEvent>();
+ // The list of created runnable's
+ private final List<Runnable> runnables = new ArrayList<Runnable>();
+
+ /**
+ * Immutable stream created event.
+ */
+ private final static class StreamCreatedEvent {
+ /**
+ * The stream type.
+ */
+ public final String streamType;
+ /**
+ * The stream id.
+ */
+ public final String streamId;
+ /**
+ * The context id.
+ */
+ public final String contextId;
+
+ // As the class is immutable, we do not need to build the toString
+ // value again and again. Build it once in the constructor and reuse it later.
+ private final String toString;
+
+ /**
+ * Constructor.
+ *
+ * @param streamType The stream type.
+ * @param streamId The stream id.
+ * @param contextId The context id.
+ */
+ public StreamCreatedEvent(String streamType, String streamId, String contextId) {
+ this.streamType = streamType;
+ this.streamId = streamId;
+ this.contextId = contextId;
+
+ toString = toString();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof StreamCreatedEvent
+ && toString().equals(((StreamCreatedEvent)obj).toString());
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ if (toString != null) return toString;
+
+ StringBuilder builder = new StringBuilder(getClass().getSimpleName());
+ builder.append(": streamType = "); //$NON-NLS-1$
+ builder.append(streamType);
+ builder.append("; streamId = "); //$NON-NLS-1$
+ builder.append(streamId);
+ builder.append("; contextId = "); //$NON-NLS-1$
+ builder.append(contextId);
+
+ return builder.toString();
+ }
+ }
+
+ /**
+ * Remote process stream reader runnable implementation. The
+ * runnable will be executed within a thread and is responsible to read the
+ * incoming data from the associated stream and forward them to the registered receivers.
+ */
+ protected class StreamReaderRunnable implements Runnable {
+ // The associated stream id
+ private final String streamId;
+ // The associated stream type id
+ private final String streamTypeId;
+ // The list of receivers applicable for the associated stream type id
+ private final List<ProcessStreamsDataReceiver> receivers = new ArrayList<ProcessStreamsDataReceiver>();
+ // The currently active read task
+ private TCFTask<ReadData> activeTask;
+ // The callback to invoke if the runnable stopped
+ private ICallback callback;
+
+ // Flag to stop the runnable
+ private boolean stopped = false;
+
+ /**
+ * Immutable class describing the result returned by {@link StreamReaderRunnable#read(IStreams, String, int)}.
+ */
+ protected class ReadData {
+ /**
+ * The number of lost bytes in case of a buffer overflow. If <code>-1</code>,
+ * an unknown number of bytes were lost. If non-zero and <code>data.length</code> is
+ * non-zero, the lost bytes are considered located right before the read bytes.
+ */
+ public final int lostBytes;
+ /**
+ * The read data as byte array.
+ */
+ public final byte[] data;
+ /**
+ * Flag to signal if the end of the stream has been reached.
+ */
+ public final boolean eos;
+
+ /**
+ * Constructor.
+ */
+ public ReadData(int lostBytes, byte[] data, boolean eos) {
+ this.lostBytes = lostBytes;
+ this.data = data;
+ this.eos = eos;
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param streamId The associated stream id. Must not be <code>null</code>.
+ * @param streamTypeId The associated stream type id. Must not be <code>null</code>.
+ * @param receivers The list of registered data receivers. Must not be <code>null</code>.
+ */
+ public StreamReaderRunnable(String streamId, String streamTypeId, ProcessStreamsDataReceiver[] receivers) {
+ Assert.isNotNull(streamId);
+ Assert.isNotNull(streamTypeId);
+ Assert.isNotNull(receivers);
+
+ this.streamId = streamId;
+ this.streamTypeId = streamTypeId;
+
+ // Loop the list of receivers and filter out the applicable ones
+ for (ProcessStreamsDataReceiver receiver : receivers) {
+ if (receiver.isApplicable(this.streamTypeId))
+ this.receivers.add(receiver);
+ }
+ }
+
+ /**
+ * Returns the associated stream id.
+ *
+ * @return The associated stream id.
+ */
+ public final String getStreamId() {
+ return streamId;
+ }
+
+ /**
+ * Returns if or if not the list of applicable receivers is empty.
+ *
+ * @return <code>True</code> if the list of applicable receivers is empty, <code>false</code> otherwise.
+ */
+ public final boolean isEmpty() {
+ return receivers.isEmpty();
+ }
+
+ /**
+ * Stop the runnable.
+ *
+ * @param callback The callback to invoke if the runnable stopped.
+ */
+ public final synchronized void stop(ICallback callback) {
+ // If the runnable is stopped already, invoke the callback directly
+ if (stopped) {
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ return;
+ }
+
+ // Store the callback instance
+ this.callback = callback;
+ // Mark the runnable as stopped
+ stopped = true;
+ }
+
+ /**
+ * Returns if the runnable should stop.
+ */
+ protected final synchronized boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * Sets the currently active reader task.
+ *
+ * @param task The currently active reader task or <code>null</code>.
+ */
+ protected final synchronized void setActiveTask(TCFTask<ReadData> task) {
+ activeTask = task;
+ }
+
+ /**
+ * Returns the currently active reader task.
+ *
+ * @return The currently active reader task or <code>null</code>.
+ */
+ protected final TCFTask<ReadData> getActiveTask() {
+ return activeTask;
+ }
+
+ /**
+ * Returns the callback instance to invoke.
+ *
+ * @return The callback instance or <code>null</code>.
+ */
+ protected final ICallback getCallback() {
+ return callback;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // Create a snapshot of the receivers
+ final ProcessStreamsDataReceiver[] receivers = this.receivers.toArray(new ProcessStreamsDataReceiver[this.receivers.size()]);
+ // Get the service instance from the parent
+ final IStreams svcStreams = getParent().getSvcStreams();
+
+ // Run until stopped and the streams service is available
+ while (!isStopped() && svcStreams != null) {
+ try {
+ ReadData streamData = read(svcStreams, streamId, 1024);
+ if (streamData != null) {
+ // Check if the received data contains some stream data
+ if (streamData.data != null) {
+ // Notify the data receivers about the new received data
+ notifyReceiver(new String(streamData.data), receivers);
+ }
+ // If the end of the stream have been reached --> break out
+ if (streamData.eos) break;
+ }
+ } catch (Exception e) {
+ // An error occurred -> Dump to the error log
+ e = ExceptionUtils.checkAndUnwrapException(e);
+ // Check if the blocking read task got canceled
+ if (!(e instanceof CancellationException)) {
+ // Log the error to the user, might be something serious
+ IStatus status = new Status(IStatus.ERROR, CoreBundleActivator.getUniqueIdentifier(),
+ NLS.bind(Messages.ProcessStreamReaderRunnable_error_readFailed, streamId, e.getLocalizedMessage()),
+ e);
+ Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status);
+ }
+ // break out of the loop
+ break;
+ }
+ }
+
+ // Disconnect from the stream
+ if (svcStreams != null) {
+ svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ @SuppressWarnings("synthetic-access")
+ public void doneDisconnect(IToken token, Exception error) {
+ // Disconnect is done, ignore any error, invoke the callback
+ synchronized (this) {
+ if (getCallback() != null) getCallback().done(this, Status.OK_STATUS);
+ }
+ // Mark the runnable definitely stopped
+ stopped = true;
+ }
+ });
+ } else {
+ // Invoke the callback directly, if any
+ synchronized (this) {
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ }
+ // Mark the runnable definitely stopped
+ stopped = true;
+ }
+ }
+
+ /**
+ * Reads data from the stream and blocks until some data has been received.
+ *
+ * @param service The streams service. Must not be <code>null</code>.
+ * @param streamId The stream id. Must not be <code>null</code>.
+ * @param size The size of the data to read.
+ *
+ * @return The read data.
+ *
+ * @throws Exception In case the read fails.
+ */
+ protected final ReadData read(final IStreams service, final String streamId, final int size) throws Exception {
+ Assert.isNotNull(service);
+ Assert.isNotNull(streamId);
+ Assert.isTrue(!Protocol.isDispatchThread());
+
+ // Create the task object
+ TCFTask<ReadData> task = new TCFTask<ReadData>(getParent().getChannel()) {
+ @Override
+ public void run() {
+ service.read(streamId, size, new IStreams.DoneRead() {
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.tcf.services.IStreams.DoneRead#doneRead(org.eclipse.tm.tcf.protocol.IToken, java.lang.Exception, int, byte[], boolean)
+ */
+ @Override
+ public void doneRead(IToken token, Exception error, int lostSize, byte[] data, boolean eos) {
+ if (error == null) done(new ReadData(lostSize, data, eos));
+ else error(error);
+ }
+ });
+ }
+ };
+
+ // Push the task object to the runnable instance
+ setActiveTask(task);
+
+ // Block until some data is received
+ return task.get();
+ }
+
+ /**
+ * Notify the data receiver that some data has been received.
+ *
+ * @param data The data or <code>null</code>.
+ */
+ protected final void notifyReceiver(final String data, final ProcessStreamsDataReceiver[] receivers) {
+ if (data == null) return;
+ // Notify the data receiver
+ for (ProcessStreamsDataReceiver receiver : receivers) {
+ try {
+ // Get the writer
+ Writer writer = receiver.getWriter();
+ // Append the data
+ writer.append(data);
+ // And flush it
+ writer.flush();
+ } catch (IOException e) {
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(1, null)) {
+ IStatus status = new Status(IStatus.WARNING, CoreBundleActivator.getUniqueIdentifier(),
+ NLS.bind(Messages.ProcessStreamReaderRunnable_error_appendFailed, streamId, data),
+ e);
+ Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Default TCF remote process stream writer runnable implementation. The
+ * runnable will be executed within a thread and is responsible to read the
+ * incoming data from the registered providers and forward them to the associated stream.
+ */
+ protected class ProcessStreamWriterRunnable implements Runnable {
+ // The associated stream id
+ private final String streamId;
+ // The associated stream type id
+ @SuppressWarnings("unused")
+ private final String streamTypeId;
+ // The data provider applicable for the associated stream type id
+ private final ProcessStreamsDataProvider provider;
+ // The currently active write task
+ private TCFTask<Object> activeTask;
+ // The callback to invoke if the runnable stopped
+ private ICallback callback;
+
+ // Flag to stop the runnable
+ private boolean stopped = false;
+
+ /**
+ * Constructor.
+ *
+ * @param streamId The associated stream id. Must not be <code>null</code>.
+ * @param streamTypeId The associated stream type id. Must not be <code>null</code>.
+ * @param provider The data provider. Must not be <code>null</code> and must be applicable for the stream type.
+ */
+ public ProcessStreamWriterRunnable(String streamId, String streamTypeId, ProcessStreamsDataProvider provider) {
+ Assert.isNotNull(streamId);
+ Assert.isNotNull(streamTypeId);
+ Assert.isNotNull(provider);
+ Assert.isTrue(provider.isApplicable(streamTypeId));
+
+ this.streamId = streamId;
+ this.streamTypeId = streamTypeId;
+ this.provider = provider;
+ }
+
+ /**
+ * Returns the associated stream id.
+ *
+ * @return The associated stream id.
+ */
+ public final String getStreamId() {
+ return streamId;
+ }
+
+ /**
+ * Stop the runnable.
+ *
+ * @param callback The callback to invoke if the runnable stopped.
+ */
+ public final synchronized void stop(ICallback callback) {
+ // If the runnable is stopped already, invoke the callback directly
+ if (stopped) {
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ return;
+ }
+
+ // Store the callback instance
+ this.callback = callback;
+ // Mark the runnable as stopped
+ stopped = true;
+ }
+
+ /**
+ * Returns if the runnable should stop.
+ */
+ protected final synchronized boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * Sets the currently active writer task.
+ *
+ * @param task The currently active writer task or <code>null</code>.
+ */
+ protected final synchronized void setActiveTask(TCFTask<Object> task) {
+ activeTask = task;
+ }
+
+ /**
+ * Returns the currently active writer task.
+ *
+ * @return The currently active writer task or <code>null</code>.
+ */
+ protected final TCFTask<Object> getActiveTask() {
+ return activeTask;
+ }
+
+ /**
+ * Returns the callback instance to invoke.
+ *
+ * @return The callback instance or <code>null</code>.
+ */
+ protected final ICallback getCallback() {
+ return callback;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // If not data provider is set, we are done here immediately
+ if (provider == null) {
+ // Invoke the callback directly, if any
+ synchronized (this) {
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ }
+ // Mark the runnable definitely stopped
+ stopped = true;
+
+ return;
+ }
+
+ // Get the service instance from the parent
+ final IStreams svcStreams = getParent().getSvcStreams();
+
+ // Create the data buffer instance
+ final char[] buffer = new char[1024];
+
+ // Run until stopped and the streams service is available
+ while (!isStopped() && svcStreams != null) {
+ try {
+ // Read available data from the data provider
+ int charactersRead = provider.getReader().read(buffer, 0, 1024);
+ // Have we reached the end of the stream -> break out
+ if (charactersRead == -1) break;
+ // If we read some data from the provider, write it to the stream
+ if (charactersRead > 0) write(svcStreams, streamId, new String(buffer).getBytes(), charactersRead);
+ } catch (Exception e) {
+ // An error occurred -> Dump to the error log
+ e = ExceptionUtils.checkAndUnwrapException(e);
+ // Check if the blocking read task got canceled
+ if (!(e instanceof CancellationException)) {
+ // Log the error to the user, might be something serious
+ IStatus status = new Status(IStatus.ERROR, CoreBundleActivator.getUniqueIdentifier(),
+ NLS.bind(Messages.ProcessStreamWriterRunnable_error_writeFailed, streamId, e.getLocalizedMessage()),
+ e);
+ Platform.getLog(CoreBundleActivator.getContext().getBundle()).log(status);
+ }
+ // break out of the loop
+ break;
+ }
+ }
+
+ // Disconnect from the stream
+ if (svcStreams != null) {
+ svcStreams.disconnect(streamId, new IStreams.DoneDisconnect() {
+ @Override
+ @SuppressWarnings("synthetic-access")
+ public void doneDisconnect(IToken token, Exception error) {
+ // Disconnect is done, ignore any error, invoke the callback
+ synchronized (this) {
+ if (getCallback() != null) getCallback().done(this, Status.OK_STATUS);
+ }
+ // Mark the runnable definitely stopped
+ stopped = true;
+ }
+ });
+ } else {
+ // Invoke the callback directly, if any
+ synchronized (this) {
+ if (callback != null) callback.done(this, Status.OK_STATUS);
+ }
+ // Mark the runnable definitely stopped
+ stopped = true;
+ }
+ }
+
+ /**
+ * Writes data to the stream.
+ *
+ * @param service The streams service. Must not be <code>null</code>.
+ * @param streamId The stream id. Must not be <code>null</code>.
+ * @param data The data buffer. Must not be <code>null</code>.
+ * @param size The size of the data to write.
+ *
+ * @throws Exception In case the write fails.
+ */
+ protected final void write(final IStreams service, final String streamId, final byte[] data, final int size) throws Exception {
+ Assert.isNotNull(service);
+ Assert.isNotNull(streamId);
+ Assert.isTrue(!Protocol.isDispatchThread());
+
+ // Create the task object
+ TCFTask<Object> task = new TCFTask<Object>() {
+ @Override
+ public void run() {
+ service.write(streamId, data, 0, size, new IStreams.DoneWrite() {
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.tcf.services.IStreams.DoneWrite#doneWrite(org.eclipse.tm.tcf.protocol.IToken, java.lang.Exception)
+ */
+ @Override
+ public void doneWrite(IToken token, Exception error) {
+ if (error == null) done(null);
+ else error(error);
+ }
+ });
+ }
+ };
+ task.get();
+
+ // Push the task object to the runnable instance
+ setActiveTask(task);
+
+ // Execute the write
+ task.get();
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param parent The parent process launcher instance. Must not be <code>null</code>
+ */
+ public ProcessStreamsListener(ProcessLauncher parent) {
+ Assert.isNotNull(parent);
+ this.parent = parent;
+ }
+
+ /**
+ * Returns the parent process launcher instance.
+ *
+ * @return The parent process launcher instance.
+ */
+ protected final ProcessLauncher getParent() {
+ return parent;
+ }
+
+ /**
+ * Dispose the streams listener instance.
+ *
+ * @param callback The callback to invoke if the dispose finished or <code>null</code>.
+ */
+ public void dispose(final ICallback callback) {
+ // Store a final reference to the streams listener instance
+ final IStreams.StreamsListener finStreamsListener = this;
+
+ // Store a final reference to the data receivers list
+ final List<ProcessStreamsDataReceiver> finDataReceivers;
+ synchronized (dataReceiver) {
+ finDataReceivers = new ArrayList<ProcessStreamsDataReceiver>(dataReceiver);
+ dataReceiver.clear();
+ }
+
+ // Create a new collector to catch all runnable stop callback's
+ AsyncCallbackCollector collector = new AsyncCallbackCollector(new Callback() {
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.te.runtime.callback.Callback#internalDone(java.lang.Object, org.eclipse.core.runtime.IStatus)
+ */
+ @Override
+ protected void internalDone(final Object caller, final IStatus status) {
+ // Get the service instance from the parent
+ IStreams svcStreams = getParent().getSvcStreams();
+ // Unsubscribe the streams listener from the service
+ svcStreams.unsubscribe(IProcesses.NAME, finStreamsListener, new IStreams.DoneUnsubscribe() {
+ @Override
+ public void doneUnsubscribe(IToken token, Exception error) {
+ // Loop all registered listeners and close them
+ for (ProcessStreamsDataReceiver receiver : finDataReceivers) receiver.dispose();
+ // Call the original outer callback
+ if (callback != null) callback.done(caller, status);
+ }
+ });
+ }
+ });
+
+ // Loop all runnable's and force them to stop
+ synchronized (runnables) {
+ for (Runnable runnable : runnables) {
+ if (runnable instanceof StreamReaderRunnable) {
+ ((StreamReaderRunnable)runnable).stop(new AsyncCallbackCollector.SimpleCollectorCallback(collector));
+ }
+ }
+ runnables.clear();
+ }
+
+ // Mark the collector initialization done
+ collector.initDone();
+ }
+
+ /**
+ * Adds the given receiver to the stream data receiver list.
+ *
+ * @param receiver The data receiver. Must not be <code>null</code>.
+ */
+ public void registerDataReceiver(ProcessStreamsDataReceiver receiver) {
+ Assert.isNotNull(receiver);
+ synchronized (dataReceiver) {
+ if (!dataReceiver.contains(receiver)) dataReceiver.add(receiver);
+ }
+ }
+
+ /**
+ * Removes the given receiver from the stream data receiver list.
+ *
+ * @param receiver The data receiver. Must not be <code>null</code>.
+ */
+ public void unregisterDataReceiver(ProcessStreamsDataReceiver receiver) {
+ Assert.isNotNull(receiver);
+ synchronized (dataReceiver) {
+ dataReceiver.remove(receiver);
+ }
+ }
+
+ /**
+ * Sets the stream data provider instance.
+ *
+ * @param provider The stream data provider instance or <code>null</code>.
+ */
+ public void setDataProvider(ProcessStreamsDataProvider provider) {
+ dataProvider = provider;
+ }
+
+ /**
+ * Returns the stream data provider instance.
+ *
+ * @return The stream data provider instance or <code>null</code>.
+ */
+ public ProcessStreamsDataProvider getDataProvider() {
+ return dataProvider;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener#setProcessContext(org.eclipse.tm.tcf.services.IProcesses.ProcessContext)
+ */
+ @Override
+ public void setProcessContext(IProcesses.ProcessContext context) {
+ Assert.isNotNull(context);
+ this.context = context;
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Process context set to: id='" + context.getID() + "', name='" + context.getName() + "'", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+
+ // Loop all delayed create events and look for the streams for our context
+ synchronized (delayedCreatedEvents) {
+ Iterator<StreamCreatedEvent> iterator = delayedCreatedEvents.iterator();
+ while (iterator.hasNext()) {
+ StreamCreatedEvent event = iterator.next();
+ if (context.getID().equals(event.contextId) || event.contextId == null) {
+ // Re-dispatch the event
+ created(event.streamType, event.streamId, event.contextId);
+ }
+ }
+ // Clear all events
+ delayedCreatedEvents.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.te.tcf.processes.core.interfaces.launcher.IProcessContextAwareListener#getProcessContext()
+ */
+ @Override
+ public IProcesses.ProcessContext getProcessContext() {
+ return context;
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.tcf.services.IStreams.StreamsListener#created(java.lang.String, java.lang.String, java.lang.String)
+ */
+ @Override
+ public void created(String streamType, String streamId, String contextId) {
+ // We ignore any other stream type than IProcesses.NAME
+ if (!IProcesses.NAME.equals(streamType)) return;
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("New remote process stream created: streamId='" + streamId + "', contextId='" + contextId + "'", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+
+ // If a process context is set, check if the created event is for the
+ // monitored process context
+ final IProcesses.ProcessContext context = getProcessContext();
+ // The contextId is null if used with an older TCF agent not sending the third parameter
+ if (context != null && (context.getID().equals(contextId) || contextId == null)) {
+ // Create a snapshot of the registered data receivers
+ ProcessStreamsDataReceiver[] receivers;
+ synchronized (dataReceiver) {
+ receivers = dataReceiver.toArray(new ProcessStreamsDataReceiver[dataReceiver.size()]);
+ }
+ // The created event is for the monitored process context
+ // --> Create the stream reader thread(s)
+ if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDIN_ID))) {
+ // Data provider set?
+ if (dataProvider != null) {
+ // Create the stdin stream writer runnable
+ ProcessStreamWriterRunnable runnable = new ProcessStreamWriterRunnable(streamId, IProcesses.PROP_STDIN_ID, dataProvider);
+ // Add to the list of created runnable's
+ synchronized (runnables) { runnables.add(runnable); }
+ // And create and start the thread
+ Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDIN_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$
+ thread.start();
+ }
+ }
+ if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDOUT_ID))) {
+ // Create the stdout stream reader runnable
+ StreamReaderRunnable runnable = new StreamReaderRunnable(streamId, IProcesses.PROP_STDOUT_ID, receivers);
+ // If not empty, create the thread
+ if (!runnable.isEmpty()) {
+ // Add to the list of created runnable's
+ synchronized (runnables) { runnables.add(runnable); }
+ // And create and start the thread
+ Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDOUT_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$
+ thread.start();
+ }
+ }
+ if (streamId != null && streamId.equals(context.getProperties().get(IProcesses.PROP_STDERR_ID))) {
+ // Create the stdout stream reader runnable
+ StreamReaderRunnable runnable = new StreamReaderRunnable(streamId, IProcesses.PROP_STDERR_ID, receivers);
+ // If not empty, create the thread
+ if (!runnable.isEmpty()) {
+ // Add to the list of created runnable's
+ synchronized (runnables) { runnables.add(runnable); }
+ // And create and start the thread
+ Thread thread = new Thread(runnable, "Thread-" + IProcesses.PROP_STDERR_ID + "-" + streamId); //$NON-NLS-1$ //$NON-NLS-2$
+ thread.start();
+ }
+ }
+ } else if (context == null) {
+ // Context not set yet --> add to the delayed list
+ StreamCreatedEvent event = new StreamCreatedEvent(streamType, streamId, contextId);
+ synchronized (delayedCreatedEvents) {
+ if (!delayedCreatedEvents.contains(event)) delayedCreatedEvents.add(event);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.eclipse.tm.tcf.services.IStreams.StreamsListener#disposed(java.lang.String, java.lang.String)
+ */
+ @Override
+ public void disposed(String streamType, String streamId) {
+ // We ignore any other stream type than IProcesses.NAME
+ if (!IProcesses.NAME.equals(streamType)) return;
+
+ if (CoreBundleActivator.getTraceHandler().isSlotEnabled(0, ITraceIds.TRACE_STREAMS_LISTENER)) {
+ CoreBundleActivator.getTraceHandler().trace("Remote process stream disposed: streamId='" + streamId + "'", //$NON-NLS-1$ //$NON-NLS-2$
+ 0, ITraceIds.TRACE_STREAMS_LISTENER,
+ IStatus.INFO, getClass());
+ }
+
+ // If the delayed created events list is not empty, we have
+ // to check if one of the delayed create events got disposed
+ synchronized (delayedCreatedEvents) {
+ Iterator<StreamCreatedEvent> iterator = delayedCreatedEvents.iterator();
+ while (iterator.hasNext()) {
+ StreamCreatedEvent event = iterator.next();
+ if (event.streamType != null && event.streamType.equals(streamType)
+ && event.streamId != null && event.streamId.equals(streamId)) {
+ // Remove the create event from the list
+ iterator.remove();
+ }
+ }
+ }
+
+ // Stop the thread(s) if the disposed event is for the active
+ // monitored stream id(s).
+ synchronized (runnables) {
+ Iterator<Runnable> iterator = runnables.iterator();
+ while (iterator.hasNext()) {
+ Runnable runnable = iterator.next();
+ if (runnable instanceof StreamReaderRunnable) {
+ StreamReaderRunnable myRunnable = (StreamReaderRunnable)runnable;
+ if (myRunnable.getStreamId().equals(streamId)) {
+ // This method is called within the TCF event dispatch thread, so
+ // we cannot wait for a callback here
+ myRunnable.stop(null);
+ iterator.remove();
+ }
+ }
+ }
+ }
+ }
+}

Back to the top