Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp')
-rw-r--r--runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp72
1 files changed, 55 insertions, 17 deletions
diff --git a/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp b/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp
index e2d74934c..62d62d66f 100644
--- a/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp
+++ b/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp
@@ -10,7 +10,6 @@
*
*******************************************************************************/
-#include "common/messaging/MessageDispatcher.h"
#include "common/messaging/MessageSeQueue.h"
#include "common/messaging/MessageService.h"
#include "common/messaging/MessageServiceController.h"
@@ -21,12 +20,16 @@
namespace etRuntime {
-MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, int node, int thread, const std::string& name, int priority) :
- AbstractMessageService(parent, name, node, thread),
+MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, int node, int thread, const std::string& name,
+ IMessageMemory* memory, int priority) :
+ RTObject(parent, name),
m_running(false),
m_execMode(mode),
m_lastMessageTimestamp(0),
- m_pollingMessage(getMessageDispatcher().getAddress(), 0) {
+ m_address(node, thread, 0),
+ m_messageQueue(this, "Queue"),
+ m_messageDispatcher(this, m_address.createInc(), "Dispatcher"),
+ m_messageMemory(memory) {
etTime interval;
interval.sec = 0;
@@ -35,12 +38,15 @@ MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode
}
MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, etTime interval, int node, int thread,
- const std::string& name, int priority) :
- AbstractMessageService(parent, name, node, thread),
+ const std::string& name, IMessageMemory* memory, int priority) :
+ RTObject(parent, name),
m_running(false),
m_execMode(mode),
m_lastMessageTimestamp(0),
- m_pollingMessage(getMessageDispatcher().getAddress(), 0) {
+ m_address(node, thread, 0),
+ m_messageQueue(this, "Queue"),
+ m_messageDispatcher(this, m_address.createInc(), "Dispatcher"),
+ m_messageMemory(memory) {
MessageService_init(interval, priority);
}
@@ -67,12 +73,18 @@ void MessageService::MessageService_init(etTime interval, int priority) {
}
MessageService::~MessageService() {
+ while(m_messageQueue.getSize() > 0) {
+ const Message* msg = m_messageQueue.pop();
+ msg->~Message();
+ returnMessageBuffer(msg);
+ }
etMutex_destruct(&m_mutex);
etSema_destruct(&m_executionSemaphore);
etThread_destruct(&m_thread);
if (m_execMode == IMessageService::POLLED || m_execMode == IMessageService::MIXED) {
etTimer_destruct(&m_timer);
}
+ delete m_messageMemory;
}
void MessageService::start() {
@@ -86,7 +98,7 @@ void MessageService::start() {
void MessageService::run() {
while (m_running) {
etMutex_enter(&m_mutex);
- const Message* msg = getMessageQueue().pop(); // get next Message from Queue
+ const Message* msg = m_messageQueue.pop(); // get next Message from Queue
etMutex_leave(&m_mutex);
if (msg == 0) {
// no message in queue -> wait till Thread is notified
@@ -94,7 +106,7 @@ void MessageService::run() {
} else {
//TODO: set timestamp
// m_lastMessageTimestamp = System.currentTimeMillis();
- getMessageDispatcher().receive(msg);
+ m_messageDispatcher.receive(msg);
}
}
@@ -103,14 +115,16 @@ void MessageService::run() {
void MessageService::receive(const Message* msg) {
etMutex_enter(&m_mutex);
- AbstractMessageService::receive(msg);
+ if (msg != 0) {
+ m_messageQueue.push(const_cast<Message*>(msg));
+ }
etSema_wakeup(&m_executionSemaphore);
etMutex_leave(&m_mutex);
}
Address MessageService::getFreeAddress() {
etMutex_enter(&m_mutex);
- Address address = AbstractMessageService::getFreeAddress();
+ Address address = m_messageDispatcher.getFreeAddress();
etMutex_leave(&m_mutex);
return address;
@@ -118,34 +132,51 @@ Address MessageService::getFreeAddress() {
void MessageService::freeAddress(const Address& addr) {
etMutex_enter(&m_mutex);
- AbstractMessageService::freeAddress(addr);
+ m_messageDispatcher.freeAddress(addr);
etMutex_leave(&m_mutex);
}
void MessageService::addMessageReceiver(IMessageReceiver& receiver) {
etMutex_enter(&m_mutex);
- AbstractMessageService::addMessageReceiver(receiver);
+ m_messageDispatcher.addMessageReceiver(receiver);
etMutex_leave(&m_mutex);
}
void MessageService::removeMessageReceiver(IMessageReceiver& receiver) {
etMutex_enter(&m_mutex);
- AbstractMessageService::removeMessageReceiver(receiver);
+ m_messageDispatcher.removeMessageReceiver(receiver);
etMutex_leave(&m_mutex);
}
void MessageService::addPollingMessageReceiver(IMessageReceiver& receiver) {
etMutex_enter(&m_mutex);
- AbstractMessageService::addPollingMessageReceiver(receiver);
+ m_messageDispatcher.addPollingMessageReceiver(receiver);
etMutex_leave(&m_mutex);
}
void MessageService::removePollingMessageReceiver(IMessageReceiver& receiver) {
etMutex_enter(&m_mutex);
- AbstractMessageService::removePollingMessageReceiver(receiver);
+ m_messageDispatcher.removePollingMessageReceiver(receiver);
+ etMutex_leave(&m_mutex);
+}
+
+Message* MessageService::getMessageBuffer(int size) {
+ etMutex_enter(&m_mutex);
+ Message* buffer = m_messageMemory->getMessageBuffer(size);
+ etMutex_leave(&m_mutex);
+ return buffer;
+}
+
+void MessageService::returnMessageBuffer(const Message* buffer) {
+ etMutex_enter(&m_mutex);
+ m_messageMemory->returnMessageBuffer(buffer);
etMutex_leave(&m_mutex);
}
+std::string MessageService::toString() const {
+ return getName() + " " + getAddress().toID();
+}
+
void MessageService::terminate() {
if (m_execMode == IMessageService::POLLED || m_execMode == IMessageService::MIXED) {
etTimer_stop(&m_timer);
@@ -160,7 +191,14 @@ void MessageService::terminate() {
// called by osal timer, thread ?
void MessageService::pollingTask() {
if (m_running) {
- receive(&m_pollingMessage);
+ Message* pollingMessage = getMessageBuffer(sizeof(Message));
+ if (pollingMessage) {
+ new (pollingMessage) Message(m_messageDispatcher.getAddress(), 0);
+ receive(pollingMessage);
+ }
+ else {
+ // TODO JB: error handling for pollingMessage == NULL
+ }
}
}

Back to the top