diff options
Diffstat (limited to 'runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp')
-rw-r--r-- | runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp | 72 |
1 files changed, 55 insertions, 17 deletions
diff --git a/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp b/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp index e2d74934c..62d62d66f 100644 --- a/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp +++ b/runtime/org.eclipse.etrice.runtime.cpp/src/common/messaging/MessageService.cpp @@ -10,7 +10,6 @@ * *******************************************************************************/ -#include "common/messaging/MessageDispatcher.h" #include "common/messaging/MessageSeQueue.h" #include "common/messaging/MessageService.h" #include "common/messaging/MessageServiceController.h" @@ -21,12 +20,16 @@ namespace etRuntime { -MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, int node, int thread, const std::string& name, int priority) : - AbstractMessageService(parent, name, node, thread), +MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, int node, int thread, const std::string& name, + IMessageMemory* memory, int priority) : + RTObject(parent, name), m_running(false), m_execMode(mode), m_lastMessageTimestamp(0), - m_pollingMessage(getMessageDispatcher().getAddress(), 0) { + m_address(node, thread, 0), + m_messageQueue(this, "Queue"), + m_messageDispatcher(this, m_address.createInc(), "Dispatcher"), + m_messageMemory(memory) { etTime interval; interval.sec = 0; @@ -35,12 +38,15 @@ MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode } MessageService::MessageService(IRTObject* parent, IMessageService::ExecMode mode, etTime interval, int node, int thread, - const std::string& name, int priority) : - AbstractMessageService(parent, name, node, thread), + const std::string& name, IMessageMemory* memory, int priority) : + RTObject(parent, name), m_running(false), m_execMode(mode), m_lastMessageTimestamp(0), - m_pollingMessage(getMessageDispatcher().getAddress(), 0) { + m_address(node, thread, 0), + m_messageQueue(this, "Queue"), + m_messageDispatcher(this, m_address.createInc(), "Dispatcher"), + m_messageMemory(memory) { MessageService_init(interval, priority); } @@ -67,12 +73,18 @@ void MessageService::MessageService_init(etTime interval, int priority) { } MessageService::~MessageService() { + while(m_messageQueue.getSize() > 0) { + const Message* msg = m_messageQueue.pop(); + msg->~Message(); + returnMessageBuffer(msg); + } etMutex_destruct(&m_mutex); etSema_destruct(&m_executionSemaphore); etThread_destruct(&m_thread); if (m_execMode == IMessageService::POLLED || m_execMode == IMessageService::MIXED) { etTimer_destruct(&m_timer); } + delete m_messageMemory; } void MessageService::start() { @@ -86,7 +98,7 @@ void MessageService::start() { void MessageService::run() { while (m_running) { etMutex_enter(&m_mutex); - const Message* msg = getMessageQueue().pop(); // get next Message from Queue + const Message* msg = m_messageQueue.pop(); // get next Message from Queue etMutex_leave(&m_mutex); if (msg == 0) { // no message in queue -> wait till Thread is notified @@ -94,7 +106,7 @@ void MessageService::run() { } else { //TODO: set timestamp // m_lastMessageTimestamp = System.currentTimeMillis(); - getMessageDispatcher().receive(msg); + m_messageDispatcher.receive(msg); } } @@ -103,14 +115,16 @@ void MessageService::run() { void MessageService::receive(const Message* msg) { etMutex_enter(&m_mutex); - AbstractMessageService::receive(msg); + if (msg != 0) { + m_messageQueue.push(const_cast<Message*>(msg)); + } etSema_wakeup(&m_executionSemaphore); etMutex_leave(&m_mutex); } Address MessageService::getFreeAddress() { etMutex_enter(&m_mutex); - Address address = AbstractMessageService::getFreeAddress(); + Address address = m_messageDispatcher.getFreeAddress(); etMutex_leave(&m_mutex); return address; @@ -118,34 +132,51 @@ Address MessageService::getFreeAddress() { void MessageService::freeAddress(const Address& addr) { etMutex_enter(&m_mutex); - AbstractMessageService::freeAddress(addr); + m_messageDispatcher.freeAddress(addr); etMutex_leave(&m_mutex); } void MessageService::addMessageReceiver(IMessageReceiver& receiver) { etMutex_enter(&m_mutex); - AbstractMessageService::addMessageReceiver(receiver); + m_messageDispatcher.addMessageReceiver(receiver); etMutex_leave(&m_mutex); } void MessageService::removeMessageReceiver(IMessageReceiver& receiver) { etMutex_enter(&m_mutex); - AbstractMessageService::removeMessageReceiver(receiver); + m_messageDispatcher.removeMessageReceiver(receiver); etMutex_leave(&m_mutex); } void MessageService::addPollingMessageReceiver(IMessageReceiver& receiver) { etMutex_enter(&m_mutex); - AbstractMessageService::addPollingMessageReceiver(receiver); + m_messageDispatcher.addPollingMessageReceiver(receiver); etMutex_leave(&m_mutex); } void MessageService::removePollingMessageReceiver(IMessageReceiver& receiver) { etMutex_enter(&m_mutex); - AbstractMessageService::removePollingMessageReceiver(receiver); + m_messageDispatcher.removePollingMessageReceiver(receiver); + etMutex_leave(&m_mutex); +} + +Message* MessageService::getMessageBuffer(int size) { + etMutex_enter(&m_mutex); + Message* buffer = m_messageMemory->getMessageBuffer(size); + etMutex_leave(&m_mutex); + return buffer; +} + +void MessageService::returnMessageBuffer(const Message* buffer) { + etMutex_enter(&m_mutex); + m_messageMemory->returnMessageBuffer(buffer); etMutex_leave(&m_mutex); } +std::string MessageService::toString() const { + return getName() + " " + getAddress().toID(); +} + void MessageService::terminate() { if (m_execMode == IMessageService::POLLED || m_execMode == IMessageService::MIXED) { etTimer_stop(&m_timer); @@ -160,7 +191,14 @@ void MessageService::terminate() { // called by osal timer, thread ? void MessageService::pollingTask() { if (m_running) { - receive(&m_pollingMessage); + Message* pollingMessage = getMessageBuffer(sizeof(Message)); + if (pollingMessage) { + new (pollingMessage) Message(m_messageDispatcher.getAddress(), 0); + receive(pollingMessage); + } + else { + // TODO JB: error handling for pollingMessage == NULL + } } } |