Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: d2a6bac4d097b1d5b3c88ea62a5ccd5ae85f2236 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*******************************************************************************
 * Copyright (c) 2010 protos software gmbh (http://www.protos.de).
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * CONTRIBUTORS:
 * 		Thomas Schuetz and Henrik Rentz-Reichert (initial contribution)
 * 
 *******************************************************************************/


package org.eclipse.etrice.runtime.java.messaging;

/**
 * The MessageService is the backbone of the asynchronous communication inside a SubSystem.
 * It combines a thread, a message queue and a dispatcher: messages handed to
 * {@link #receive(Message)} are queued and then forwarded, one at a time and on this
 * service's own thread, to the dispatcher.
 * <p>
 * Thread-safety: queue access, the stop request and the last message timestamp are
 * guarded by this object's monitor. The monitor is deliberately <em>not</em> held while
 * a message is being dispatched, so senders (including message handlers running on
 * other MessageService threads) never have to wait for a dispatch to finish.
 * 
 * @author Thomas Schuetz
 *
 */
public class MessageService extends Thread implements IMessageReceiver,
		IRTObject {

	private final IRTObject parent;
	private final String name;

	// Sticky stop request. Written under the monitor of this object; volatile
	// because the loop in run() reads it without holding the monitor.
	private volatile boolean stopRequested = false;
	
	// TODO: add internal message queue for less locks (faster thread internal
	// messaging)
	private final MessageSeQueue messageQueue;
	private final MessageDispatcher messageDispatcher;
	private final Address address;

	// time (System.currentTimeMillis()) at which the most recent message was
	// taken from the queue; guarded by the monitor of this object
	private long lastMessageTimestamp;

	/**
	 * Creates a message service running with {@link Thread#NORM_PRIORITY}.
	 * 
	 * @param parent the parent object used to build the instance path, may be <code>null</code>
	 * @param addr the address of this service, must not be <code>null</code>
	 * @param name the name of this service
	 */
	public MessageService(IRTObject parent, Address addr, String name) {
		this(parent, addr, name, Thread.NORM_PRIORITY);
	}
	
	
	/**
	 * Creates a message service together with its dispatcher and its queue.
	 * The thread is named "MessageService " followed by the given name and is not
	 * started by the constructor.
	 * 
	 * @param parent the parent object used to build the instance path, may be <code>null</code>
	 * @param addr the address of this service, must not be <code>null</code>; the dispatcher
	 *            is created with the same node and thread ID and the object ID incremented by one
	 * @param name the name of this service
	 * @param priority the thread priority, expected to lie between
	 *            {@link Thread#MIN_PRIORITY} and {@link Thread#MAX_PRIORITY}
	 */
	public MessageService(IRTObject parent, Address addr, String name, int priority) {
		super("MessageService "+name);
		
		this.parent = parent;
		this.address = addr;
		this.name = name;

		// check and set priority (the assertions are only evaluated with -ea)
		assert priority >= Thread.MIN_PRIORITY : ("priority smaller than Thread.MIN_PRIORITY (1)"); 
		assert priority <= Thread.MAX_PRIORITY : ("priority bigger than Thread.MAX_PRIORITY (10)"); 
		this.setPriority(priority);

		// instantiate dispatcher and queue
		messageDispatcher = new MessageDispatcher(this, new Address(addr.nodeID,addr.threadID, addr.objectID + 1), "Dispatcher");
		messageQueue = new MessageSeQueue(this, "Queue");
	}

	@Override
	public Address getAddress() {
		return address;
	}

	/**
	 * Thread main loop: takes messages from the queue and dispatches them until
	 * {@link #terminate()} has been called or the thread has been interrupted.
	 * <p>
	 * The loop itself does not hold the monitor of this object; locking is confined
	 * to {@link #pollOneMessage()}. A stop request issued before the thread was
	 * started makes this method return immediately.
	 */
	@Override
	public void run() {
		while (!stopRequested) {
			pollOneMessage();
		}
	}

	/**
	 * Appends a message to the queue and wakes up the service thread.
	 * <code>null</code> messages are ignored.
	 * 
	 * @param msg the message to enqueue
	 */
	@Override
	public synchronized void receive(Message msg) {
		if (msg!=null) {
			messageQueue.push(msg);
			notifyAll(); // wake up thread to compute message
		}
	}

	/**
	 * Takes at most one message from the queue and hands it to the dispatcher.
	 * If the queue is empty the calling thread waits until it is notified by
	 * {@link #receive(Message)} or {@link #terminate()} and then returns, so that
	 * the caller re-evaluates its loop condition.
	 */
	private void pollOneMessage() {
		final Message msg;

		synchronized (this) {
			msg = messageQueue.pop(); // get next Message from Queue
			if (msg == null) {
				// Re-check the stop request under the lock: terminate() may have
				// run between the loop condition in run() and this point, in
				// which case its notification would otherwise be lost.
				if (!stopRequested) {
					try {
						// no message in queue -> wait till Thread is notified
						wait();
					}
					catch (InterruptedException e) {
						// An interrupt is treated as a request to shut down.
						// The interrupt status is restored for the code that
						// observes this thread after run() has returned.
						stopRequested = true;
						Thread.currentThread().interrupt();
					}
				}
				return;
			}
			lastMessageTimestamp = System.currentTimeMillis();
		}

		// Dispatch outside of the monitor: handlers may send messages to this or
		// to other services, and holding the lock here would block every sender
		// and could deadlock two services sending to each other.
		messageDispatcher.receive(msg);
	}

	/**
	 * @return the address obtained from the dispatcher's <code>getFreeAddress()</code>
	 */
	public Address getFreeAddress() {
		return messageDispatcher.getFreeAddress();
	}
	
	/**
	 * Passes the given address to the dispatcher's <code>freeAddress()</code>.
	 * 
	 * @param addr the address to release
	 */
	public void freeAddress(Address addr) {
		messageDispatcher.freeAddress(addr);
	}
	
	// protected methods for sole use by test cases
	protected MessageSeQueue getMessageQueue() {
		return messageQueue;
	}

	/**
	 * @return the dispatcher created by the constructor
	 */
	public MessageDispatcher getMessageDispatcher() {
		return messageDispatcher;
	}

	/**
	 * @return the time in milliseconds (as returned by {@link System#currentTimeMillis()})
	 *         at which the most recent message was taken from the queue, or 0 if no
	 *         message has been processed yet
	 */
	protected synchronized long getLastMessageTimestamp() {
		return lastMessageTimestamp;
	}

	/**
	 * Builds the instance path of this object using the given delimiter for
	 * every segment, including the segments contributed by the parent chain.
	 * 
	 * @param delim the delimiter placed in front of each path segment
	 * @return the parent's path (if there is a parent) followed by the delimiter and this object's name
	 */
	@Override
	public String getInstancePath(char delim) {
		String path = delim + name;
		
		if (parent!=null) {
			// pass the delimiter on, otherwise the parent part of the path would
			// always be built with the default delimiter
			path = parent.getInstancePath(delim)+path;
		}
		
		return path;
	}
	
	@Override
	public String getInstancePath() {
		return getInstancePath(PATH_DELIM);
	}

	@Override
	public String getInstancePathName() {
		return getInstancePath(PATHNAME_DELIM);
	}
	
	/**
	 * Requests the service thread to stop and wakes it up if it is waiting for
	 * messages. The request is sticky: it also takes effect if the thread has not
	 * reached its main loop yet.
	 * <p>
	 * This method returns without waiting for the thread to end; a message that
	 * is currently being dispatched is completed, messages still in the queue
	 * are not dispatched any more. Use {@link Thread#join()} to wait for the
	 * thread to finish.
	 */
	public synchronized void terminate() {
		stopRequested = true;
		notifyAll();
	}
}

Back to the top