Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 21d3767d4295c086391a18621658053236b4f8f1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*******************************************************************************
 * Copyright (c) 2012, 2013 Wind River Systems, Inc. and others. All rights reserved.
 * This program and the accompanying materials are made available under the terms
 * of the Eclipse Public License v1.0 which accompanies this distribution, and is
 * available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * Wind River Systems - initial API and implementation
 *******************************************************************************/
package org.eclipse.tcf.te.runtime.processes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.lang.reflect.Field;
import java.security.AccessController;
import java.security.PrivilegedAction;

import org.eclipse.tcf.te.runtime.activator.CoreBundleActivator;

/**
 * Monitor a given input streams and reads any incoming text from the streams.
 * If more than one stream is specified, the lines read from all streams are
 * combined within the reader to one single output.
 */
public class ProcessOutputReaderThread extends Thread {
	// Prefix for any output produced
	private String prefix;
	// The input stream instances as passed in
	private InputStream[] streams;
	// The reader instances to wrap the input streams
	private BufferedReader[] reader;
	// String builder to collect the read lines
	private StringBuilder lines;
	private String lastLine;

	// finished reading all the output
	private boolean finished;
	private boolean waiting;
	private Object waiterSemaphore;

	/**
	 * Constructor.
	 * <p>
	 * Monitor multiple streams in one.
	 *
	 * @param prefix A <code>String</code> prefixing every line of might be produced output, or <code>null</code>.
	 * @param streams The <code>InputStream</code>'s to monitor. Must not be <code>null</code>!
	 */
	public ProcessOutputReaderThread(String prefix, InputStream[] streams) {
		super("ProcessOutputReader-" + (prefix == null ? "" : prefix)); //$NON-NLS-1$ //$NON-NLS-2$
		assert streams != null;

		lastLine = ""; //$NON-NLS-1$
		finished = false;
		waiting = false;
		waiterSemaphore = new Object();
		if (prefix == null) {
			this.prefix = ""; //$NON-NLS-1$
		} else if (!prefix.trim().endsWith(":")) { //$NON-NLS-1$
			this.prefix = prefix.trim() + ": "; //$NON-NLS-1$
		} else {
			this.prefix = prefix;
		}

		// Set the input streams
		this.streams = streams;

		// connect to a stream reader
		reader = new BufferedReader[streams.length];
		for (int i = 0; i < streams.length; i++) {
			reader[i] = new BufferedReader(new InputStreamReader(streams[i]));
		}

		lines = new StringBuilder();
	}

	/**
	 * Returns if or if not the process output reader thread has finished.
	 *
	 * @return <code>true</code> if the thread is finished, <code>false</code> otherwise.
	 */
	public boolean isFinished() {
		return finished;
	}

	/**
	 * Wait at most timeout milliseconds, or until the process we are reading is finished.
	 *
	 * @param timeout Timeout in milliseconds to wait for (maximum).
	 */
	public void waitForFinish(long timeout) {
		if (!finished) {
			waiting = true;
			synchronized (waiterSemaphore) {
				try {
					waiterSemaphore.wait(timeout);
				} catch (InterruptedException e) {
					// just end the wait
				}
			}
		}
		return;
	}

	/**
	 * Wait until the process we are reading is finished.
	 */
	public void waitForFinish() {
		waitForFinish(0);
	}

	/**
	 * Returns the monitored output till the time of the call.
	 *
	 * @return <code>String</code> containing the monitored output.
	 */
	public synchronized String getOutput() {
		return lines.toString();
	}

	/**
	 * Get the last line that was read.
	 *
	 * @return String last line
	 */
	public synchronized String getLastLine() {
		return lastLine;
	}

	/**
	 * Process one line of output. May be overridden by subclasses to extend functionality.
	 * @param line last line that was read
	 */
	protected synchronized void processLine(String line) {
		if (line != null) {
			StringBuffer buffer = new StringBuffer(line.trim());
			while (buffer.length() > 0 && (buffer.charAt(buffer.length() - 1) == '\r' || buffer.charAt(buffer.length() - 1) == '\n')) {
				buffer.deleteCharAt(buffer.length() - 1);
			}
			line = buffer.toString();
			lastLine = line;
			lines.append(line);
			lines.append('\n');
			CoreBundleActivator.getTraceHandler().trace(getPrefix() + " processLine: " + line, 3, this); //$NON-NLS-1$
		}
	}

	/**
	 * Returns the trace line prefix.
	 *
	 * @return The trace line prefix or <code>null</code>.
	 */
	protected String getPrefix() {
		return prefix;
	}

	/**
	 * Called when the process finished and no more input is available. May be overridden by
	 * subclasses to extend functionality.
	 */
	protected void finish() {
		finished = true;
		if (waiting) {
			waiting = false;
			synchronized (waiterSemaphore) {
				waiterSemaphore.notifyAll();
			}
		}
	}

	/**
	 * Reads the available available input from the given stream.
	 *
	 * @return Total number of bytes read, or -1 if EOF reached.
	 */
	protected synchronized int readAvailableInput(BufferedReader reader) {
		if (reader != null) {
			int bytesRead = 0;
			try {
				while (reader.ready()) {
					String line = reader.readLine();
					if (line != null) {
						bytesRead = line.length();
						processLine(line);
					}
				}
			} catch (IOException e) {
				bytesRead = -1;
			}
			return bytesRead;
		}
		return -1;
	}

	/*
	 * Workaround for the old Java I/O system not being interruptible while reading data from a stream
	 * where possibly nothing is sent: We want to be able to interrupt the reader-thread if we think
	 * that we are no more interested in the data... Unfortunately, this implementation does not
	 * detect when the Stream is closed. inputStream.available() doesn't throw an exception in this
	 * case... so either we block indefinitely (having to potentially destroy the thread), or we can't
	 * detect when the stream is closed. alas, java.nio would solve both issues much more elegant, but
	 * what can we do, getting in the crap old InputStream object...
	 *
	 * Note: Do not synchronize this method. It may lead to dead locks because of the sleep call!
	 */
	protected void readInputUntilInterrupted() {
		boolean allStreamsEOF = false;
		while (!allStreamsEOF) {
			allStreamsEOF = true;
			int totalBytesRead = 0;
			for (int i = 0; i < reader.length; i++) {
				// we do mark reader which have reached EOF already with null.
				if (reader[i] == null) {
					continue;
				}
				int bytesRead = readAvailableInput(reader[i]);

				// If readAvailableInput(...) returns 0 and the stream read is a PipedInputStream,
				// we need to know if the stream got closed by the writer
				if (bytesRead == 0 && streams[i] instanceof PipedInputStream) {
					PipedInputStream in = (PipedInputStream)streams[i];
					try {
	                    final Field f = in.getClass().getDeclaredField("closedByWriter"); //$NON-NLS-1$
	    				AccessController.doPrivileged(new PrivilegedAction<Object>() {
	    					@Override
	    					public Object run() {
	    						f.setAccessible(true);
	    						return null;
	    					}
	    				});
	    				// If the piped input stream is closed from the writer
	    				// side, in example because EOF received on writer side,
	    				// close the stream from the reader side too.
	    				if (f.getBoolean(in)) bytesRead = -1;
                    }
                    catch (Exception e) { /* ignored on purpose */ }
				}

				// is EOF for the current stream
				if (bytesRead == -1) {
					try { reader[i].close(); } catch (IOException e) { /* ignored on purpose */ }
					reader[i] = null;
				} else {
					// at least this stream is still not EOF
					allStreamsEOF = false;
					if (bytesRead >= 0) {
						totalBytesRead += bytesRead;
					}
				}
			}

			if (!allStreamsEOF && totalBytesRead == 0) {
				// nothing read till here, sleep a little bit
				try {
					sleep(50);
				} catch (InterruptedException e) {
					CoreBundleActivator.getTraceHandler().trace(getPrefix() + " received interrupt request", 3, this); //$NON-NLS-1$
					// an interrupt to the sleep breaks the loop.
					allStreamsEOF = true;
				}
			}
		}
	}

	/**
	 * (non-Javadoc)
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		try {
			CoreBundleActivator.getTraceHandler().trace(getPrefix() + " begin waiting for input", 3, this); //$NON-NLS-1$
			readInputUntilInterrupted();
		} finally {
			// close all readers if not done anyway
			for (BufferedReader element : reader) {
				if (element == null) {
					continue;
				}
				// should there be any input left, read it before closing the stream.
				readAvailableInput(element);
				// finally, close the stream now.
				try { element.close(); } catch (IOException e) { /* ignored on purpose */ }
			}

			// release all waiting threads.
			finish();

			CoreBundleActivator.getTraceHandler().trace(getPrefix() + " stop waiting for input", 3, this); //$NON-NLS-1$
		}
	}

}

Back to the top