Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: e6a1660d3e355a5926b153168e2b2cd62274d9ee (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
/*******************************************************************************
 * Copyright (c) 2011, 2014 Wind River Systems, Inc. and others. All rights reserved.
 * This program and the accompanying materials are made available under the terms
 * of the Eclipse Public License v1.0 which accompanies this distribution, and is
 * available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 * Wind River Systems - initial API and implementation
 *******************************************************************************/
package org.eclipse.tcf.te.ui.terminals.streams;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.osgi.util.NLS;
import org.eclipse.tcf.te.core.terminals.interfaces.constants.ILineSeparatorConstants;
import org.eclipse.tcf.te.ui.terminals.activator.UIPlugin;
import org.eclipse.tcf.te.ui.terminals.nls.Messages;
import org.eclipse.tm.internal.terminal.provisional.api.ITerminalControl;
import org.eclipse.ui.services.IDisposable;

/**
 * Input stream monitor implementation.
 * <p>
 * <b>Note:</b> The input is coming <i>from</i> the terminal. Therefore, the input
 * stream monitor is attached to the stdin stream of the monitored (remote) process.
 */
@SuppressWarnings("restriction")
public class InputStreamMonitor extends OutputStream implements IDisposable {
    // Reference to the parent terminal control
	private final ITerminalControl terminalControl;

	// Reference to the monitored (output) stream
    private final OutputStream stream;

    // Reference to the thread writing the stream
    private volatile Thread thread;

    // Flag to mark the monitor disposed. When disposed,
    // no further data is written from the monitored stream.
    private volatile boolean disposed;

    // A list of object to dispose if this monitor is disposed
    private final List<IDisposable> disposables = new ArrayList<IDisposable>();

    // Queue to buffer the data to write to the output stream
    private final Queue<byte[]> queue = new LinkedList<byte[]>();

    // ***** Line separator replacement logic *****
    // ***** Adapted from org.eclipse.tcf.internal.terminal.local.LocalTerminalOutputStream *****

	private final static int TERMINAL_SENDS_CR = 0;
	private final static int TERMINAL_SENDS_CRLF = 1;
	private final static int PROGRAM_EXPECTS_LF = 0;
	private final static int PROGRAM_EXPECTS_CRLF = 1;
	private final static int PROGRAM_EXPECTS_CR = 2;
	private final static int NO_CHANGE = 0;
	private final static int CHANGE_CR_TO_LF = 1;
	private final static int INSERT_LF_AFTER_CR = 2;
	private final static int REMOVE_CR = 3;
	private final static int REMOVE_LF = 4;

	// CRLF conversion table:
	//
	// Expected line separator -->         |       LF        |        CRLF        |       CR       |
	// ------------------------------------+-----------------+--------------------+----------------+
	// Local echo off - control sends CR   | change CR to LF | insert LF after CR | no change      |
	// ------------------------------------+-----------------+--------------------+----------------+
	// Local echo on - control sends CRLF  | remove CR       | no change          | remove LF      |
	//
	private final static int[][] CRLF_REPLACEMENT = {

		{CHANGE_CR_TO_LF, INSERT_LF_AFTER_CR, NO_CHANGE},
		{REMOVE_CR, NO_CHANGE, REMOVE_LF}
	};

	private int replacement;

    /**
     * Constructor.
     *
     * @param terminalControl The parent terminal control. Must not be <code>null</code>.
     * @param stream The stream. Must not be <code>null</code>.
	 * @param localEcho Local echo on or off.
	 * @param lineSeparator The line separator used by the stream.
     */
	public InputStreamMonitor(ITerminalControl terminalControl, OutputStream stream, boolean localEcho, String lineSeparator) {
    	super();

    	Assert.isNotNull(terminalControl);
    	this.terminalControl = terminalControl;
    	Assert.isNotNull(stream);
        this.stream = stream;

        // Determine the line separator replacement setting
		int terminalSends = localEcho ? TERMINAL_SENDS_CRLF : TERMINAL_SENDS_CR;
		if (lineSeparator == null) {
			replacement = NO_CHANGE;
		} else {
			int programExpects;
			if (lineSeparator.equals(ILineSeparatorConstants.LINE_SEPARATOR_LF)) {
				programExpects = PROGRAM_EXPECTS_LF;
			}
			else if (lineSeparator.equals(ILineSeparatorConstants.LINE_SEPARATOR_CR)) {
				programExpects = PROGRAM_EXPECTS_CR;
			}
			else {
				programExpects = PROGRAM_EXPECTS_CRLF;
			}
			replacement = CRLF_REPLACEMENT[terminalSends][programExpects];
		}

    }

	/**
	 * Returns the associated terminal control.
	 *
	 * @return The associated terminal control.
	 */
	protected final ITerminalControl getTerminalControl() {
		return terminalControl;
	}

	/**
	 * Adds the given disposable object to the list. The method will do nothing
	 * if either the disposable object is already part of the list or the monitor
	 * is disposed.
	 *
	 * @param disposable The disposable object. Must not be <code>null</code>.
	 */
	public final void addDisposable(IDisposable disposable) {
		Assert.isNotNull(disposable);
		if (!disposed && !disposables.contains(disposable)) disposables.add(disposable);
	}

	/**
	 * Removes the disposable object from the list.
	 *
	 * @param disposable The disposable object. Must not be <code>null</code>.
	 */
	public final void removeDisposable(IDisposable disposable) {
		Assert.isNotNull(disposable);
		disposables.remove(disposable);
	}

	/* (non-Javadoc)
	 * @see org.eclipse.tcf.te.runtime.interfaces.IDisposable#dispose()
	 */
	@Override
	public void dispose() {
		// If already disposed --> return immediately
		if (disposed) return;

		// Mark the monitor disposed
    	disposed = true;

        // Close the stream (ignore exceptions on close)
        try { stream.close(); } catch (IOException e) { /* ignored on purpose */ }
        // And interrupt the thread
        close();

        // Dispose all registered disposable objects
        for (IDisposable disposable : disposables) disposable.dispose();
        // Clear the list
        disposables.clear();
	}

    /**
     * Close the terminal input stream monitor.
     */
    @Override
	public void close() {
    	// Not initialized -> return immediately
    	if (thread == null) return;

    	// Copy the reference
    	final Thread oldThread = thread;
    	// Unlink the monitor from the thread
    	thread = null;
    	// And interrupt the writer thread
    	oldThread.interrupt();
    }

    /**
     * Starts the terminal output stream monitor.
     */
    public void startMonitoring() {
    	// If already initialized -> return immediately
    	if (thread != null) return;

    	// Create a new runnable which is constantly reading from the stream
    	Runnable runnable = new Runnable() {
    		@Override
			public void run() {
    			writeStream();
    		}
    	};

    	// Create the writer thread
    	thread = new Thread(runnable, "Terminal Input Stream Monitor Thread"); //$NON-NLS-1$

    	// Configure the writer thread
        thread.setDaemon(true);

        // Start the processing
        thread.start();
    }


    /**
     * Reads from the queue and writes the read content to the stream.
     */
    protected void writeStream() {
    	// Read from the queue and write to the stream until disposed
        outer: while (thread != null && !disposed) {
            byte[] data;
			// If the queue is empty, wait until notified
    		synchronized(queue) {
	        	while (queue.isEmpty()) {
	        		if (disposed) break outer;
					try {
						queue.wait();
					} catch (InterruptedException e) {
						break outer;
					}
	        	}
	        	// Retrieves the queue head (is null if queue is empty (should never happen))
	        	data = queue.poll();
    		}
            if (data != null) {
            	try {
            		// Break up writes into max 1000 byte junks to avoid console input buffer overflows on Windows
            		int written = 0;
            		byte[] buf = new byte[1000];
            		while (written < data.length) {
	            		int len = Math.min(buf.length, data.length - written);
	            		System.arraycopy(data, written, buf, 0, len);
	               		// Write the data to the stream
	            		stream.write(buf, 0, len);
						written += len;
	            		// Flush the stream immediately
	            		stream.flush();
	            		// Wait a little between writes to allow input being processed
	            		if (written < data.length)
	            			Thread.sleep(100);
            		}
            	} catch (IOException e) {
                	// IOException received. If this is happening when already disposed -> ignore
    				if (!disposed) {
    					IStatus status = new Status(IStatus.ERROR, UIPlugin.getUniqueIdentifier(),
    												NLS.bind(Messages.InputStreamMonitor_error_writingToStream, e.getLocalizedMessage()), e);
    					UIPlugin.getDefault().getLog().log(status);
    				}
            	}
                catch (InterruptedException e) {
                	break;
                }
            }
        }

        // Dispose the stream
        dispose();
    }

	/* (non-Javadoc)
	 * @see java.io.OutputStream#write(int)
	 */
    @Override
    public void write(int b) throws IOException {
        synchronized(queue) {
            queue.add(new byte[] { (byte)b });
            queue.notifyAll();
        }
    }

    /* (non-Javadoc)
     * @see java.io.OutputStream#write(byte[], int, int)
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
    	// Write the whole block to the queue to avoid synchronization
    	// to happen for every byte. To do so, we have to avoid calling
    	// the super method. Therefore we have to do the same checking
    	// here as the base class does.

    	// Null check. See the implementation in OutputStream.
    	if (b == null) throw new NullPointerException();

    	// Boundary check. See the implementation in OutputStream.
    	if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
    		throw new IndexOutOfBoundsException();
    	}
    	else if (len == 0) {
    		return;
    	}

        // Make sure that the written block is not interlaced with other input.
        synchronized(queue) {
        	// Preprocess the block to be written
        	byte[] processedBytes = onWriteContentToStream(b, off, len);
        	// If the returned array is not the original one, adjust offset and length
        	if (processedBytes != b) {
        		off = 0; len = processedBytes.length; b = processedBytes;
        	}

        	// Get the content from the byte buffer specified by offset and length
        	byte[] bytes = new byte[len];
        	int j = 0;
        	for (int i = 0 ; i < len ; i++) {
        	    bytes[j++] = b[off + i];
        	}

        	queue.add(bytes);
        	queue.notifyAll();
        }
    }

    /**
     * Allow for processing of data from byte stream from the terminal before
     * it is written to the output stream. If the returned byte array is different
     * than the one that was passed in with the bytes argument, then the
     * length value will be adapted.
     *
     * @param bytes The byte stream. Must not be <code>null</code>.
     * @param off The offset.
     * @param len the length.
     *
     * @return The processed byte stream.
     *
     */
    protected byte[] onWriteContentToStream(byte[] bytes, int off, int len) {
    	Assert.isNotNull(bytes);

    	if (replacement != NO_CHANGE && len > 0) {
    		String origText = new String(bytes, off, len);
    		String text = null;
    		//
    		// TODO: check whether this is correct! new String(byte[], int, int) always uses the default
    		//       encoding!

    		if (replacement == CHANGE_CR_TO_LF) {
    			text = origText.replace('\r', '\n');
    		}
    		else if (replacement == INSERT_LF_AFTER_CR) {
    			text = origText.replaceAll("\r\n|\r", "\r\n"); //$NON-NLS-1$ //$NON-NLS-2$
    		}
    		else if (replacement == REMOVE_CR) {
    			text = origText.replaceAll(ILineSeparatorConstants.LINE_SEPARATOR_CR, ""); //$NON-NLS-1$
    		}
    		else if (replacement == REMOVE_LF) {
    			text = origText.replaceAll(ILineSeparatorConstants.LINE_SEPARATOR_LF, ""); //$NON-NLS-1$
    		}

    		if (text != null && !origText.equals(text)) {
    			bytes = text.getBytes();
    		}
    	}

    	return bytes;
    }
}

Back to the top