diff options
Diffstat (limited to 'plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/local/LocatorService.java')
-rw-r--r-- | plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/local/LocatorService.java | 1248 |
1 files changed, 1248 insertions, 0 deletions
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/local/LocatorService.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/local/LocatorService.java new file mode 100644 index 000000000..9ec7b7167 --- /dev/null +++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/local/LocatorService.java @@ -0,0 +1,1248 @@ +/******************************************************************************* + * Copyright (c) 2007, 2011 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.internal.tcf.services.local; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Method; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.eclipse.tm.internal.tcf.core.LocalPeer; +import org.eclipse.tm.internal.tcf.core.LoggingUtil; +import org.eclipse.tm.internal.tcf.core.RemotePeer; +import org.eclipse.tm.internal.tcf.core.ServiceManager; +import org.eclipse.tm.tcf.core.AbstractChannel; +import org.eclipse.tm.tcf.protocol.IChannel; +import org.eclipse.tm.tcf.protocol.IErrorReport; +import org.eclipse.tm.tcf.protocol.IPeer; +import org.eclipse.tm.tcf.protocol.IService; +import org.eclipse.tm.tcf.protocol.IServiceProvider; +import org.eclipse.tm.tcf.protocol.IToken; +import org.eclipse.tm.tcf.protocol.JSON; +import org.eclipse.tm.tcf.protocol.Protocol; +import org.eclipse.tm.tcf.services.ILocator; + + +/** + * Locator service uses transport layer to search + * for peers and to collect and maintain up-to-date + * data about peer’s attributes. + */ +// TODO: research usage of DNS-SD (DNS Service Discovery) to discover TCF peers +public class LocatorService implements ILocator { + + private static final int DISCOVERY_PORT = 1534; + private static final int MAX_PACKET_SIZE = 9000 - 40 - 8; + private static final int PREF_PACKET_SIZE = 1500 - 40 - 8; + + private static LocatorService locator; + private static final Map<String,IPeer> peers = new HashMap<String,IPeer>(); + private static final ArrayList<LocatorListener> listeners = new ArrayList<LocatorListener>(); + private static final HashSet<String> error_log = new HashSet<String>(); + + private final HashSet<SubNet> subnets = new HashSet<SubNet>(); + private final ArrayList<Slave> slaves = new ArrayList<Slave>(); + private final byte[] inp_buf = new byte[MAX_PACKET_SIZE]; + private final byte[] out_buf = new byte[MAX_PACKET_SIZE]; + + private InetAddress loopback_addr; + + /** + * Flag indicating whether tracing of the discovery activity is enabled. + */ + private static boolean TRACE_DISCOVERY = System.getProperty("org.eclipse.tm.tcf.core.tracing.discovery") != null; + + private static class SubNet { + final int prefix_length; + final InetAddress address; + final InetAddress broadcast; + + long last_slaves_req_time; + boolean send_all_ok; + + SubNet(int prefix_length, InetAddress address, InetAddress broadcast) { + this.prefix_length = prefix_length; + this.address = address; + this.broadcast = broadcast; + } + + boolean contains(InetAddress addr) { + if (addr == null || address == null) return false; + byte[] a1 = addr.getAddress(); + byte[] a2 = address.getAddress(); + if (a1.length != a2.length) return false; + int i = 0; + int l = prefix_length <= a1.length * 8 ? prefix_length : a1.length * 8; + while (i + 8 <= l) { + int n = i / 8; + if (a1[n] != a2[n]) return false; + i += 8; + } + while (i < l) { + int n = i / 8; + int m = 1 << (7 - i % 8); + if ((a1[n] & m) != (a2[n] & m)) return false; + i++; + } + return true; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SubNet)) return false; + SubNet x = (SubNet)o; + return + prefix_length == x.prefix_length && + broadcast.equals(x.broadcast) && + address.equals(x.address); + } + + @Override + public int hashCode() { + return address.hashCode(); + } + + @Override + public String toString() { + return address.getHostAddress() + "/" + prefix_length; + } + } + + private static class Slave { + final InetAddress address; + final int port; + + /* Time of last packet receiver from this slave */ + long last_packet_time; + + /* Time of last REQ_SLAVES packet received from this slave */ + long last_req_slaves_time; + + Slave(InetAddress address, int port) { + this.address = address; + this.port = port; + } + + @Override + public String toString() { + return address.getHostAddress() + "/" + port; + } + } + + private static class AddressCacheItem { + final String host; + InetAddress address; + long time_stamp; + boolean used; + + AddressCacheItem(String host) { + this.host = host; + } + } + + private static final HashMap<String,AddressCacheItem> addr_cache = new HashMap<String,AddressCacheItem>(); + private static boolean addr_request; + + private static LocalPeer local_peer; + + private DatagramSocket socket; + private long last_master_packet_time; + + private final Thread timer_thread = new Thread() { + public void run() { + while (true) { + try { + sleep(DATA_RETENTION_PERIOD / 4); + final HashSet<SubNet> set = getSubNetList(); + Protocol.invokeAndWait(new Runnable() { + public void run() { + refresh_timer(set); + } + }); + } + catch (IllegalStateException x) { + // TCF event dispatch is shut down + return; + } + catch (Throwable x) { + log("Unhandled exception in TCF discovery timer thread", x); + } + } + } + }; + + private Thread dns_lookup_thread = new Thread() { + public void run() { + while (true) { + try { + long time; + HashSet<AddressCacheItem> set = null; + synchronized (addr_cache) { + if (!addr_request) addr_cache.wait(DATA_RETENTION_PERIOD); + time = System.currentTimeMillis(); + for (Iterator<AddressCacheItem> i = addr_cache.values().iterator(); i.hasNext();) { + AddressCacheItem a = i.next(); + if (a.time_stamp + DATA_RETENTION_PERIOD * 10 < time) { + if (a.used) { + if (set == null) set = new HashSet<AddressCacheItem>(); + set.add(a); + } + else { + i.remove(); + } + } + } + addr_request = false; + } + if (set != null) { + for (AddressCacheItem a : set) { + InetAddress addr = null; + try { + addr = InetAddress.getByName(a.host); + } + catch (UnknownHostException x) { + } + synchronized (addr_cache) { + a.address = addr; + a.time_stamp = time; + a.used = false; + } + } + } + } + catch (Throwable x) { + log("Unhandled exception in TCF discovery DNS lookup thread", x); + } + } + } + }; + + /** + * Wrapper for final class DatagramPacket so its toString() can present + * the value in the debugger in a readable fashion. + */ + private class InputPacket { + private DatagramPacket p; + protected InputPacket(DatagramPacket dgPacket) { + p = dgPacket; + } + protected DatagramPacket getPacket() { + return p; + } + protected int getLength() { + return p.getLength(); + } + protected byte[] getData() { + return p.getData(); + } + public int getPort() { + return p.getPort(); + } + public InetAddress getAddress() { + return p.getAddress(); + } + public String toString() { + return "[address=" + p.getAddress().toString() + + ",port=" + p.getPort() + + ",data=\"" + new String(p.getData(), 0, p.getLength()) + "\"]"; + } + } + + private final Thread input_thread = new Thread() { + public void run() { + try { + for (;;) { + DatagramSocket socket = LocatorService.this.socket; + try { + final InputPacket p + = new InputPacket(new DatagramPacket(inp_buf, inp_buf.length)); + socket.receive(p.getPacket()); + Protocol.invokeAndWait(new Runnable() { + public void run() { + handleDatagramPacket(p); + } + }); + } + catch (IllegalStateException x) { + // TCF event dispatch is shut down + return; + } + catch (Exception x) { + if (socket != LocatorService.this.socket) continue; + log("Cannot read from datagram socket at port " + socket.getLocalPort(), x); + sleep(2000); + } + } + } + catch (Throwable x) { + log("Unhandled exception in socket reading thread", x); + } + } + }; + + static { + ServiceManager.addServiceProvider(new IServiceProvider() { + + public IService[] getLocalService(final IChannel channel) { + channel.addCommandServer(locator, new IChannel.ICommandServer() { + public void command(IToken token, String name, byte[] data) { + locator.command((AbstractChannel)channel, token, name, data); + } + }); + return new IService[]{ locator }; + } + + public IService getServiceProxy(IChannel channel, String service_name) { + return null; + } + }); + } + + private static DatagramSocket createSocket(boolean slave) throws SocketException { + DatagramSocket socket = null; + if (slave) { + socket = new DatagramSocket(); + } + else { + socket = new DatagramSocket(null); + socket.setReuseAddress(false); + socket.bind(new InetSocketAddress(DISCOVERY_PORT)); + } + socket.setBroadcast(true); + return socket; + } + + public LocatorService() { + locator = this; + local_peer = new LocalPeer(); + try { + loopback_addr = InetAddress.getByName(null); + out_buf[0] = 'T'; + out_buf[1] = 'C'; + out_buf[2] = 'F'; + out_buf[3] = CONF_VERSION; + out_buf[4] = 0; + out_buf[5] = 0; + out_buf[6] = 0; + out_buf[7] = 0; + try { + socket = createSocket(false); + if (TRACE_DISCOVERY) { + LoggingUtil.trace("Became the master agent (bound to port " + socket.getLocalPort() + ")"); + } + } + catch (SocketException x) { + socket = createSocket(true); + if (TRACE_DISCOVERY) { + LoggingUtil.trace("Became a slave agent (bound to port " + socket.getLocalPort() + ")"); + } + } + refreshSubNetList(getSubNetList()); + input_thread.setName("TCF Locator Receiver"); + timer_thread.setName("TCF Locator Timer"); + dns_lookup_thread.setName("TCF Locator DNS Lookup"); + input_thread.setDaemon(true); + timer_thread.setDaemon(true); + dns_lookup_thread.setDaemon(true); + input_thread.start(); + timer_thread.start(); + dns_lookup_thread.start(); + listeners.add(new LocatorListener() { + + public void peerAdded(IPeer peer) { + sendPeerInfo(peer, null, 0); + } + + public void peerChanged(IPeer peer) { + sendPeerInfo(peer, null, 0); + } + + public void peerHeartBeat(String id) { + } + + public void peerRemoved(String id) { + } + }); + sendPeersRequest(null, 0); + sendAll(null, 0, null, System.currentTimeMillis()); + } + catch (Exception x) { + log("Cannot open UDP socket for TCF discovery protocol", x); + } + } + + public static LocalPeer getLocalPeer() { + return local_peer; + } + + public static LocatorListener[] getListeners() { + return listeners.toArray(new LocatorListener[listeners.size()]); + } + + private Map<String,Object> makeErrorReport(int code, String msg) { + Map<String,Object> err = new HashMap<String,Object>(); + err.put(IErrorReport.ERROR_TIME, new Long(System.currentTimeMillis())); + err.put(IErrorReport.ERROR_CODE, new Integer(code)); + err.put(IErrorReport.ERROR_FORMAT, msg); + return err; + } + + private void command(final AbstractChannel channel, final IToken token, String name, byte[] data) { + try { + if (name.equals("redirect")) { + String peer_id = (String)JSON.parseSequence(data)[0]; + IPeer peer = peers.get(peer_id); + if (peer == null) { + channel.sendResult(token, JSON.toJSONSequence(new Object[]{ + makeErrorReport(IErrorReport.TCF_ERROR_UNKNOWN_PEER, "Unknown peer ID") })); + return; + } + channel.sendResult(token, JSON.toJSONSequence(new Object[]{ null })); + if (peer instanceof LocalPeer) { + channel.sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence( + new Object[]{ channel.getLocalServices() })); + return; + } + new ChannelProxy(channel, peer.openChannel()); + } + else if (name.equals("sync")) { + channel.sendResult(token, null); + } + else if (name.equals("getPeers")) { + int i = 0; + Object[] arr = new Object[peers.size()]; + for (IPeer p : peers.values()) arr[i++] = p.getAttributes(); + channel.sendResult(token, JSON.toJSONSequence(new Object[]{ null, arr })); + } + else { + channel.rejectCommand(token); + } + } + catch (Throwable x) { + channel.terminate(x); + } + } + + private void log(String msg, Throwable x) { + // Don't report same error multiple times to avoid filling up the log file. + synchronized (error_log) { + if (error_log.contains(msg)) return; + error_log.add(msg); + } + Protocol.log(msg, x); + } + + private InetAddress getInetAddress(String host) { + if (host == null || host.length() == 0) return null; + synchronized (addr_cache) { + AddressCacheItem i = addr_cache.get(host); + if (i == null) { + i = new AddressCacheItem(host); + char ch = host.charAt(0); + if (ch == '[' || ch == ':' || ch >= '0' && ch <= '9') { + try { + i.address = InetAddress.getByName(host); + } + catch (UnknownHostException e) { + } + i.time_stamp = System.currentTimeMillis(); + } + else { + /* InetAddress.getByName() can cause long delay - delegate to background thread */ + addr_request = true; + addr_cache.notify(); + } + addr_cache.put(host, i); + } + i.used = true; + return i.address; + } + } + + private void refresh_timer(HashSet<SubNet> nets) { + long time = System.currentTimeMillis(); + /* Cleanup slave table */ + if (slaves.size() > 0) { + int i = 0; + while (i < slaves.size()) { + Slave s = slaves.get(i); + if (s.last_packet_time + DATA_RETENTION_PERIOD < time) { + slaves.remove(i); + } + else { + i++; + } + } + } + /* Cleanup peers table */ + ArrayList<RemotePeer> stale_peers = null; + for (IPeer p : peers.values()) { + if (p instanceof RemotePeer) { + RemotePeer r = (RemotePeer)p; + if (r.getLastUpdateTime() + DATA_RETENTION_PERIOD < time) { + if (stale_peers == null) stale_peers = new ArrayList<RemotePeer>(); + stale_peers.add(r); + } + } + } + if (stale_peers != null) { + for (RemotePeer p : stale_peers) p.dispose(); + } + /* Try to become a master */ + if (socket.getLocalPort() != DISCOVERY_PORT && last_master_packet_time + DATA_RETENTION_PERIOD / 2 <= time) { + try { + DatagramSocket s0 = socket; + socket = createSocket(false); + if (TRACE_DISCOVERY) { + LoggingUtil.trace("Became the master agent (bound to port " + socket.getLocalPort() + ")"); + } + s0.close(); + } + catch (Throwable x) { + } + } + refreshSubNetList(nets); + if (socket.getLocalPort() != DISCOVERY_PORT) { + for (SubNet subnet : subnets) { + addSlave(subnet.address, socket.getLocalPort(), time); + } + } + sendAll(null, 0, null, time); + } + + private Slave addSlave(InetAddress addr, int port, long timestamp) { + for (Slave s : slaves) { + if (s.port == port && s.address.equals(addr)) { + if (s.last_packet_time < timestamp) s.last_packet_time = timestamp; + return s; + } + } + final Slave s = new Slave(addr, port); + s.last_packet_time = timestamp; + slaves.add(s); + Protocol.invokeLater(new Runnable() { + public void run() { + long time_now = System.currentTimeMillis(); + sendPeersRequest(s.address, s.port); + sendAll(s.address, s.port, s, time_now); + sendSlaveInfo(s, time_now); + } + }); + return s; + } + + private void refreshSubNetList(HashSet<SubNet> set) { + if (set == null) return; + for (Iterator<SubNet> i = subnets.iterator(); i.hasNext();) { + SubNet s = i.next(); + if (set.contains(s)) continue; + i.remove(); + } + for (Iterator<SubNet> i = set.iterator(); i.hasNext();) { + SubNet s = i.next(); + if (subnets.contains(s)) continue; + subnets.add(s); + } + if (TRACE_DISCOVERY) { + StringBuilder str = new StringBuilder("Refreshed subnet list:"); + for (SubNet subnet : subnets) { + str.append("\n\t* address=" + subnet.address + ", broadcast=" + subnet.broadcast); + } + LoggingUtil.trace(str.toString()); + } + } + + private HashSet<SubNet> getSubNetList() { + HashSet<SubNet> set = new HashSet<SubNet>(); + try { + String osname = System.getProperty("os.name", ""); + if (osname.startsWith("Windows")) { + /* + * Workaround for JVM bug: + * InterfaceAddress.getNetworkPrefixLength() does not conform to Javadoc + * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6707289 + * + * The bug shows up only on Windows when IPv6 is enabled. + * The bug is supposed to be fixed in Java 1.7. + */ + getWindowsSubNetList(set); + } + else { + getSubNetList(set); + } + } + catch (Exception x) { + log("Cannot get list of network interfaces", x); + return null; + } + return set; + } + + private void getSubNetList(HashSet<SubNet> set) throws SocketException { + for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements();) { + NetworkInterface f = e.nextElement(); + /* TODO: Class InterfaceAddress does not exists in Java versions before 1.6. + * Fix the code below when support for old Java versions is not needed any more. + */ + try { + Method m0 = f.getClass().getMethod("getInterfaceAddresses"); + for (Object ia : (List<?>)m0.invoke(f)) { + Method m1 = ia.getClass().getMethod("getNetworkPrefixLength"); + Method m2 = ia.getClass().getMethod("getAddress"); + Method m3 = ia.getClass().getMethod("getBroadcast"); + int network_prefix_len = (Short)m1.invoke(ia); + InetAddress address = (InetAddress)m2.invoke(ia); + InetAddress broadcast = (InetAddress)m3.invoke(ia); + + // TODO: discovery over IPv6 + /* Create IPv6 broadcast address. + * The code does not work - commented out until fixed. + if (broadcast == null && + address instanceof Inet6Address && + !address.isAnyLocalAddress() && + !address.isLinkLocalAddress() && + !address.isMulticastAddress() && + !address.isLoopbackAddress()) { + byte[] net = address.getAddress(); + byte[] buf = new byte[16]; + buf[0] = (byte)0xff; // multicast + buf[1] = (byte)0x32; // flags + scope + buf[2] = (byte)0x00; // reserved + buf[3] = (byte)network_prefix_len; + int n = (network_prefix_len + 7) / 8; + for (int i = 0; i < n; i++) buf[i + 4] = net[i]; + broadcast = Inet6Address.getByAddress(null, buf); + } + */ + + if (network_prefix_len == 0 && address instanceof Inet4Address) { + // Java 1.6.0 on Linux returns network prefix == 0 for loop-back interface + byte[] buf = address.getAddress(); + if (buf[0] == 127) { + network_prefix_len = 8; + if (broadcast == null) broadcast = address; + } + } + + if (network_prefix_len > 0 && address != null && broadcast != null) { + set.add(new SubNet(network_prefix_len, address, broadcast)); + } + } + } + catch (Exception x) { + // Java 1.5 or older + // TODO: need a better way to get broadcast addresses on Java 1.5 VM + Enumeration<InetAddress> n = f.getInetAddresses(); + while (n.hasMoreElements()) { + InetAddress addr = n.nextElement(); + byte[] buf = addr.getAddress(); + if (buf.length != 4) continue; + buf[3] = (byte)255; + try { + set.add(new SubNet(24, addr, InetAddress.getByAddress(buf))); + } + catch (UnknownHostException y) { + } + } + } + } + } + + private void getWindowsSubNetList(HashSet<SubNet> set) throws Exception { + HashMap<String,InetAddress> map = new HashMap<String,InetAddress>(); + for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements();) { + NetworkInterface f = e.nextElement(); + Enumeration<InetAddress> n = f.getInetAddresses(); + while (n.hasMoreElements()) { + InetAddress addr = n.nextElement(); + if (addr instanceof Inet4Address) { + String s = addr.getHostAddress(); + if (s.startsWith("127.")) { + byte[] buf = addr.getAddress(); + buf[1] = buf[2] = buf[3] = (byte)255; + set.add(new SubNet(8, addr, InetAddress.getByAddress(buf))); + } + else { + map.put(s, addr); + } + } + } + } + Process prs = Runtime.getRuntime().exec(new String[]{ "ipconfig", "/all" }, null); + BufferedReader inp = new BufferedReader(new InputStreamReader(prs.getInputStream())); + for (;;) { + String s = inp.readLine(); + if (s == null) break; + int n = s.indexOf(" : "); + if (n < 0) continue; + n += 3; + int m = n; + while (m < s.length()) { + char ch = s.charAt(m); + if ((ch < '0' || ch > '9') && ch != '.') break; + m++; + } + if (m == n) continue; + InetAddress addr = map.get(s.substring(n, m)); + if (addr == null) continue; + do s = inp.readLine(); + while (s != null && s.length() == 0); + if (s == null) break; + n = s.indexOf(" : "); + if (n < 0) continue; + s = s.substring(n + 3); + int l = s.length(); + int i_cnt = 0; + int d_cnt = 0; + for (int i = 0; i < l; i++) { + char ch = s.charAt(i); + if (ch == '.') d_cnt++; + else if (ch < '0' || ch > '9') i_cnt++; + } + if (d_cnt != 3 || i_cnt != 0) continue; + try { + byte[] buf = InetAddress.getByName(s).getAddress(); + int prefix_length = 0; + for (int i = 0; i < 32; i++) { + if ((buf[i / 8] & (1 << (7 - i % 8))) == 0) { + prefix_length = i; + break; + } + } + if (prefix_length > 0) { + buf = addr.getAddress(); + for (int i = prefix_length; i < 32; i++) { + buf[i / 8] |= 1 << (7 - i % 8); + } + set.add(new SubNet(prefix_length, addr, InetAddress.getByAddress(buf))); + } + } + catch (Exception x) { + } + } + try { + prs.getErrorStream().close(); + prs.getOutputStream().close(); + inp.close(); + } + catch (IOException x) { + } + prs.waitFor(); + } + + private byte[] getUTF8Bytes(String s) { + try { + return s.getBytes("UTF-8"); + } + catch (Exception x) { + log("UTF-8 character encoder is not available", x); + return s.getBytes(); + } + } + + /** Used for tracing */ + private static String packetTypes[] = new String[] { + null, + "CONF_REQ_INFO", + "CONF_PEER_INFO", + "CONF_REQ_SLAVES", + "CONF_SLAVES_INFO", + "CONF_PEER_REMOVE" + }; + + private boolean sendDatagramPacket(SubNet subnet, int size, InetAddress addr, int port) { + try { + if (addr == null) { + addr = subnet.broadcast; + port = DISCOVERY_PORT; + for (Slave slave : slaves) { + sendDatagramPacket(subnet, size, slave.address, slave.port); + } + } + if (!subnet.contains(addr)) return false; + if (port == socket.getLocalPort() && addr.equals(subnet.address)) return false; + socket.send(new DatagramPacket(out_buf, size, addr, port)); + + if (TRACE_DISCOVERY) { + Map<String,String> map = null; + switch (out_buf[4]) { + case CONF_PEER_INFO: map = parsePeerAtrributes(out_buf, size); break; + case CONF_SLAVES_INFO: map = parseIDs(out_buf, size); break; + case CONF_PEERS_REMOVED: map = parseIDs(out_buf, size); break; + } + traceDiscoveryPacket(false, packetTypes[out_buf[4]], map, addr, port); + } + } + catch (Exception x) { + log("Cannot send datagram packet to " + addr, x); + return false; + } + return true; + } + + /** + * Parse peer attributes in CONF_PEER_INFO packet data. + * + * @param data - the packet data + * @param size - the packet size + * @return a map containing the attributes + * @throws UnsupportedEncodingException + */ + private static Map<String,String> parsePeerAtrributes(byte[] data, int size) throws UnsupportedEncodingException { + Map<String,String> map = new HashMap<String,String>(); + String s = new String(data, 8, size - 8, "UTF-8"); + int l = s.length(); + int i = 0; + while (i < l) { + int i0 = i; + while (i < l && s.charAt(i) != '=' && s.charAt(i) != 0) i++; + int i1 = i; + if (i < l && s.charAt(i) == '=') i++; + int i2 = i; + while (i < l && s.charAt(i) != 0) i++; + int i3 = i; + if (i < l && s.charAt(i) == 0) i++; + String key = s.substring(i0, i1); + String val = s.substring(i2, i3); + map.put(key, val); + } + return map; + } + + /** + * Parse list of IDs in CONF_SLAVES_INFO and CONF_PEERS_REMOVED packet data. + * + * @param data - the packet data + * @param size - the packet size + * @return a map containing the IDs + * @throws UnsupportedEncodingException + */ + private static Map<String,String> parseIDs(byte[] data, int size) throws UnsupportedEncodingException { + int cnt = 0; + Map<String,String> map = new HashMap<String,String>(); + String s = new String(data, 8, size - 8, "UTF-8"); + int l = s.length(); + int i = 0; + while (i < l) { + int i0 = i; + while (i < l && s.charAt(i) != 0) i++; + if (i > i0) { + String id = s.substring(i0, i); + map.put(Integer.toString(cnt++), id); + } + while (i < l && s.charAt(i) == 0) i++; + } + return map; + } + + private void sendPeersRequest(InetAddress addr, int port) { + out_buf[4] = CONF_REQ_INFO; + for (SubNet subnet : subnets) { + sendDatagramPacket(subnet, 8, addr, port); + } + } + + private void sendPeerInfo(IPeer peer, InetAddress addr, int port) { + Map<String,String> attrs = peer.getAttributes(); + InetAddress peer_addr = getInetAddress(attrs.get(IPeer.ATTR_IP_HOST)); + if (peer_addr == null) return; + if (attrs.get(IPeer.ATTR_IP_PORT) == null) return; + out_buf[4] = CONF_PEER_INFO; + int i = 8; + + for (SubNet subnet : subnets) { + if (peer instanceof RemotePeer) { + if (socket.getLocalPort() != DISCOVERY_PORT) return; + if (!subnet.address.equals(loopback_addr) && !subnet.address.equals(peer_addr)) continue; + } + if (!subnet.address.equals(loopback_addr)) { + if (!subnet.contains(peer_addr)) continue; + } + if (i == 8) { + StringBuffer sb = new StringBuffer(out_buf.length); + for (String key : attrs.keySet()) { + sb.append(key); + sb.append('='); + sb.append(attrs.get(key)); + sb.append((char)0); + } + byte[] bt = getUTF8Bytes(sb.toString()); + if (i + bt.length > out_buf.length) return; + System.arraycopy(bt, 0, out_buf, i, bt.length); + i += bt.length; + } + if (sendDatagramPacket(subnet, i, addr, port)) subnet.send_all_ok = true; + } + } + + private void sendEmptyPacket(InetAddress addr, int port) { + out_buf[4] = CONF_SLAVES_INFO; + for (SubNet subnet : subnets) { + if (subnet.send_all_ok) continue; + sendDatagramPacket(subnet, 8, addr, port); + } + } + + private void sendAll(InetAddress addr, int port, Slave sl, long time) { + for (SubNet subnet : subnets) subnet.send_all_ok = false; + for (IPeer peer : peers.values()) sendPeerInfo(peer, addr, port); + if (addr != null && sl != null && sl.last_req_slaves_time + DATA_RETENTION_PERIOD >= time) { + sendSlavesInfo(addr, port, time); + } + sendEmptyPacket(addr, port); + } + + private void sendSlavesRequest(SubNet subnet, InetAddress addr, int port) { + out_buf[4] = CONF_REQ_SLAVES; + sendDatagramPacket(subnet, 8, addr, port); + } + + private void sendSlaveInfo(Slave x, long time) { + int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time); + if (ttl <= 0) return; + out_buf[4] = CONF_SLAVES_INFO; + for (SubNet subnet : subnets) { + if (!subnet.contains(x.address)) continue; + int i = 8; + String s = ttl + ":" + x.port + ":" + x.address.getHostAddress(); + byte[] bt = getUTF8Bytes(s); + System.arraycopy(bt, 0, out_buf, i, bt.length); + i += bt.length; + out_buf[i++] = 0; + for (Slave y : slaves) { + if (!subnet.contains(y.address)) continue; + if (y.last_req_slaves_time + DATA_RETENTION_PERIOD < time) continue; + sendDatagramPacket(subnet, i, y.address, y.port); + } + } + } + + private void sendSlavesInfo(InetAddress addr, int port, long time) { + out_buf[4] = CONF_SLAVES_INFO; + for (SubNet subnet : subnets) { + if (!subnet.contains(addr)) continue; + int i = 8; + for (Slave x : slaves) { + int ttl = (int)(x.last_packet_time + DATA_RETENTION_PERIOD - time); + if (ttl <= 0) continue; + if (x.port == port && x.address.equals(addr)) continue; + if (!subnet.address.equals(loopback_addr)) { + if (!subnet.contains(x.address)) continue; + } + subnet.send_all_ok = true; + String s = ttl + ":" + x.port + ":" + x.address.getHostAddress(); + byte[] bt = getUTF8Bytes(s); + if (i > 8 && i + bt.length >= PREF_PACKET_SIZE) { + sendDatagramPacket(subnet, i, addr, port); + i = 8; + } + System.arraycopy(bt, 0, out_buf, i, bt.length); + i += bt.length; + out_buf[i++] = 0; + } + if (i > 8) sendDatagramPacket(subnet, i, addr, port); + } + } + + private boolean isRemote(InetAddress address, int port) { + if (port != socket.getLocalPort()) return true; + for (SubNet s : subnets) { + if (s.address.equals(address)) return false; + } + return true; + } + + private void handleDatagramPacket(InputPacket p) { + try { + long time = System.currentTimeMillis(); + byte[] buf = p.getData(); + int len = p.getLength(); + if (len < 8) return; + if (buf[0] != 'T') return; + if (buf[1] != 'C') return; + if (buf[2] != 'F') return; + if (buf[3] != CONF_VERSION) return; + int remote_port = p.getPort(); + InetAddress remote_address = p.getAddress(); + if (isRemote(remote_address, remote_port)) { + if (buf[4] == CONF_PEERS_REMOVED) { + handlePeerRemovedPacket(p, remote_port == DISCOVERY_PORT && remote_address.isLoopbackAddress()); + } + else { + Slave sl = null; + if (remote_port != DISCOVERY_PORT) { + sl = addSlave(remote_address, remote_port, time); + } + switch (buf[4]) { + case CONF_PEER_INFO: + handlePeerInfoPacket(p); + break; + case CONF_REQ_INFO: + handleReqInfoPacket(p, sl, time); + break; + case CONF_SLAVES_INFO: + handleSlavesInfoPacket(p, time); + break; + case CONF_REQ_SLAVES: + handleReqSlavesPacket(p, sl, time); + break; + } + for (SubNet subnet : subnets) { + if (!subnet.contains(remote_address)) continue; + long delay = DATA_RETENTION_PERIOD / 3; + if (remote_port != DISCOVERY_PORT) delay = DATA_RETENTION_PERIOD / 3 * 2; + else if (!subnet.address.equals(remote_address)) delay = DATA_RETENTION_PERIOD / 2; + if (subnet.last_slaves_req_time + delay <= time) { + sendSlavesRequest(subnet, remote_address, remote_port); + subnet.last_slaves_req_time = time; + } + if (remote_port == DISCOVERY_PORT && subnet.address.equals(remote_address)) { + last_master_packet_time = time; + } + } + } + } + } + catch (Throwable x) { + log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x); + } + } + + private void handlePeerInfoPacket(InputPacket p) { + try { + Map<String,String> map = parsePeerAtrributes(p.getData(), p.getLength()); + if (TRACE_DISCOVERY) traceDiscoveryPacket(true, "CONF_PEER_INFO", map, p); + String id = map.get(IPeer.ATTR_ID); + if (id == null) throw new Exception("Invalid peer info: no ID"); + boolean ok = true; + String host = map.get(IPeer.ATTR_IP_HOST); + if (host != null) { + ok = false; + InetAddress peer_addr = getInetAddress(host); + if (peer_addr != null) { + for (SubNet subnet : subnets) { + if (subnet.contains(peer_addr)) { + ok = true; + break; + } + } + } + } + if (ok) { + IPeer peer = peers.get(id); + if (peer instanceof RemotePeer) { + ((RemotePeer)peer).updateAttributes(map); + } + else if (peer == null) { + new RemotePeer(map); + } + } + } + catch (Exception x) { + log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x); + } + } + + private void handleReqInfoPacket(InputPacket p, Slave sl, long time) { + if (TRACE_DISCOVERY) traceDiscoveryPacket(true, "CONF_REQ_INFO", null, p); + sendAll(p.getAddress(), p.getPort(), sl, time); + } + + private void handleSlavesInfoPacket(InputPacket p, long time_now) { + try { + Map<String,String> map = parseIDs(p.getData(), p.getLength()); + if (TRACE_DISCOVERY) traceDiscoveryPacket(true, "CONF_SLAVES_INFO", map, p); + for (String s : map.values()) { + int i = 0; + int l = s.length(); + int time0 = i; + while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++; + int time1 = i; + if (i < l && s.charAt(i) == ':') i++; + int port0 = i; + while (i < l&& s.charAt(i) != ':' && s.charAt(i) != 0) i++; + int port1 = i; + if (i < l && s.charAt(i) == ':') i++; + int host0 = i; + while (i < l && s.charAt(i) != 0) i++; + int host1 = i; + int port = Integer.parseInt(s.substring(port0, port1)); + String timestamp = s.substring(time0, time1); + String host = s.substring(host0, host1); + if (port != DISCOVERY_PORT) { + InetAddress addr = getInetAddress(host); + if (addr != null) { + long delta = 1000 * 60 * 30; // 30 minutes + long time_val = timestamp.length() > 0 ? Long.parseLong(timestamp) : time_now; + if (time_val < 3600000) { + /* Time stamp is "time to live" in milliseconds */ + time_val = time_now + time_val - DATA_RETENTION_PERIOD; + } + else if (time_val < time_now / 1000 + 50000000) { + /* Time stamp is in seconds */ + time_val *= 1000; + } + else { + /* Time stamp is in milliseconds */ + } + if (time_val < time_now - delta || time_val > time_now + delta) { + SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String msg = + "Invalid slave info timestamp: " + timestamp + + " -> " + fmt.format(new Date(time_val)); + log("Invalid datagram packet received from " + + p.getAddress() + "/" + p.getPort(), + new Exception(msg)); + time_val = time_now - DATA_RETENTION_PERIOD / 2; + } + addSlave(addr, port, time_val); + } + } + } + } + catch (Exception x) { + log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x); + } + } + + private void handleReqSlavesPacket(InputPacket p, Slave sl, long time) { + if (TRACE_DISCOVERY) traceDiscoveryPacket(true, "CONF_REQ_SLAVES", null, p); + if (sl != null) sl.last_req_slaves_time = time; + sendSlavesInfo(p.getAddress(), p.getPort(), time); + } + + private void handlePeerRemovedPacket(InputPacket p, boolean master_exited) { + try { + Map<String,String> map = parseIDs(p.getData(), p.getLength()); + if (TRACE_DISCOVERY) traceDiscoveryPacket(true, "CONF_PEERS_REMOVED", map, p); + for (String id : map.values()) { + IPeer peer = peers.get(id); + if (peer instanceof RemotePeer) ((RemotePeer)peer).dispose(); + } + if (master_exited) { + // Master locator has exited, let's try to get master port. + Protocol.invokeLater(500, new Runnable() { + public void run() { + if (socket.getLocalPort() == DISCOVERY_PORT) return; + try { + DatagramSocket s0 = socket; + socket = createSocket(false); + if (TRACE_DISCOVERY) { + LoggingUtil.trace("Became the master agent (bound to port " + socket.getLocalPort() + ")"); + } + s0.close(); + } + catch (Throwable x) { + } + } + }); + } + } + catch (Exception x) { + log("Invalid datagram packet received from " + p.getAddress() + "/" + p.getPort(), x); + } + } + + /*----------------------------------------------------------------------------------*/ + + public static LocatorService getLocator() { + return locator; + } + + public String getName() { + return NAME; + } + + public Map<String,IPeer> getPeers() { + assert Protocol.isDispatchThread(); + return peers; + } + + public IToken redirect(String peer_id, DoneRedirect done) { + throw new Error("Channel redirect cannot be done on local peer"); + } + + public IToken redirect(Map<String,String> peer, DoneRedirect done) { + throw new Error("Channel redirect cannot be done on local peer"); + } + + public IToken sync(DoneSync done) { + throw new Error("Channel sync cannot be done on local peer"); + } + + public void addListener(LocatorListener listener) { + assert listener != null; + assert Protocol.isDispatchThread(); + listeners.add(listener); + } + + public void removeListener(LocatorListener listener) { + assert Protocol.isDispatchThread(); + listeners.remove(listener); + } + + /** + * Log that a TCF Discovery packet has be sent or received. The trace is + * sent to stdout. This should be called only if the tracing has been turned + * on via java property definitions. + * + * @param received + * true if the packet was sent, otherwise it was received + * @param type + * a string specifying the type of packet, e.g., "CONF_PEER_INFO" + * @param attrs + * a set of attributes relevant to the type of packet (typically + * a peer's attributes) + * @param addr + * the network address the packet is being sent to + * @param port + * the port the packet is being sent to + */ + private static void traceDiscoveryPacket(boolean received, String type, Map<String,String> attrs, InetAddress addr, int port) { + assert TRACE_DISCOVERY; + StringBuilder str = new StringBuilder(type + (received ? " received from " : " sent to ") + addr + "/" + port); + if (attrs != null) { + Iterator<Entry<String, String>> iter = attrs.entrySet().iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + str.append("\n\t" + entry.getKey() + '=' + entry.getValue()); + } + } + LoggingUtil.trace(str.toString()); + } + + /** + * Convenience variant that takes a DatagramPacket for specifying + * the target address and port. + */ + private static void traceDiscoveryPacket(boolean received, String type, Map<String,String> attrs, InputPacket packet) { + traceDiscoveryPacket(received, type, attrs, packet.getAddress(), packet.getPort()); + } +} |