Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 4374c507903f5f6e45a41c1cfa560a9c89267047 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*******************************************************************************
 * Copyright (c) 2012 Draeger Medical GmbH (http://www.draeger.com).
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * CONTRIBUTORS:
 * 		Peter Karlitschek (initial contribution)
 *
 *******************************************************************************/

#include "common/messaging/Message.h"
#include "common/messaging/MessageDispatcher.h"
#include "common/messaging/IMessageService.h"

namespace etRuntime {

MessageDispatcher::MessageDispatcher(IRTObject* parent, const Address& addr, const String& name) :
		RTObject(parent, name),
		m_local_map(),
		m_freeAdresses(),
		m_pollingMessageReceiver(),
		m_address(addr),
		m_nextFreeObjId(addr.m_objectID + 1) {
	m_local_map[addr.m_objectID] = this;
}

Address MessageDispatcher::getFreeAddress() {
	if (m_freeAdresses.empty()) {
		return Address(getAddress().m_nodeID, getAddress().m_threadID, m_nextFreeObjId++);
	}
	else {
		Address address = m_freeAdresses.back();
		m_freeAdresses.pop_back();

		return address;
	}
}
void MessageDispatcher::freeAddress(const Address& addr) {
	m_freeAdresses.push_back(addr);
}
void MessageDispatcher::addMessageReceiver(IMessageReceiver& receiver) {
	if (!receiver.getAddress().isValid())
		return;

	if (receiver.getAddress().m_nodeID == m_address.m_nodeID
			&& receiver.getAddress().m_threadID == m_address.m_threadID) {

		m_local_map[receiver.getAddress().m_objectID] = &receiver;
	}

}

void MessageDispatcher::removeMessageReceiver(IMessageReceiver& receiver) {
	if (!receiver.getAddress().isValid())
		return;

	m_local_map.erase(receiver.getAddress().m_objectID);
}

void MessageDispatcher::addPollingMessageReceiver(IMessageReceiver& receiver) {
	m_pollingMessageReceiver.insert(&receiver);
}
void MessageDispatcher::removePollingMessageReceiver(IMessageReceiver& receiver) {
	m_pollingMessageReceiver.erase(&receiver);
}

void MessageDispatcher::receive(const Message* msg) {

	IMessageReceiver* receiver = 0;
	if (msg->getAddress().m_nodeID == m_address.m_nodeID && msg->getAddress().m_threadID == m_address.m_threadID) {
		Map<int, IMessageReceiver*>::iterator it = m_local_map.find(msg->getAddress().m_objectID);
		if (it != m_local_map.end()) {
			receiver = it->second;
		}
	}
	if (receiver == this) {
		for (Set<IMessageReceiver*>::iterator it = m_pollingMessageReceiver.begin();
				it != m_pollingMessageReceiver.end(); ++it) {
			(*it)->receive(msg);
		}
	} else if (receiver != 0) {
		receiver->receive(msg);
		// TODO: error handling for not found addresses
	}

	msg->~Message();
	dynamic_cast<IMessageService*>(getParent())->returnMessageBuffer(msg);
}

String MessageDispatcher::toString() const {
	return getName() + " " + getAddress().toID().c_str();
}

} /* namespace etRuntime */

Back to the top