Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: a5f1a76aedc765893878f7b4ba340e92d924fe63 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*******************************************************************************
 * Copyright (c) 2010 protos software gmbh (http://www.protos.de).
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * CONTRIBUTORS:
 * 		Thomas Schuetz and Henrik Rentz-Reichert (initial contribution)
 * 
 *******************************************************************************/


package org.eclipse.etrice.runtime.java.messaging;


/**
 * The MessageService is the backbone of the asynchronous communication inside a SubSystem
 * It usually contains a thread a message queue and a dispatcher
 * 
 * @author Thomas Schuetz (initial contribution)
 * @author Henrik Rentz-Reichert (extending RTObject, implementing Runnable)
 *
 */
public class MessageService extends RTObject implements IMessageService {

	private boolean running = false;
	
	// TODO: add internal message queue for less locks (faster thread internal messaging)
	private MessageSeQueue messageQueue = null;
	private MessageDispatcher messageDispatcher = null;
	private Address address = null;
	private long lastMessageTimestamp;
	private Thread thread;
	private int priority;

	public MessageService(IRTObject parent, ExecMode mode, int node, int thread, String name) {
		this(parent, mode, 0, node, thread, name, Thread.NORM_PRIORITY);
	}
	
	public MessageService(IRTObject parent, ExecMode mode, int nsec, int node, int thread, String name) {
		this(parent, mode, nsec, node, thread, name, Thread.NORM_PRIORITY);
	}
	
	public MessageService(IRTObject parent, ExecMode mode, int nsec, int node, int thread, String name, int priority) {
		super(parent, "MessageService_"+name);
		
		address = new Address(node, thread, 0);
		this.priority = priority;

		assert priority >= Thread.MIN_PRIORITY : ("priority smaller than Thread.MIN_PRIORITY (1)"); 
		assert priority <= Thread.MAX_PRIORITY : ("priority bigger than Thread.MAX_PRIORITY (10)"); 

		// instantiate dispatcher and queue
		messageDispatcher = new MessageDispatcher(this, new Address(address.nodeID,address.threadID, address.objectID + 1), "Dispatcher");
		messageQueue = new MessageSeQueue(this, "Queue");
	}

	@Override
	public Address getAddress() {
		return address;
	}

	public synchronized void run() {
		running = true;
		while (running) {
			pollOneMessage();
		}
	}

	@Override
	public synchronized void receive(Message msg) {
		if (msg!=null) {
			messageQueue.push(msg);
			notifyAll(); // wake up thread to compute message
		}
	}

	private synchronized void pollOneMessage() {
		Message msg = messageQueue.pop(); // get next Message from Queue
		if (msg == null) {
			// no message in queue -> wait till Thread is notified
			try {
				wait();
			}
			catch (InterruptedException e) {
			}
		}
		else {
			lastMessageTimestamp = System.currentTimeMillis();
			messageDispatcher.receive(msg);
		}

	}

	public Address getFreeAddress() {
		return messageDispatcher.getFreeAddress();
	}
	
	public void freeAddress(Address addr) {
		messageDispatcher.freeAddress(addr);
	}
	
	// protected methods for sole use by test cases
	protected MessageSeQueue getMessageQueue() {
		return messageQueue;
	}

	public synchronized MessageDispatcher getMessageDispatcher() {
		return messageDispatcher;
	}

	protected synchronized long getLastMessageTimestamp() {
		return lastMessageTimestamp;
	}
	
	public synchronized void terminate() {
		if (running) {
			running = false;
			notifyAll();
		}
	}

	/* (non-Javadoc)
	 * @see org.eclipse.etrice.runtime.java.messaging.IMessageService#setThread(java.lang.Thread)
	 */
	public void setThread(Thread thread) {
		this.thread = thread;
		
		thread.setPriority(priority);
	}

	/* (non-Javadoc)
	 * @see org.eclipse.etrice.runtime.java.messaging.IMessageService#getThread()
	 */
	public Thread getThread() {
		return thread;
	}

}

Back to the top