Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 21153ce565d272a6cf00d3510015b723d44f48d8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*******************************************************************************
 * Copyright (c) 2012 Draeger Medical GmbH (http://www.draeger.com).
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * CONTRIBUTORS:
 * 		Peter Karlitschek (initial contribution)
 *
 *******************************************************************************/

#include "MessageService.h"
#include <iostream>
#include <stdlib.h>
#include <unistd.h>

namespace etRuntime {

extern "C" {
	void* threadStarter(void* arg) {
		MessageService* thisPtr = static_cast<MessageService*>(arg);
		std::cout << "Thread " << thisPtr->getName() << " is started." << std::endl;
		thisPtr->run();
		return arg;
		//		pthread_exit(NULL);
	}
}

namespace {
	/**
	 * Reports a failed pthread initialization call and ends the process, using
	 * the same message format and exit code as the pthread_create() and
	 * pthread_join() checks in this file.
	 *
	 * @param rc   return code of the pthread call (0 means success)
	 * @param call name of the pthread function, used in the error message
	 */
	void checkPthreadInit(int rc, const char* call) {
		if (rc != 0) {
			std::cout << "ERROR; return code from " << call << "() is " << rc << std::endl;
			exit(-1);
		}
	}
}

/**
 * Constructs the message service together with its queue and dispatcher and
 * initializes the pthread objects used for synchronization. No thread is
 * created here; that happens in start().
 *
 * @param parent   parent object passed on to RTObject
 * @param addr     address of this service; the dispatcher is given the same
 *                 node and thread ID with the object ID incremented by one
 * @param name     name passed on to RTObject
 * @param priority currently unused (see TODO at the end of the body)
 */
MessageService::MessageService(IRTObject* parent, Address addr, std::string name, int priority)
	:	IMessageReceiver(),
		RTObject(parent, name),
		m_running(false),
		m_thread(),
		m_mutex(),
		m_mutexAttr(),
		m_conditionVar(),
		m_threadAttr(),
		m_messageQueue(this, "Queue"),
		m_messageDispatcher( this,
				Address(addr.m_nodeID, addr.m_threadID,	addr.m_objectID + 1),
				"Dispatcher"),
		m_address(addr),
		m_lastMessageTimestamp(0),
		m_asyncActors()
{
	// A failed initialization would leave an unusable mutex/condition variable
	// behind that is locked and waited on later, so every return code is checked.
	checkPthreadInit(pthread_mutexattr_init(&m_mutexAttr), "pthread_mutexattr_init");
	// recursive: the owning thread may lock the mutex again while it already holds it
	checkPthreadInit(pthread_mutexattr_settype(&m_mutexAttr, PTHREAD_MUTEX_RECURSIVE), "pthread_mutexattr_settype");
	checkPthreadInit(pthread_mutex_init(&m_mutex, &m_mutexAttr), "pthread_mutex_init");
	checkPthreadInit(pthread_cond_init(&m_conditionVar, NULL), "pthread_cond_init");
	checkPthreadInit(pthread_attr_init(&m_threadAttr), "pthread_attr_init");
	// joinable, so that join() can wait for the thread
	checkPthreadInit(pthread_attr_setdetachstate(&m_threadAttr, PTHREAD_CREATE_JOINABLE), "pthread_attr_setdetachstate");

	// TODO: check and set priority (not implemented yet, priority is ignored)
//			assert priority >= Thread.MIN_PRIORITY : ("priority smaller than Thread.MIN_PRIORITY (1)");
//			assert priority <= Thread.MAX_PRIORITY : ("priority bigger than Thread.MAX_PRIORITY (10)");
//			this.setPriority(priority);
	(void) priority;

}

/**
 * Releases the pthread objects that the constructor initialized.
 *
 * The destructor neither stops nor joins the worker thread.
 * NOTE(review): presumably callers invoke terminate() and join() before
 * destroying a service that was started on its own thread - confirm.
 */
MessageService::~MessageService() {
	pthread_attr_destroy(&m_threadAttr);
	pthread_mutex_destroy(&m_mutex);
	pthread_cond_destroy(&m_conditionVar);
	// the mutex attribute object is initialized in the constructor and needs
	// its own destroy call
	pthread_mutexattr_destroy(&m_mutexAttr);

	// Deliberately no pthread_exit() here: it would terminate whichever thread
	// happens to destroy this object in the middle of the destructor, so the
	// member and base class destructors would never run.
}

/**
 * Starts the message service.
 *
 * @param singlethreaded if true only a startup message is printed and no
 *        thread is created; otherwise a thread running threadStarter() with
 *        this object as argument is created. If thread creation fails the
 *        error code is printed and the process exits with -1.
 */
void MessageService::start(bool singlethreaded) {
	if (singlethreaded) {
		std::cout << "starting message service " << getName() << " singlethreaded" << std::endl;
		return;
	}

	std::cout << "starting message service " << getName() << " on own thread" << std::endl;
	const int createResult = pthread_create(&m_thread, &m_threadAttr, threadStarter, static_cast<void *>(this));
	if (createResult != 0) {
		std::cout << "ERROR; return code from pthread_create() is " << createResult << std::endl;
		exit(-1);
	}
}

/**
 * Waits for the thread stored in m_thread to finish. If pthread_join()
 * reports an error, the error code is printed and the process exits with -1.
 */
void MessageService::join() {
	void* exitStatus;	// receives the thread's return value; not evaluated
	const int joinResult = pthread_join(m_thread, &exitStatus);
	if (joinResult != 0) {
		std::cout << "ERROR; return code from pthread_join() is " << joinResult << std::endl;
		exit(-1);
	}
}
/**
 * Message loop of the service: sets the running flag and then polls for
 * messages until the flag has been cleared, pausing 10000 microseconds after
 * each poll. Called by threadStarter() on the thread created in start().
 */
void MessageService::run() {
	m_running = true;
	for (;;) {
		if (!m_running) {
			// flag was cleared, e.g. by terminate()
			break;
		}
		pollOneMessage();
		usleep(10000);
	}
	std::cout << "ending message service " << getName() << " on own thread" << std::endl;
}

void MessageService::runOnce() {
	pollAsyncActors();
	while (m_messageQueue.isNotEmpty()){
		Message* msg = m_messageQueue.pop(); // get next Message from Queue
		if (msg != 0) {
			m_messageDispatcher.receive(msg);
		}
	}
}

/**
 * Enqueues a message and signals the condition variable that
 * pollOneMessage() waits on. Queue access is guarded by m_mutex.
 *
 * @param msg the message to enqueue; a null pointer is ignored
 */
void MessageService::receive(Message* msg) {
	pthread_mutex_lock(&m_mutex);
	const bool hasMessage = (msg != 0);
	if (hasMessage) {
		m_messageQueue.push(msg);
		// wake up the thread so it processes the message
		pthread_cond_signal(&m_conditionVar);
	}
	pthread_mutex_unlock(&m_mutex);
}


/**
 * Requests the message loop in run() to end: while holding m_mutex the
 * running flag is cleared and the condition variable is signalled. Does
 * nothing if the service is not marked as running.
 */
void MessageService::terminate() {
	pthread_mutex_lock(&m_mutex);
	if (!m_running) {
		pthread_mutex_unlock(&m_mutex);
		return;
	}
	m_running = false;
	// wake up the thread so it can leave its wait and terminate
	pthread_cond_signal(&m_conditionVar);
	pthread_mutex_unlock(&m_mutex);
}

void MessageService::pollOneMessage() {
	pthread_mutex_lock(&m_mutex);
	Message* msg = m_messageQueue.pop(); // get next Message from Queue
	if (msg == 0) {
		// no message in queue -> wait till Thread is notified
		pthread_cond_wait(&m_conditionVar, &m_mutex);
	} else {
		//TODO: set timestamp
		// m_lastMessageTimestamp = System.currentTimeMillis();
		m_messageDispatcher.receive(msg);
	}
	pthread_mutex_unlock(&m_mutex);

}

/**
 * Registers an event receiver to be polled by pollAsyncActors().
 * Only the address of the receiver is stored; no copy is made.
 *
 * @param evtReceiver the receiver to add to the list of async actors
 */
void MessageService::addAsyncActor(IEventReceiver& evtReceiver) {
	IEventReceiver* const actor = &evtReceiver;
	m_asyncActors.push_back(actor);
}

void MessageService::pollAsyncActors() {
	std::vector<IEventReceiver*>::iterator it = m_asyncActors.begin();
	for ( ; it != m_asyncActors.end(); ++it) {
		// polling event
		(*it)->receiveEvent(0,0,0);
	}
}

} /* namespace etRuntime */

Back to the top