Skip to main content
summaryrefslogtreecommitdiffstats
blob: 34e7ecaba43643b29c71e0f2fb6c5a43131c38b2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/*
 * Created on Jan 25, 2010
 *
 * PLACE_YOUR_DISTRIBUTION_STATEMENT_RIGHT_HERE
 */
package org.eclipse.osee.framework.messaging.services.internal;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.eclipse.osee.framework.core.exception.OseeCoreException;
import org.eclipse.osee.framework.jdk.core.type.CompositeKeyHashMap;
import org.eclipse.osee.framework.jdk.core.type.Pair;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.framework.messaging.ConnectionNode;
import org.eclipse.osee.framework.messaging.services.BaseMessages;
import org.eclipse.osee.framework.messaging.services.RemoteServiceLookup;
import org.eclipse.osee.framework.messaging.services.ServiceNotification;
import org.eclipse.osee.framework.messaging.services.messages.ServiceHealthRequest;

/**
 * {@link RemoteServiceLookup} implementation that tracks the health of remote services reported over a
 * {@link ConnectionNode} and forwards it to registered {@link ServiceNotification} callbacks.
 * <p>
 * Both the known service health and the registered callbacks are keyed by the (service name, service version)
 * pair. The same {@link HealthServiceListener} instance is used for replies to health requests and for
 * {@link BaseMessages#ServiceHealth} messages; it shares the two maps held by this class.
 * 
 * @author Andrew M. Finkbeiner
 */
public class RemoteServiceLookupImpl implements RemoteServiceLookup {

   private final ConnectionNode connectionNode;
   /** (service name, service version) -> health entries; the meaning of the inner key is defined by the listener. */
   private final CompositeKeyHashMap<String, String, Map<String, ServiceHealthPlusTimeout>> map;
   /** (service name, service version) -> callbacks registered for that service. */
   private final CompositeKeyHashMap<String, String, List<ServiceNotification>> callbacks;
   private final HealthServiceListener healthServiceListener;

   /**
    * Creates the lookup, subscribes the health listener to replies for {@link BaseMessages#ServiceHealthRequest}
    * and schedules a {@link MonitorTimedOutServices} task on the supplied executor (first run after 30 seconds,
    * then every 30 seconds).
    * 
    * @param node connection used to send health requests and to receive health messages
    * @param executor executor on which the timed-out-service monitor is scheduled
    */
   public RemoteServiceLookupImpl(ConnectionNode node, ScheduledExecutorService executor) {
      this.connectionNode = node;
      this.map = new CompositeKeyHashMap<String, String, Map<String, ServiceHealthPlusTimeout>>(25, true);
      this.callbacks = new CompositeKeyHashMap<String, String, List<ServiceNotification>>(25, true);
      this.healthServiceListener = new HealthServiceListener(map, callbacks);
      connectionNode.subscribeToReply(BaseMessages.ServiceHealthRequest, healthServiceListener);
      executor.scheduleAtFixedRate(new MonitorTimedOutServices(map, callbacks), 30, 30, TimeUnit.SECONDS);
   }

   /**
    * Subscribes the health listener to {@link BaseMessages#ServiceHealth} messages.
    */
   public void start() {
      connectionNode.subscribe(BaseMessages.ServiceHealth, healthServiceListener,
         new OseeMessagingStatusImpl("Failed to subscribe to " + BaseMessages.ServiceHealth.getName(),
            RemoteServiceLookupImpl.class));
   }

   /**
    * Unsubscribes the health listener from {@link BaseMessages#ServiceHealth} messages.
    */
   public void stop() {
      // The failure text previously said "subscribe", which mislabelled unsubscribe failures.
      connectionNode.unsubscribe(BaseMessages.ServiceHealth, healthServiceListener,
         new OseeMessagingStatusImpl("Failed to unsubscribe from " + BaseMessages.ServiceHealth.getName(),
            RemoteServiceLookupImpl.class));
   }

   /**
    * Registers a callback for the given service. If health entries are already known for the service, the
    * callback is invoked immediately once per known entry; otherwise a health request is sent out so that
    * running services can answer.
    * 
    * @param serviceName name of the service of interest
    * @param serviceVersion version of the service of interest
    * @param notification callback to add and, when health is already known, to notify right away
    */
   @Override
   public void register(String serviceName, String serviceVersion, ServiceNotification notification) {
      addListener(serviceName, serviceVersion, notification);
      Map<String, ServiceHealthPlusTimeout> healthMap = map.get(serviceName, serviceVersion);
      // An entry that exists but holds no health records gives the caller nothing, so ask for health in
      // that case as well instead of silently doing nothing.
      if (healthMap == null || healthMap.isEmpty()) {
         sendOutRequest(serviceName, serviceVersion);
         return;
      }
      for (ServiceHealthPlusTimeout serviceHealth : healthMap.values()) {
         notification.onServiceUpdate(serviceHealth.getServiceHealth());
      }
   }

   /**
    * Sends a health request for every (service name, service version) pair that has a callback entry.
    */
   public void sendOutRequestsForServiceHealth() {
      Set<Pair<String, String>> pairs = callbacks.keySet();
      for (Pair<String, String> pair : pairs) {
         sendOutRequest(pair.getFirst(), pair.getSecond());
      }
   }

   /**
    * Builds and sends a {@link ServiceHealthRequest} for one service. An {@link OseeCoreException} thrown by the
    * send is logged at SEVERE level and not propagated.
    */
   private void sendOutRequest(String serviceName, String serviceVersion) {
      ServiceHealthRequest request = new ServiceHealthRequest();
      request.setServiceName(serviceName);
      request.setServiceVersion(serviceVersion);
      try {
         // Status is attributed to the implementation class, matching the other status/log calls here.
         connectionNode.send(BaseMessages.ServiceHealthRequest, request,
            new OseeMessagingStatusImpl(
               String.format("Failed to send Health Request for %s [%s]", serviceName, serviceVersion),
               RemoteServiceLookupImpl.class));
      } catch (OseeCoreException ex) {
         OseeLog.log(RemoteServiceLookupImpl.class, Level.SEVERE, ex);
      }
   }

   /**
    * Adds the callback to the list kept for the service, creating the list on first use.
    * <p>
    * Synchronized so that the lookup and the creation of a missing list happen as one step: without it, two
    * concurrent first registrations for the same service could each create a list and one callback would be
    * lost when the second put overwrites the first.
    */
   private synchronized void addListener(String serviceName, String serviceVersion, ServiceNotification notification) {
      List<ServiceNotification> itemsToNotify = callbacks.get(serviceName, serviceVersion);
      if (itemsToNotify == null) {
         itemsToNotify = new CopyOnWriteArrayList<ServiceNotification>();
         callbacks.put(serviceName, serviceVersion, itemsToNotify);
      }
      itemsToNotify.add(notification);
   }

   /**
    * Removes a previously registered callback.
    * 
    * @param serviceName name of the service the callback was registered for
    * @param serviceVersion version of the service the callback was registered for
    * @param notification callback to remove
    * @return <code>true</code> if the callback was registered for that service and has been removed
    */
   @Override
   public boolean unregister(String serviceName, String serviceVersion, ServiceNotification notification) {
      return removeListener(serviceName, serviceVersion, notification);
   }

   /**
    * Removes the callback from the list kept for the service; the (possibly empty) list itself stays in the map.
    * 
    * @return <code>true</code> if a list existed for the service and contained the callback
    */
   private boolean removeListener(String serviceName, String serviceVersion, ServiceNotification notification) {
      List<ServiceNotification> itemsToNotify = callbacks.get(serviceName, serviceVersion);
      return itemsToNotify != null && itemsToNotify.remove(notification);
   }

}

Back to the top