Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 511ee9a179435d67d6300450e8fe38219ca14cfb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
# *****************************************************************************
# * Copyright (c) 2013-2014, 2016 Wind River Systems, Inc. and others.
# * All rights reserved. This program and the accompanying materials
# * are made available under the terms of the Eclipse Public License v1.0
# * which accompanies this distribution, and is available at
# * http://www.eclipse.org/legal/epl-v10.html
# *
# * Contributors:
# *     Wind River Systems - initial implementation
# *****************************************************************************

import atexit
import threading
import os.path
import sys
import time

import tcf as tcf  # @UnresolvedImport
import tcf.protocol as protocol  # @UnresolvedImport

import tcf.services.processes as processes  # @UnresolvedImport
import tcf.services.processes_v1 as processes_v1  # @UnresolvedImport
import tcf.services.runcontrol as runcontrol  # @UnresolvedImport
import tcf.services.streams as streams  # @UnresolvedImport


class TcfProtocolLogger(object):
    """Silent replacement for the TCF protocol's default logger.

    TCF errors should not be printed on the console, so every log entry
    handed to this logger is simply dropped.
    """

    def log(self, msg, x):
        """Discard the given log entry.

        :param msg: log entry text.
        :param x: an exception associated with the log entry or **None**.

        :returns: **None**, always.
        """
        return None

    def __del__(self):
        """Do nothing when the logger is deleted.

        Placeholder only: the intent was to flush a log cache here in order
        to get rid of occasional protocol warnings, but nothing is flushed.
        """
        return None


class TcfValue(object):
    """Holder for the result of an asynchronous TCF request.

    The asynchronous callback stores its result with setValue(); the waiting
    caller reads it back with getValue().

    :param value: Initial value stored in this container (default **None**).
    """
    def __init__(self, value=None):
        self._value = value

    def getValue(self):
        """Return the value currently stored in this container.

        This is the last value given to setValue(), or the one given at
        object creation time if setValue() was never called.

        :returns: The stored value, or **None** if nothing was stored.
        """
        stored = self._value
        return stored

    def setValue(self, value):
        """Store *value* in this container, replacing the previous one.

        :param value: TCF value to store.

        :returns: **None**, always.
        """
        self._value = value
        return None


class StreamsListener(streams.StreamsListener):
    """A TCF streams service listener echoing stream data on stdout.

    :param service: streams service to listen to.
    """
    def __init__(self, service):
        self._service = service

    def created(self, streamType, streamID, contextID):
        """Called when a new stream is created.

        Starts reading the new stream by chunks of 4096 bytes. Each chunk is
        written to stdout and a new read is issued until the end of the
        stream is reached or a read error is reported.

        :param streamType: source type of the stream.
        :param streamID: ID of the stream.
        :param contextID: a context ID that is associated with the stream,
                          or **None**. Exact meaning of the context ID depends
                          on stream type. Stream types and context IDs are
                          defined by services that use Streams service to
                          transmit data.
        """
        class DoneRead(streams.DoneRead):
            """Call back interface for StreamsService.read() command.

            :param service: The streams service to read data from.
            :param streamID: The stream ID to read data from.
            :param size: Size of data to read on stream.
            """
            def __init__(self, service, streamID, size):
                self._service = service
                self._streamID = streamID
                self._size = size

            def doneRead(self, token, error, lost_size, data, eos):
                """Called when StreamsService.read() command is done.

                :param token: command handle.
                :param error: error object or **None**.
                :param lost_size: number of bytes that were lost because of
                                  buffer overflow. A *lost_size* of **-1**
                                  means unknown number of bytes were lost. If
                                  both *lost_size* and *data.length* are
                                  non-zero then lost bytes are considered
                                  located right before read bytes.
                :param data: bytes read from the stream.
                :param eos: true if end of stream was reached.
                """
                if data:
                    # The 'not str' test keeps Python 2 'str' data on the
                    # str() path, while Python 3 'bytes' get decoded instead
                    # of being printed as their b'...' representation.
                    if (isinstance(data, (bytes, bytearray)) and
                            not isinstance(data, str)):
                        # Process output is not guaranteed to be valid UTF-8
                        # (and a character may be split between two chunks):
                        # replace undecodable bytes rather than raising from
                        # within the callback.
                        s = data.decode('utf-8', 'replace')
                    else:
                        s = str(data)
                    sys.stdout.write(s)
                if error:
                    # Do not read again from a failing stream, else the
                    # same error would be reported over and over.
                    protocol.log("Error from streams.read()", error)
                    return
                if not eos:
                    self._service.read(self._streamID, self._size, self)

        self._service.read(streamID, 4096, DoneRead(self._service, streamID,
                                                    4096))

    def disposed(self, streamType, streamID):
        """Called when a stream is disposed.

        Nothing to do here: reading stops by itself at end of stream.

        :param streamType: source type of the stream.
        :param streamID: ID of the stream.
        """
        pass


def getService(connection, name):
    """Retrieve a service proxy from a connection.

    The lookup of the service named *name* is posted through
    protocol.invokeLater(), and the caller waits for its completion for a
    maximum of 5 seconds. When the returned proxy is not **None**, TCF
    requests can be sent to it.

    :param connection: The connection to get service proxy from.
    :param name: The name of the service proxy to get from *connection*.

    :returns: A service proxy, or **None** (timeout or missing service).
    """
    def fetchService(connection, condition, val):
        """Asynchronous lookup of the service proxy.

        :param connection: The connection to get service proxy from.
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the lookup is done.
        :param val: The TcfValue receiving the service proxy.

        :returns: **None**, always.
        """
        proxy = connection.getRemoteService(name)
        val.setValue(proxy)
        with condition:
            condition.notify()

    # The condition the caller sleeps on, and the holder for the proxy

    done = threading.Condition()
    proxyHolder = TcfValue()

    with done:
        # Post the lookup while holding the condition, so that the
        # notification cannot be sent before the wait is started.
        protocol.invokeLater(fetchService, connection=connection,
                             condition=done, val=proxyHolder)
        done.wait(5)

    # Holder content is still None on timeout or missing service.

    return proxyHolder.getValue()


def getChildren(service, contextID=None):
    """Get the children context IDs of a context from a given service.

    As many TCF services have a **getChildren()** command, this function is
    intended to implement a service-independent **getChildren()** command.

    The request is posted through protocol.invokeLater(), and the caller
    waits for its completion for a maximum of 10 seconds.

    :param service: The TCF service to get context list from.
    :param contextID: parent ID of the context list to get from *service*.

    :returns: A tuple of context IDs. Tuple may be empty on error, on
              timeout, or if *contextID* does not have children.
    """
    def callGetChildren(service, condition, val):
        """Asynchronous request to get context children.

        :param service: The TCF service proxy to send request to.
        :param condition: A threading.Condition the caller is pending on.
                          Caller is released from waiting through a
                          Condition.notify() call.
        :param val: A TcfValue handling the request returned value.

        :returns: **None**, always.
        """
        class DoneGetChildren(object):
            """Client callback class for <service>.getChildren() command."""
            def doneGetChildren(self, token, error, ids):
                """Called when context list retrieval is done.

                :param token: pending command handle.
                :param error: error description if operation failed, **None**
                              if succeeded.
                :param ids: array of available context IDs.
                """
                if error:
                    # Name the command which actually failed
                    protocol.log("Error from " + service.getName() +
                                 ".getChildren()", error)
                else:
                    val.setValue(ids)
                with condition:
                    condition.notify()

        # send the request itself

        service.getChildren(contextID, DoneGetChildren())

    # create a condition to wait on, and a value to get the children ids

    lock = threading.Condition()
    value = TcfValue()

    with lock:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(callGetChildren, service=service, condition=lock,
                             val=value)
        lock.wait(10)

    # Return the retrieved children IDs, or an empty tuple when the value
    # was never set (error or timeout)

    return (tuple(value.getValue() or []))


def getContext(service, contextID):
    """Get a TCF context from a given service.

    Most TCF services provide a **getContext()** command; this function is a
    generic **getContext()** call which can address any of them. The request
    is posted through protocol.invokeLater(), and the caller waits for its
    completion for a maximum of 10 seconds.

    What to do with the returned context object is up to the caller. For
    example, for the runcontrol service, check if the runcontrol context is
    a container :

    .. code-block:: python

        import tcf
        import tcf.services.runcontrol as runcontrol

        c = tcf.connect("TCP:127.0.0.1:1534")
        rcSvc = getService(c, runcontrol.NAME)
        rcIDs = getChildren(rcSvc, None)
        rcContext = getContext(rcSvc, rcIDs[0])

        print('Runcontrol context is a container: ' +
              str(rcContext.isContainer()))

    :param service: The TCF service to get context from.
    :param contextID: ID of the context to get from *service*

    :returns: The context object given by *service*, or **None** on error
              or timeout.
    """
    def requestContext(service, condition, val):
        """Asynchronous request to get a context.

        :param service: The TCF service proxy to send request to.
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the request is done.
        :param val: The TcfValue receiving the retrieved context.

        :returns: **None**, always.
        """
        class ContextReceiver(object):
            """Client callback class for <service>.getContext() command."""
            def doneGetContext(self, token, error, context):
                """Called when context data retrieval is done.

                :param token: pending command handle.
                :param error: error description if operation failed, **None**
                              if succeeded.
                :param context: context data.
                """
                if not error:
                    val.setValue(context)
                else:
                    protocol.log("Error from " + service.getName() +
                                 ".getContext()", error)
                with condition:
                    condition.notify()

        # send the request itself

        receiver = ContextReceiver()
        service.getContext(contextID, receiver)

    # The condition the caller sleeps on, and the holder for the context

    done = threading.Condition()
    contextHolder = TcfValue()

    with done:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(requestContext, service=service, condition=done,
                             val=contextHolder)
        done.wait(10)

    # Holder content is still None on error or timeout

    return contextHolder.getValue()


def resume(context):
    """Resume a runcontrol context.

    The given *context* should be a RunControlContext, so that its resume()
    method may be called. The request is posted through
    protocol.invokeLater(), and the caller waits for its completion for a
    maximum of 10 seconds.

    :param context: A runcontrol context to resume.
    :type context: RunControlContext

    :returns: **None**, always.
    """
    def requestResume(context, condition):
        """Asynchronous request to resume runcontrol context.

        :param context: The TCF RunControlContext to resume.
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the command is done.

        :returns: **None**, always.
        """
        class ResumeDone(runcontrol.DoneCommand):
            """Client call back interface for RunControlContext.resume()."""
            def doneCommand(self, token, error):
                """Called when run control command execution is complete.

                :param token: pending command handle.
                :param error: command execution error or **None**.
                """
                if error:
                    protocol.log("Error from RunContext.resume", error)
                with condition:
                    condition.notify()

        # RM_RESUME mode, a count of 1, and no resume properties.

        callback = ResumeDone()
        context.resume(runcontrol.RM_RESUME, 1, {}, callback)

    # The condition the caller sleeps on

    done = threading.Condition()

    with done:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(requestResume, context=context, condition=done)
        done.wait(10)


def start(connection, path, *args):
    """Start a new process in suspended mode for the given connection.

    The request is posted through protocol.invokeLater(), and the caller
    waits for its completion for a maximum of 10 seconds.

    :param connection: The TCF connection to use services from.
    :param path: Path of the executable to start.
    :param args: command line arguments.

    :returns: The process context given by the processes service, or
              **None** if there is no processes service, on error or on
              timeout.
    """
    def requestStart(connection, condition, path, arguments, val):
        """Asynchronous request to start a process.

        :param connection: The TCF connection to get processes service from.
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the request is done.
        :param path: Path of the process to start on the target
        :param arguments: A sequence of program arguments for *path*
        :param val: The TcfValue receiving the started process context.

        :returns: **None**, always.
        """
        # Prefer the ProcessesV1 service, fall back on the Processes one

        procSvc = connection.getRemoteService(processes_v1.NAME)
        if not procSvc:
            procSvc = connection.getRemoteService(processes.NAME)

        if not procSvc:
            with condition:
                print('No processes service available')
                condition.notify()
            return

        class StartDone(processes.DoneStart):
            """Client callback interface for ProcessesService.start()."""

            def doneStart(self, token, error, process):
                """Called when process start is done.

                :param token: pending command handle.
                :param error: error description if operation failed, **None**
                              if  succeeded.
                :param process: ProcessContext object representing the started
                                process.
                """
                if not error:
                    val.setValue(process)
                else:
                    protocol.log("Error from Processes.start", error)
                with condition:
                    condition.notify()

        # The ProcessesV1 start command takes a dictionary of options, the
        # Processes one only takes the 'attach' boolean.

        if procSvc.getName() != processes_v1.NAME:
            startOpts = True
        else:
            startOpts = {processes_v1.START_ATTACH: True,
                         processes_v1.START_USE_TERMINAL: True}

        # Command line is the executable path followed by its arguments

        commandLine = [path]
        commandLine.extend(arguments or ())

        # start the process itself, from the executable directory

        workDir = os.path.dirname(path)
        procSvc.start(workDir, path, commandLine, None, startOpts,
                      StartDone())

    # The condition the caller sleeps on, and the holder for the process

    done = threading.Condition()
    processHolder = TcfValue()

    with done:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(requestStart, connection=connection,
                             condition=done, path=path, arguments=args,
                             val=processHolder)
        done.wait(10)

    return processHolder.getValue()


def state(context):
    """Get the state of a RunControlContext.

    Getting the state of a RunControlContext is asynchronous: the request is
    posted through protocol.invokeLater(), and the caller waits for its
    completion for a maximum of 10 seconds.

    :param context: The context to get state for.
    :type context: RunControlContext

    :returns: **None** on error or timeout, else a tuple of four elements
              representing the RunControl context state :

                  - A boolean stating if the context is suspended or not
                  - An integer representing the program counter of the
                    context (if it is suspended)
                  - A string representing the reason why the context is
                    suspended
                  - A dictionary of additional properties about the context
                    state (see runcontrol.STATE_*)
    """
    def requestState(context, condition, val):
        """Asynchronous request to get the context state.

        :param context: The RunControlContext to get state for.
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the command is done.
        :param val: The TcfValue receiving the state tuple.

        :returns: **None**, always.
        """
        class StateReceiver(runcontrol.DoneGetState):
            """Client call back class for RunControlContext.getState()."""
            def doneGetState(self, token, error, suspended, pc, reason,
                             params):
                """Called when RunControlContext.getState() command execution
                is complete.

                :param token: pending command handle.
                :param error: command execution error or None.
                :param suspended: true if the context is suspended
                :param pc: program counter of the context (if suspended).
                :param reason: suspend reason (if suspended), see REASON_*.
                :param params: additional target specific data about context
                               state, see STATE_*.
                """
                if not error:
                    stateTuple = (suspended, pc, reason, params)
                    val.setValue(stateTuple)
                else:
                    protocol.log("Error from runcontrol.getState()", error)
                with condition:
                    condition.notify()

        # send the request itself

        receiver = StateReceiver()
        context.getState(receiver)

    # The condition the caller sleeps on, and the holder for the state

    done = threading.Condition()
    stateHolder = TcfValue()

    with done:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(requestState, context=context, condition=done,
                             val=stateHolder)
        done.wait(10)

    return stateHolder.getValue()


# Streams service subscription helper


def subscribe(svc, streamType, listener):
    """Subscribe to a streams channel.

    The request is posted through protocol.invokeLater(), and the caller
    waits for its completion for a maximum of 10 seconds.

    :param svc: The TCF streams proxy to subscribe against.
    :param streamType: Type of the stream to register against. For now,
                       I only know of 'Terminals', 'Processes' and
                       'ProcessesV1' types.
    :param listener: The listener to subscribe to *svc*.
    :type listener: tcf.services.streams.StreamsListener

    :returns: **None**, always.
    """
    def requestSubscribe(service, streamType, listener, condition):
        """Asynchronous request to subscribe a listener to streams service.

        :param service: The streams service to subscribe listener against.
        :param streamType: Type of the stream to register against. For now,
                           I only know of 'Terminals', 'Processes' and
                           'ProcessesV1' types.
        :param listener: The listener to subscribe to *service*.
        :type listener: tcf.services.streams.StreamsListener
        :param condition: The threading.Condition the caller is waiting on.
                          It is notified once the subscription is done.

        :returns: **None**, always.
        """
        class SubscribeDone(streams.DoneSubscribe):
            """Call back interface for StreamsService.subscribe() command."""
            def doneSubscribe(self, token, error):
                """Called when stream subscription is done.

                :param token: pending command handle
                :param error: error description if operation failed, **None**
                              if succeeded.
                """
                if error:
                    protocol.log("Error from streams.subscribe()", error)
                with condition:
                    condition.notify()

        # send the request itself

        callback = SubscribeDone()
        service.subscribe(streamType, listener, callback)

    # The condition the caller sleeps on

    done = threading.Condition()

    with done:
        # TCF requests must be called by the dispatch thread, wait for a
        # maximum of 10 seconds
        protocol.invokeLater(requestSubscribe, service=svc,
                             streamType=streamType, listener=listener,
                             condition=done)
        done.wait(10)

# --------------------------------------------------------------------------- #

# TCF initialisation

# Start the TCF event queue, silence the protocol logger, and make sure the
# event queue is shut down when the interpreter exits.

protocol.startEventQueue()
protocol.setLogger(TcfProtocolLogger())
atexit.register(protocol.getEventQueue().shutdown)

try:
    c = tcf.connect("TCP:127.0.0.1:1534")
except Exception as e:
    # log the exception as the entry's exception, not as its text
    protocol.log("Cannot connect to TCP:127.0.0.1:1534", e)
    sys.exit()

# If there is a streams service, listen to it

streamsSvc = getService(c, streams.NAME)
if streamsSvc:
    subscribe(streamsSvc, 'ProcessesV1', StreamsListener(streamsSvc))

p = start(c, '/bin/ls', '-l', '-a')

# start() returns None when there is no processes service, on error, or on
# timeout. Nothing can be resumed in that case.

if p is None:
    print('Could not start process. Exiting ...')
    sys.exit()

# this part is a bit tricky, the runcontrol contexts which accept a resume
# command are the contexts which have a state. Walk through the process
# runcontrol context and its descendants to find the runcontrol context which
# has a state, and resume it.

rcSvc = getService(c, runcontrol.NAME)

if rcSvc is None:
    print('No runcontrol service. Exiting ...')
    sys.exit()

# getContext() returns None on error or timeout

context = getContext(rcSvc, p.getID())

if context is None:
    print('No runcontrol context to resume. Exiting ...')
    sys.exit()

print('Runcontrol context is a container: ' + str(context.isContainer()))

# Breadth-first search of the first context having a state. Each context is
# visited only once, so the search always ends, even when a context without
# a state has no children at all.

pending = [context]
context = None

while pending:
    candidate = pending.pop(0)
    if candidate.hasState():
        context = candidate
        break
    for childID in getChildren(rcSvc, candidate.getID()):
        child = getContext(rcSvc, childID)
        if child is not None:
            pending.append(child)

if context is None:
    print('No runcontrol context to resume. Exiting ...')
    sys.exit()

# get the state of this context. State is a tuple of
# (suspended, pc, reason, params)

ctxState = state(context)
print('Context state : ' + str(ctxState))

while ctxState and ctxState[0]:
    resume(context)

    # calling resume may end the context ... in which case there is no state
    # to get any more. Stop looping then, instead of resuming forever a
    # context which does not exist.

    try:
        ctxState = state(context)
    except Exception:
        ctxState = None

    if ctxState:
        print('Context state : ' + str(ctxState))

# Let the async calls the time to end ...

time.sleep(2)

Back to the top