Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: a12463a3cd95dc0bea292aae7f2f480275b14aaa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*******************************************************************************
 * Copyright (c) 2010 protos software gmbh (http://www.protos.de).
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * CONTRIBUTORS:
 * 		Thomas Schuetz and Henrik Rentz-Reichert (initial contribution)
 * 
 *******************************************************************************/


package org.eclipse.etrice.runtime.java.messaging;

/**
 * The MessageService is the backbone of the asynchronous communication inside a SubSystem.
 * It combines a thread, a message queue and a dispatcher: messages handed to
 * {@link #receive(Message)} are queued, and the loop in {@link #run()} takes
 * them from the queue one at a time and passes them to the dispatcher.
 * <p>
 * Access to the queue from within this class, the running flag and the
 * last-message timestamp are guarded by this object's monitor. The dispatcher
 * is invoked <em>without</em> holding the monitor, so senders are never
 * blocked for the duration of a dispatch.
 * 
 * @author Thomas Schuetz
 *
 */
public class MessageService extends Thread implements IMessageReceiver,
		IRTObject {

	private final IRTObject parent;
	private final String name;
	private final Address address;

	// TODO: add internal message queue for less locks (faster thread internal
	// messaging)
	private final MessageSeQueue messageQueue;
	private final MessageDispatcher messageDispatcher;

	// both fields below are only read and written while holding this object's monitor
	private boolean running = false;
	private long lastMessageTimestamp;

	/**
	 * Creates the service together with its dispatcher and its queue.
	 * The dispatcher is given the address with the same node and thread ID
	 * as {@code addr} and an object ID that is one higher.
	 * 
	 * @param parent the parent object used as prefix for the instance path, may be {@code null}
	 * @param addr the address of this service, must not be {@code null}
	 * @param name the name of this service as used in the instance path
	 */
	public MessageService(IRTObject parent, Address addr, String name) {
		this.parent = parent;
		this.address = addr;
		this.name = name;
		this.messageDispatcher = new MessageDispatcher(this, new Address(addr.nodeID, addr.threadID, addr.objectID + 1), "Dispatcher");
		this.messageQueue = new MessageSeQueue(this, "Queue");
	}

	@Override
	public Address getAddress() {
		return address;
	}

	/**
	 * Takes messages from the queue and hands them to the dispatcher until
	 * {@link #terminate()} is called or the thread is interrupted.
	 */
	@Override
	public void run() {
		synchronized (this) {
			running = true;
		}

		Message msg;
		while ((msg = pollOneMessage()) != null) {
			// Dispatch outside of the monitor: a handler may send messages to
			// this or to another service, and holding the lock here would block
			// all senders and can deadlock two services sending to each other.
			messageDispatcher.receive(msg);
		}
	}

	/**
	 * Appends a message to the queue and wakes up the thread waiting for it.
	 * A {@code null} message is ignored.
	 * 
	 * @param msg the message to be queued
	 */
	@Override
	public synchronized void receive(Message msg) {
		if (msg != null) {
			messageQueue.push(msg);
			notifyAll(); // wake up thread to compute message
		}
	}

	/**
	 * Blocks until a message is available or the service is asked to stop.
	 * The running flag is checked inside the monitor, right before waiting,
	 * so that a concurrent {@link #terminate()} can never be missed.
	 * 
	 * @return the next message from the queue, or {@code null} if the service
	 *         was terminated or the thread was interrupted
	 */
	private synchronized Message pollOneMessage() {
		while (running) {
			Message msg = messageQueue.pop(); // get next Message from Queue
			if (msg != null) {
				lastMessageTimestamp = System.currentTimeMillis();
				return msg;
			}

			// no message in queue -> wait till Thread is notified
			try {
				wait();
			}
			catch (InterruptedException e) {
				// treat an interrupt as a request to stop and keep the
				// interrupt status visible to whoever inspects the thread
				running = false;
				Thread.currentThread().interrupt();
			}
		}
		return null;
	}

	// protected methods for sole use by test cases
	protected MessageSeQueue getMessageQueue() {
		return messageQueue;
	}

	public synchronized MessageDispatcher getMessageDispatcher() {
		return messageDispatcher;
	}

	/**
	 * @return the value of {@link System#currentTimeMillis()} taken when the
	 *         most recent message was removed from the queue for dispatching
	 */
	protected synchronized long getLastMessageTimestamp() {
		return lastMessageTimestamp;
	}

	@Override
	public String getInstancePath() {
		String path = PATH_DELIM + name;

		if (parent != null) {
			path = parent.getInstancePath() + path;
		}

		return path;
	}

	@Override
	public String getInstancePathName() {
		String path = PATHNAME_DELIM + name;

		if (parent != null) {
			path = parent.getInstancePathName() + path;
		}

		return path;
	}

	/**
	 * Asks the message loop to stop and wakes it up if it is waiting for a
	 * message. Has no effect if the loop is not currently running. Messages
	 * still in the queue are not dispatched.
	 */
	public synchronized void terminate() {
		if (running) {
			running = false;
			notifyAll();
		}
	}
}

Back to the top