/*******************************************************************************
* Copyright (c) 2011, 2013 Wind River Systems, Inc. and others. All rights reserved.
* This program and the accompanying materials are made available under the terms
* of the Eclipse Public License v1.0 which accompanies this distribution, and is
* available at http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Wind River Systems - initial API and implementation
*******************************************************************************/
package org.eclipse.tcf.te.tcf.locator;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.core.runtime.Assert;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.tcf.core.Command;
import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IPeer;
import org.eclipse.tcf.protocol.IToken;
import org.eclipse.tcf.protocol.Protocol;
import org.eclipse.tcf.services.ILocator;
import org.eclipse.tcf.te.runtime.callback.Callback;
import org.eclipse.tcf.te.runtime.interfaces.callback.ICallback;
import org.eclipse.tcf.te.runtime.utils.net.IPAddressUtil;
import org.eclipse.tcf.te.tcf.core.Tcf;
import org.eclipse.tcf.te.tcf.core.peers.Peer;
import org.eclipse.tcf.te.tcf.locator.activator.CoreBundleActivator;
import org.eclipse.tcf.te.tcf.locator.interfaces.IScanner;
import org.eclipse.tcf.te.tcf.locator.interfaces.ITracing;
import org.eclipse.tcf.te.tcf.locator.interfaces.nodes.ILocatorModel;
import org.eclipse.tcf.te.tcf.locator.interfaces.nodes.IPeerModel;
import org.eclipse.tcf.te.tcf.locator.interfaces.nodes.IPeerModelProperties;
import org.eclipse.tcf.te.tcf.locator.interfaces.services.ILocatorModelLookupService;
import org.eclipse.tcf.te.tcf.locator.interfaces.services.ILocatorModelUpdateService;
import org.eclipse.tcf.te.tcf.locator.nodes.PeerModel;
import org.eclipse.tcf.te.tcf.locator.nodes.PeerRedirector;
/**
* Scanner runnable to be executed for each peer to probe within the
* TCF event dispatch thread.
*/
public class ScannerRunnable implements Runnable, IChannel.IChannelListener {
// Reference to the parent model scanner
private final IScanner parentScanner;
// Reference to the peer model node to update
/* default */ final IPeerModel peerNode;
// Reference to the channel
/* default */ IChannel channel = null;
// Mark if the used channel is a shared channel instance
/* default */ boolean sharedChannel = false;
// Optional callback to invoke once the scan has been completed
private final ICallback callback;
/**
* Constructor.
*
* @param scanner The parent model scanner or null
if the runnable is constructed from outside a scanner.
* @param peerNode The peer model instance. Must not be null
.
*/
public ScannerRunnable(IScanner scanner, IPeerModel peerNode) {
this(scanner, peerNode, null);
}
/**
* Constructor.
*
* @param scanner The parent model scanner or null
if the runnable is constructed from outside a scanner.
* @param peerNode The peer model instance. Must not be null
.
* @param callback The callback to invoke once the scan has been completed or null
.
*/
public ScannerRunnable(IScanner scanner, IPeerModel peerNode, ICallback callback) {
super();
parentScanner = scanner;
Assert.isNotNull(peerNode);
this.peerNode = peerNode;
this.callback = callback;
}
/**
* Returns the parent scanner instance.
*
* @return The parent scanner instance or null
.
*/
protected final IScanner getParentScanner() {
return parentScanner;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
Assert.isTrue(Protocol.isDispatchThread(), "Illegal Thread Access"); //$NON-NLS-1$
// If the parent scanner is terminated, don't do anything
IScanner scanner = getParentScanner();
if (scanner != null && scanner.isTerminated()) {
if (callback != null) callback.done(this, Status.OK_STATUS);
return;
}
// If a scanner runnable already active for this peer node, there
// is no need to run another scan.
if (peerNode.getProperty("scanner.transient") != null) { //$NON-NLS-1$
if (callback != null) callback.done(this, Status.OK_STATUS);
return;
}
peerNode.setProperty("scanner.transient", this); //$NON-NLS-1$
// Determine the peer
IPeer peer = peerNode.getPeer();
if (peer == null) {
if (callback != null) callback.done(this, Status.OK_STATUS);
return;
}
// Don't scan value-adds
String value = peer.getAttributes().get("ValueAdd"); //$NON-NLS-1$
boolean isValueAdd = value != null && ("1".equals(value.trim()) || Boolean.parseBoolean(value.trim())); //$NON-NLS-1$
if (isValueAdd) {
if (callback != null) callback.done(this, Status.OK_STATUS);
return;
}
// Don't scan command server peers
boolean isCommandServer = peer.getName() != null
&& peer.getName().endsWith("Command Server"); //$NON-NLS-1$
if (isCommandServer) {
if (callback != null) callback.done(this, Status.OK_STATUS);
return;
}
// Do not open a channel to incomplete peer nodes
if (peerNode.isComplete()) {
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(ITracing.ID_TRACE_SCANNER)) {
CoreBundleActivator.getTraceHandler().trace("Scanner runnable invoked for peer '" + peerNode.getName() + "' (" + peerNode.getPeerId() + "). Attempting to open channel ...", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
ITracing.ID_TRACE_SCANNER, ScannerRunnable.this);
}
// Check if there is a shared channel available which is still in open state
channel = Tcf.getChannelManager().getChannel(peer);
if (channel == null || channel.getState() != IChannel.STATE_OPEN) {
sharedChannel = false;
// Open the channel
channel = peer.openChannel();
// Add ourself as channel listener
channel.addChannelListener(this);
} else {
sharedChannel = true;
// Shared channel is in open state -> use it
onChannelOpened();
}
}
}
/* (non-Javadoc)
* @see org.eclipse.tcf.protocol.IChannel.IChannelListener#onChannelOpened()
*/
@Override
public void onChannelOpened() {
// Peer is reachable
if (channel != null && !sharedChannel) {
// Remove ourself as channel listener
channel.removeChannelListener(this);
}
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(ITracing.ID_TRACE_SCANNER)) {
CoreBundleActivator.getTraceHandler().trace("Scanner runnable onChannelOpened invoked for peer '" + peerNode.getName() + "' (" + peerNode.getPeerId() + ").", //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
ITracing.ID_TRACE_SCANNER, ScannerRunnable.this);
}
// Turn off change notifications temporarily
final boolean changed = peerNode.setChangeEventsEnabled(false);
// Set the peer state property
int counter = peerNode.getIntProperty(IPeerModelProperties.PROP_CHANNEL_REF_COUNTER);
if (!peerNode.isProperty(IPeerModelProperties.PROP_STATE, IPeerModelProperties.STATE_WAITING_FOR_READY)) {
peerNode.setProperty(IPeerModelProperties.PROP_STATE, counter > 0 ? IPeerModelProperties.STATE_CONNECTED : IPeerModelProperties.STATE_REACHABLE);
peerNode.setProperty(IPeerModelProperties.PROP_LAST_SCANNER_ERROR, null);
}
// Get the parent model from the model mode
final ILocatorModel model = (ILocatorModel)peerNode.getAdapter(ILocatorModel.class);
if (channel != null && channel.getState() == IChannel.STATE_OPEN) {
// Update the services lists
ILocatorModelUpdateService updateService = model != null ? model.getService(ILocatorModelUpdateService.class) : null;
if (updateService != null) {
Collection localServices = channel.getLocalServices();
Collection remoteServices = channel.getRemoteServices();
updateService.updatePeerServices(peerNode, localServices, remoteServices);
if (CoreBundleActivator.getTraceHandler().isSlotEnabled(ITracing.ID_TRACE_SCANNER)) {
CoreBundleActivator.getTraceHandler().trace("Services: local = " + localServices + ", remote = " + remoteServices, //$NON-NLS-1$ //$NON-NLS-2$
ITracing.ID_TRACE_SCANNER, ScannerRunnable.this);
}
}
// If we don't queried the DNS name of the peer, or the peer IP changed,
// trigger a query (can run in any thread, outside TCF dispatch and UI
// thread). This make sense only if there is an IP address to query at all.
final String ip = channel.getRemotePeer().getAttributes().get(IPeer.ATTR_IP_HOST);
if (ip != null && !"".equals(ip)) { //$NON-NLS-1$
if (peerNode.getStringProperty("dns.name.transient") == null || !ip.equals(peerNode.getStringProperty("dns.lastIP.transient"))) { //$NON-NLS-1$ //$NON-NLS-2$
// If the IP address changed, reset the "do not query again" marker
if (!ip.equals(peerNode.getStringProperty("dns.lastIP.transient"))) { //$NON-NLS-1$
peerNode.setProperty("dns.lastIP.transient", ip); //$NON-NLS-1$
peerNode.setProperty("dns.skip.transient", false); //$NON-NLS-1$
}
if (!peerNode.getBooleanProperty("dns.skip.transient")) { //$NON-NLS-1$
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
InetAddress address = InetAddress.getByName(ip);
final AtomicReference nameRef = new AtomicReference();
nameRef.set(address.getCanonicalHostName());
if (ip.equals(nameRef.get()) && IPAddressUtil.getInstance().isLocalHost(ip)) {
String[] candidates = IPAddressUtil.getInstance().getCanonicalHostNames();
for (String candidate : candidates) {
if (!ip.equals(candidate)) {
nameRef.set(candidate);
break;
}
}
}
Protocol.invokeLater(new Runnable() {
@Override
public void run() {
String name = nameRef.get();
if (name != null && !"".equals(name) && !ip.equals(name)) { //$NON-NLS-1$
String dnsName = name.indexOf('.') != -1 ? name.substring(0, name.indexOf('.')) : name;
if (!ip.equalsIgnoreCase(dnsName)) {
peerNode.setProperty("dns.name.transient", dnsName.toLowerCase()); //$NON-NLS-1$
}
}
}
});
}
catch (UnknownHostException e) {
Protocol.invokeLater(new Runnable() {
@Override
public void run() {
peerNode.setProperty("dns.skip.transient", true); //$NON-NLS-1$
}
});
}
}
};
Thread thread = new Thread(runnable, "DNS Query Thread for " + ip); //$NON-NLS-1$
thread.start();
}
}
}
// Check if the agent ID is already set
String agentID = channel.getRemotePeer().getAgentID();
if (agentID == null && channel.getRemotePeer() instanceof Peer) {
// Determine the agent ID of the remote agent
ILocator locator = channel.getRemoteService(ILocator.class);
if (locator != null) {
locator.getAgentID(new ILocator.DoneGetAgentID() {
@Override
public void doneGetAgentID(IToken token, Exception error, String agentID) {
// Ignore errors. If the agent does not implement this command, we
// do not fail.
if (agentID != null) {
// Update the peer attributes
Map attrs = new HashMap(channel.getRemotePeer().getAttributes());
attrs.put(IPeer.ATTR_AGENT_ID, agentID);
peerNode.setProperty(IPeerModelProperties.PROP_INSTANCE, new Peer(attrs));
}
if (isGetPeersAllowed(channel)) {
// Get the peers from the remote locator
getPeers(channel, model, ip, new Callback() {
@Override
protected void internalDone(Object caller, IStatus status) {
// Complete
onDone(peerNode, changed);
}
});
} else {
onDone(peerNode, changed);
}
}
});
} else {
// Complete
onDone(peerNode, changed);
}
} else {
if (isGetPeersAllowed(channel)) {
// Get the peers from the remote locator
getPeers(channel, model, ip, new Callback() {
@Override
protected void internalDone(Object caller, IStatus status) {
// Complete
onDone(peerNode, changed);
}
});
} else {
onDone(peerNode, changed);
}
}
} else {
// Complete
onDone(peerNode, changed);
}
}
/**
* Returns if or if not "getPeers" is allowed for the given channel.
*
* @param channel The channel. Must not be null
.
* @return True
if "getPeers" is allowed, false
otherwise.
*/
/* default */ boolean isGetPeersAllowed(IChannel channel) {
String remoteIP = channel.getRemotePeer().getAttributes().get(IPeer.ATTR_IP_HOST);
boolean isLocal = remoteIP != null && IPAddressUtil.getInstance().isLocalHost(remoteIP);
boolean isCommandServer = channel.getRemotePeer().getName() != null
&& channel.getRemotePeer().getName().endsWith("Command Server"); //$NON-NLS-1$
isCommandServer |= channel.getLocalPeer().getName() != null
&& channel.getLocalPeer().getName().endsWith("Command Server"); //$NON-NLS-1$
return !isLocal && !isCommandServer;
}
/**
* Query the peers from the remote locator.
*
* @param channel The channel. Must not be null
.
* @param model The locator model. Must not be null
.
* @param ip The IP address or null
.
* @param callback The callback. Must not be null
.
*/
@SuppressWarnings("unused")
protected void getPeers(final IChannel channel, final ILocatorModel model, final String ip, final ICallback callback) {
Assert.isNotNull(channel);
Assert.isNotNull(model);
Assert.isNotNull(callback);
// Keep the channel open as long as the query for the remote peers is running.
boolean keepOpen = false;
// Get the agent ID of the remote agent we are connected too.
// Have to use the peer model node here.
final String agentID = peerNode.getPeer().getAgentID();
// Ask for discovered peers from the remote agents POV.
//
// Note: For simulators connected via NAT, we have to do this for localhost address
// as well. Otherwise we miss the discoverable agents only known to the simulator.
// The same applies to agent being discovered. If you don't ask for discovered peers
// here too, we may miss some routes.
if (ip != null && !"".equals(ip)) { //$NON-NLS-1$
// Use the open channel to ask the remote peer what other peers it knows
ILocator locator = channel.getRemoteService(ILocator.class);
if (locator != null) {
// Channel must be kept open as long as the command runs
keepOpen = true;
// Issue the command
new Command(channel, locator, "getPeers", null) { //$NON-NLS-1$
@Override
public void done(Exception error, Object[] args) {
if (error == null) {
Assert.isTrue(args.length == 2);
error = toError(args[0]);
}
// If the error is still null here, process the returned peers
if (error == null && args[1] != null) {
// Get the parent peer
IPeer parentPeer = channel.getRemotePeer();
// Get the old child list
List oldChildren = new ArrayList(model.getChildren(parentPeer.getID()));
// "getPeers" returns a collection of peer attribute maps
@SuppressWarnings("unchecked")
Collection