Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 264b4fbcec9ae0053441df66f0cf27ccd35b8bff (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
 * MessageService.cpp
 *
 *  Created on: 22.08.2012
 *      Author: karlitsc
 */

#include "MessageService.h"

namespace etRuntime {

/*
 * Constructs a message service that is initially not running.
 *
 * parent   - stored in m_parent; used by getInstancePath()/getInstancePathName()
 *            to prefix this object's path (may be 0).
 * addr     - stored as this service's address; the dispatcher is given the
 *            same node and thread id with the object id incremented by one.
 * name     - stored in m_name; forms this object's path segment.
 * priority - currently unused: the code that would validate and apply it
 *            is commented out in the body below.
 */
MessageService::MessageService(IRTObject* parent, Address addr, std::string name, int priority)
	:	// super("MessageService "+name),
		IMessageReceiver(),
		IRTObject(),
		m_parent(parent),
		m_name(name),
		m_running(false),
		// queue and dispatcher both receive this service as their first argument
		// (presumably as their parent object - confirm against their constructors)
		m_messageQueue(this, "Queue"),
		m_messageDispatcher( this,
				// dispatcher address: same node/thread as addr, object id + 1
				Address(addr.m_nodeID, addr.m_threadID,	addr.m_objectID + 1),
				"Dispatcher"),
		m_address(addr),
		m_lastMessageTimestamp(0),
        m_asyncActors()
{

	// check and set priority
	// NOTE: not implemented yet - the three lines below are disabled
	// placeholder code (not valid C++ as written); priority is ignored for now.
//			assert priority >= Thread.MIN_PRIORITY : ("priority smaller than Thread.MIN_PRIORITY (1)");
//			assert priority <= Thread.MAX_PRIORITY : ("priority bigger than Thread.MAX_PRIORITY (10)");
//			this.setPriority(priority);

}

/*
 * Destructor. Resets the stored parent pointer to null; no other
 * explicit cleanup is performed here.
 */
MessageService::~MessageService() {
	this->m_parent = 0;
}

/*
 * Main loop of the service: sets the running flag and then polls one
 * message per iteration for as long as the flag stays set.
 * terminate() clears the flag and thereby ends the loop.
 */
void MessageService::run() {
	for (m_running = true; m_running; ) {
		pollOneMessage();
	}
}

/*
 * Single processing pass: polls every registered async actor once,
 * then handles queued messages until the queue reports empty.
 */
void MessageService::runOnce() {
	pollAsyncActors();

	for (;;) {
		if (!m_messageQueue.isNotEmpty()) {
			break; // queue drained
		}
		pollOneMessage();
	}
}

/*
 * Enqueues a message for later processing by this service.
 * A null pointer is ignored.
 *
 * TODO: synchronized
 */
void MessageService::receive(Message* msg) {
	if (msg == 0) {
		return; // nothing to enqueue
	}

	m_messageQueue.push(msg);
	//TODO
	//notifyAll(); // wake up thread to compute message
}

/*
 * Returns the instance path of this object: PATH_DELIM followed by this
 * object's name, prefixed with the parent's instance path when a parent
 * is set.
 */
std::string MessageService::getInstancePath() const {
	const std::string ownSegment = PATH_DELIM + m_name;

	if (m_parent == 0) {
		return ownSegment; // no parent: the path is just our own segment
	}

	return m_parent->getInstancePath() + ownSegment;
}

/*
 * Same construction as getInstancePath(), but built with PATHNAME_DELIM
 * and the parent's getInstancePathName().
 */
std::string MessageService::getInstancePathName() const {
	const std::string ownSegment = PATHNAME_DELIM + m_name;

	if (m_parent == 0) {
		return ownSegment; // no parent: the name is just our own segment
	}

	return m_parent->getInstancePathName() + ownSegment;
}

/*
 * Requests the run() loop to stop by clearing the running flag.
 * Does nothing if the service is not running.
 *
 * TODO: synchronized
 */
void MessageService::terminate() {
	if (!m_running) {
		return; // already stopped
	}

	m_running = false;
	//TODO:
	// notifyAll();
}
//TODO: synchronized
void MessageService::pollOneMessage() {
	Message* msg = m_messageQueue.pop(); // get next Message from Queue
	if (msg == 0) {
		//TODO:
		// no message in queue -> wait till Thread is notified
//		try {
//			wait();
//		} catch (InterruptedException e) {
//		}
	} else {
		//TODO: set timestamp
		// m_lastMessageTimestamp = System.currentTimeMillis();
		m_messageDispatcher.receive(msg);
	}

}

/*
 * Registers an event receiver to be polled by pollAsyncActors().
 * Only the address of evtReceiver is stored.
 */
void MessageService::addAsyncActor(IEventReceiver& evtReceiver) {
	IEventReceiver* const actor = &evtReceiver;
	m_asyncActors.push_back(actor);
}

void MessageService::pollAsyncActors() {
	std::vector<IEventReceiver*>::iterator it = m_asyncActors.begin();
	for ( ; it != m_asyncActors.end(); ++it) {
		// polling event
		(*it)->receiveEvent(0,0,0);
	}
}


} /* namespace etRuntime */

Back to the top