1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
/*
* MessageService.cpp
*
* Created on: 22.08.2012
* Author: karlitsc
*/
#include "MessageService.h"
namespace etRuntime {
MessageService::MessageService(IRTObject* parent, Address addr, std::string name, int priority)
: // super("MessageService "+name),
IMessageReceiver(),
IRTObject(),
m_parent(parent),
m_name(name),
m_running(false),
m_messageQueue(this, "Queue"),
m_messageDispatcher( this,
Address(addr.m_nodeID, addr.m_threadID, addr.m_objectID + 1),
"Dispatcher"),
m_address(addr),
m_lastMessageTimestamp(0),
m_asyncActors()
{
// check and set priority
// assert priority >= Thread.MIN_PRIORITY : ("priority smaller than Thread.MIN_PRIORITY (1)");
// assert priority <= Thread.MAX_PRIORITY : ("priority bigger than Thread.MAX_PRIORITY (10)");
// this.setPriority(priority);
}
MessageService::~MessageService() {
m_parent = 0;
}
void MessageService::run() {
m_running = true;
while (m_running) {
pollOneMessage();
}
}
void MessageService::runOnce() {
pollAsyncActors();
while (m_messageQueue.isNotEmpty()){
pollOneMessage();
}
}
//TODO: synchronized
void MessageService::receive(Message* msg) {
if (msg != 0) {
m_messageQueue.push(msg);
//TODO
//notifyAll(); // wake up thread to compute message
}
}
std::string MessageService::getInstancePath() const {
std::string path = PATH_DELIM + m_name;
if (m_parent != 0)
path = m_parent->getInstancePath() + path;
return path;
}
std::string MessageService::getInstancePathName() const {
std::string path = PATHNAME_DELIM + m_name;
if (m_parent != 0)
path = m_parent->getInstancePathName() + path;
return path;
}
// TODO: synchronized
void MessageService::terminate() {
if (m_running) {
m_running = false;
//TODO:
// notifyAll();
}
}
//TODO: synchronized
void MessageService::pollOneMessage() {
Message* msg = m_messageQueue.pop(); // get next Message from Queue
if (msg == 0) {
//TODO:
// no message in queue -> wait till Thread is notified
// try {
// wait();
// } catch (InterruptedException e) {
// }
} else {
//TODO: set timestamp
// m_lastMessageTimestamp = System.currentTimeMillis();
m_messageDispatcher.receive(msg);
}
}
void MessageService::addAsyncActor(IEventReceiver& evtReceiver) {
m_asyncActors.push_back(&evtReceiver);
}
void MessageService::pollAsyncActors() {
std::vector<IEventReceiver*>::iterator it = m_asyncActors.begin();
for ( ; it != m_asyncActors.end(); ++it) {
// polling event
(*it)->receiveEvent(0,0,0);
}
}
} /* namespace etRuntime */
|