Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core')
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java953
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java186
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java148
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java103
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java232
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java66
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java147
-rw-r--r--plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java81
8 files changed, 1916 insertions, 0 deletions
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java
new file mode 100644
index 000000000..fe8825f55
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java
@@ -0,0 +1,953 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.eclipse.tm.internal.tcf.core.ServiceManager;
+import org.eclipse.tm.internal.tcf.core.Token;
+import org.eclipse.tm.internal.tcf.core.TransportManager;
+import org.eclipse.tm.internal.tcf.services.local.LocatorService;
+import org.eclipse.tm.internal.tcf.services.remote.GenericProxy;
+import org.eclipse.tm.tcf.protocol.IChannel;
+import org.eclipse.tm.tcf.protocol.IErrorReport;
+import org.eclipse.tm.tcf.protocol.IPeer;
+import org.eclipse.tm.tcf.protocol.IService;
+import org.eclipse.tm.tcf.protocol.IToken;
+import org.eclipse.tm.tcf.protocol.JSON;
+import org.eclipse.tm.tcf.protocol.Protocol;
+import org.eclipse.tm.tcf.services.ILocator;
+
+/**
+ * Abstract implementation of IChannel interface.
+ *
+ * AbstractChannel implements communication link connecting two end points (peers).
+ * The channel asynchronously transmits messages: commands, results and events.
+ *
+ * Clients can subclass AbstractChannel to support particular transport (wire) protocol.
+ * Also, see StreamChannel for stream oriented transport protocols.
+ */
+public abstract class AbstractChannel implements IChannel {
+
+ public interface TraceListener {
+
+ public void onMessageReceived(char type, String token,
+ String service, String name, byte[] data);
+
+ public void onMessageSent(char type, String token,
+ String service, String name, byte[] data);
+
+ public void onChannelClosed(Throwable error);
+ }
+
+ public interface Proxy {
+
+ public void onCommand(IToken token, String service, String name, byte[] data);
+
+ public void onEvent(String service, String name, byte[] data);
+
+ public void onChannelClosed(Throwable error);
+ }
+
+ private static class Message {
+ final char type;
+ Token token;
+ String service;
+ String name;
+ byte[] data;
+
+ boolean is_sent;
+ boolean is_canceled;
+
+ Collection<TraceListener> trace;
+
+ Message(char type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ StringBuffer bf = new StringBuffer();
+ bf.append('[');;
+ bf.append(type);
+ if (token != null) {
+ bf.append(' ');
+ bf.append(token.getID());
+ }
+ if (service != null) {
+ bf.append(' ');
+ bf.append(service);
+ }
+ if (name != null) {
+ bf.append(' ');
+ bf.append(name);
+ }
+ if (data != null) {
+ int i = 0;
+ while (i < data.length) {
+ int j = i;
+ while (j < data.length && data[j] != 0) j++;
+ bf.append(' ');
+ bf.append(new String(data, i, j - i, "UTF8"));
+ if (j < data.length && data[j] == 0) j++;
+ i = j;
+ }
+ }
+ bf.append(']');
+ return bf.toString();
+ }
+ catch (Exception x) {
+ return x.toString();
+ }
+ }
+ }
+
+ private static IChannelListener[] listeners_array = new IChannelListener[4];
+
+ private final LinkedList<String> redirect_queue = new LinkedList<String>();
+ private final Map<Class<?>,IService> local_service_by_class = new HashMap<Class<?>,IService>();
+ private final Map<Class<?>,IService> remote_service_by_class = new HashMap<Class<?>,IService>();
+ private final Map<String,IService> local_service_by_name = new HashMap<String,IService>();
+ private final Map<String,IService> remote_service_by_name = new HashMap<String,IService>();
+ private final LinkedList<Message> out_queue = new LinkedList<Message>();
+ private final Collection<IChannelListener> channel_listeners = new ArrayList<IChannelListener>();
+ private final Map<String,IChannel.IEventListener[]> event_listeners = new HashMap<String,IChannel.IEventListener[]>();
+ private final Map<String,IChannel.ICommandServer> command_servers = new HashMap<String,IChannel.ICommandServer>();
+ private final Map<String,Message> out_tokens = new HashMap<String,Message>();
+ private final Thread inp_thread;
+ private final Thread out_thread;
+ private boolean notifying_channel_opened;
+ private boolean registered_with_trasport;
+ private boolean shutdown;
+ private int state = STATE_OPENNING;
+ private IToken redirect_command;
+ private final IPeer local_peer;
+ private IPeer remote_peer;
+ private Proxy proxy;
+
+ private static final int pending_command_limit = 32;
+ private int local_congestion_level = -100;
+ private int remote_congestion_level = -100;
+ private long local_congestion_time;
+ private int local_congestion_cnt;
+ private Collection<TraceListener> trace_listeners;
+
+ public static final int
+ EOS = -1, // End Of Stream
+ EOM = -2; // End Of Message
+
+ protected AbstractChannel(IPeer remote_peer) {
+ this(LocatorService.getLocalPeer(), remote_peer);
+ }
+
+ protected AbstractChannel(IPeer local_peer, IPeer remote_peer) {
+ assert Protocol.isDispatchThread();
+ this.remote_peer = remote_peer;
+ this.local_peer = local_peer;
+
+ inp_thread = new Thread() {
+
+ final byte[] empty_byte_array = new byte[0];
+ byte[] buf = new byte[1024];
+ byte[] eos;
+
+ private void error() throws IOException {
+ throw new IOException("Protocol syntax error");
+ }
+
+ private byte[] readBytes(int end) throws IOException {
+ int len = 0;
+ for (;;) {
+ int n = read();
+ if (n <= 0) {
+ if (n == end) break;
+ if (n == EOM) throw new IOException("Unexpected end of message");
+ if (n < 0) throw new IOException("Communication channel is closed by remote peer");
+ }
+ if (len >= buf.length) {
+ byte[] tmp = new byte[buf.length * 2];
+ System.arraycopy(buf, 0, tmp, 0, len);
+ buf = tmp;
+ }
+ buf[len++] = (byte)n;
+ }
+ if (len == 0) return empty_byte_array;
+ byte[] res = new byte[len];
+ System.arraycopy(buf, 0, res, 0, len);
+ return res;
+ }
+
+ private String readString() throws IOException {
+ int len = 0;
+ for (;;) {
+ int n = read();
+ if (n <= 0) {
+ if (n == 0) break;
+ if (n == EOM) throw new IOException("Unexpected end of message");
+ if (n < 0) throw new IOException("Communication channel is closed by remote peer");
+ }
+ if (len >= buf.length) {
+ byte[] tmp = new byte[buf.length * 2];
+ System.arraycopy(buf, 0, tmp, 0, len);
+ buf = tmp;
+ }
+ buf[len++] = (byte)n;
+ }
+ return new String(buf, 0, len, "UTF8");
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ int n = read();
+ if (n == EOM) continue;
+ if (n == EOS) {
+ eos = readBytes(EOM);
+ break;
+ }
+ final Message msg = new Message((char)n);
+ if (read() != 0) error();
+ switch (msg.type) {
+ case 'C':
+ msg.token = new Token(readBytes(0));
+ msg.service = readString();
+ msg.name = readString();
+ msg.data = readBytes(EOM);
+ break;
+ case 'P':
+ case 'R':
+ case 'N':
+ msg.token = new Token(readBytes(0));
+ msg.data = readBytes(EOM);
+ break;
+ case 'E':
+ msg.service = readString();
+ msg.name = readString();
+ msg.data = readBytes(EOM);
+ break;
+ case 'F':
+ msg.data = readBytes(EOM);
+ break;
+ default:
+ error();
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ handleInput(msg);
+ }
+ });
+ int delay = local_congestion_level;
+ if (delay > 0) sleep(delay);
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ if (out_tokens.isEmpty()) {
+ close();
+ }
+ else {
+ IOException x = new IOException("Connection reset by peer");
+ try {
+ Object[] args = JSON.parseSequence(eos);
+ if (args.length > 0 && args[0] != null) {
+ x = new IOException(Command.toErrorString(args[0]));
+ }
+ }
+ catch (IOException e) {
+ x = e;
+ }
+ terminate(x);
+ }
+ }
+ });
+ }
+ catch (final Throwable x) {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ terminate(x);
+ }
+ });
+ }
+ }
+ };
+
+ out_thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Message msg = null;
+ boolean last = false;
+ synchronized (out_queue) {
+ while (out_queue.isEmpty()) out_queue.wait();
+ msg = out_queue.removeFirst();
+ if (msg == null) break;
+ last = out_queue.isEmpty();
+ if (msg.is_canceled) {
+ if (last) flush();
+ continue;
+ }
+ msg.is_sent = true;
+ }
+ if (msg.trace != null) {
+ final Message m = msg;
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ for (TraceListener l : m.trace) {
+ try {
+ l.onMessageSent(m.type, m.token == null ? null : m.token.getID(),
+ m.service, m.name, m.data);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ });
+ }
+ write(msg.type);
+ write(0);
+ if (msg.token != null) {
+ write(msg.token.getBytes());
+ write(0);
+ }
+ if (msg.service != null) {
+ write(msg.service.getBytes("UTF8"));
+ write(0);
+ }
+ if (msg.name != null) {
+ write(msg.name.getBytes("UTF8"));
+ write(0);
+ }
+ if (msg.data != null) {
+ write(msg.data);
+ }
+ write(EOM);
+ int delay = 0;
+ int level = remote_congestion_level;
+ if (level > 0) delay = level * 10;
+ if (last || delay > 0) flush();
+ if (delay > 0) sleep(delay);
+ else yield();
+ }
+ write(EOS);
+ write(EOM);
+ flush();
+ }
+ catch (final Throwable x) {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ terminate(x);
+ }
+ });
+ }
+ }
+ };
+ inp_thread.setName("TCF Channel Receiver");
+ out_thread.setName("TCF Channel Transmitter");
+ }
+
+ protected void start() {
+ assert Protocol.isDispatchThread();
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ try {
+ if (proxy != null) return;
+ ServiceManager.onChannelCreated(AbstractChannel.this, local_service_by_name);
+ makeServiceByClassMap(local_service_by_name, local_service_by_class);
+ Object[] args = new Object[]{ local_service_by_name.keySet() };
+ sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ terminate(x);
+ }
+ }
+ });
+ inp_thread.start();
+ out_thread.start();
+ }
+
+ /**
+ * Redirect this channel to given peer using this channel remote peer locator service as a proxy.
+ * @param peer_id - peer that will become new remote communication endpoint of this channel
+ */
+ public void redirect(final String peer_id) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_OPENNING) {
+ redirect_queue.add(peer_id);
+ }
+ else {
+ assert state == STATE_OPEN;
+ assert redirect_command == null;
+ try {
+ final ILocator l = (ILocator)remote_service_by_class.get(ILocator.class);
+ if (l == null) throw new IOException("Cannot redirect channel: peer " +
+ remote_peer.getID() + " has no locator service");
+ final IPeer peer = l.getPeers().get(peer_id);
+ if (peer == null) {
+ // Peer not found, must wait for a while until peer is discovered or time out
+ final boolean[] found = new boolean[1];
+ Protocol.invokeLater(ILocator.DATA_RETENTION_PERIOD / 3, new Runnable() {
+ public void run() {
+ if (found[0]) return;
+ terminate(new Exception("Peer " + peer_id + " not found"));
+ }
+ });
+ l.addListener(new ILocator.LocatorListener() {
+ public void peerAdded(IPeer peer) {
+ if (peer.getID().equals(peer_id)) {
+ found[0] = true;
+ state = STATE_OPEN;
+ l.removeListener(this);
+ redirect(peer_id);
+ }
+ }
+ public void peerChanged(IPeer peer) {
+ }
+
+ public void peerHeartBeat(String id) {
+ }
+
+ public void peerRemoved(String id) {
+ }
+ });
+ }
+ else {
+ redirect_command = l.redirect(peer_id, new ILocator.DoneRedirect() {
+ public void doneRedirect(IToken token, Exception x) {
+ assert redirect_command == token;
+ redirect_command = null;
+ if (state != STATE_OPENNING) return;
+ if (x != null) terminate(x);
+ remote_peer = peer;
+ remote_service_by_class.clear();
+ remote_service_by_name.clear();
+ event_listeners.clear();
+ }
+ });
+ }
+ state = STATE_OPENNING;
+ }
+ catch (Throwable x) {
+ terminate(x);
+ }
+ }
+ }
+
+ private void makeServiceByClassMap(Map<String,IService> by_name, Map<Class<?>,IService> by_class) {
+ for (IService service : by_name.values()) {
+ for (Class<?> fs : service.getClass().getInterfaces()) {
+ if (fs.equals(IService.class)) continue;
+ if (!IService.class.isAssignableFrom(fs)) continue;
+ by_class.put(fs, service);
+ }
+ }
+ }
+
+ public final int getState() {
+ return state;
+ }
+
+ public void addChannelListener(IChannelListener listener) {
+ assert Protocol.isDispatchThread();
+ assert listener != null;
+ channel_listeners.add(listener);
+ }
+
+ public void removeChannelListener(IChannelListener listener) {
+ assert Protocol.isDispatchThread();
+ channel_listeners.remove(listener);
+ }
+
+ public void addTraceListener(TraceListener listener) {
+ if (trace_listeners == null) {
+ trace_listeners = new ArrayList<TraceListener>();
+ }
+ else {
+ trace_listeners = new ArrayList<TraceListener>(trace_listeners);
+ }
+ trace_listeners.add(listener);
+ }
+
+ public void removeTraceListener(TraceListener listener) {
+ trace_listeners = new ArrayList<TraceListener>(trace_listeners);
+ trace_listeners.remove(listener);
+ if (trace_listeners.isEmpty()) trace_listeners = null;
+ }
+
+ public void addEventListener(IService service, IChannel.IEventListener listener) {
+ assert Protocol.isDispatchThread();
+ IChannel.IEventListener[] list = event_listeners.get(service.getName());
+ IChannel.IEventListener[] next = new IChannel.IEventListener[list == null ? 1 : list.length + 1];
+ if (list != null) System.arraycopy(list, 0, next, 0, list.length);
+ next[next.length - 1] = listener;
+ event_listeners.put(service.getName(), next);
+ }
+
+ public void removeEventListener(IService service, IChannel.IEventListener listener) {
+ assert Protocol.isDispatchThread();
+ IChannel.IEventListener[] list = event_listeners.get(service.getName());
+ for (int i = 0; i < list.length; i++) {
+ if (list[i] == listener) {
+ if (list.length == 1) {
+ event_listeners.remove(service.getName());
+ }
+ else {
+ IChannel.IEventListener[] next = new IChannel.IEventListener[list.length - 1];
+ System.arraycopy(list, 0, next, 0, i);
+ System.arraycopy(list, i + 1, next, i, next.length - i);
+ event_listeners.put(service.getName(), next);
+ }
+ return;
+ }
+ }
+ }
+
+ public void addCommandServer(IService service, IChannel.ICommandServer listener) {
+ assert Protocol.isDispatchThread();
+ if (command_servers.put(service.getName(), listener) != null) {
+ throw new Error("Only one command server per service is allowed");
+ }
+ }
+
+ public void removeCommandServer(IService service, IChannel.ICommandServer listener) {
+ assert Protocol.isDispatchThread();
+ if (command_servers.remove(service.getName()) != listener) {
+ throw new Error("Invalid command server");
+ }
+ }
+
+ private void sendEndOfStream() {
+ if (shutdown) return;
+ shutdown = true;
+ synchronized (out_queue) {
+ out_queue.clear();
+ out_queue.add(0, null);
+ out_queue.notify();
+ }
+ }
+
+ public void close() {
+ assert Protocol.isDispatchThread();
+ try {
+ sendEndOfStream();
+ out_thread.join(10000);
+ stop();
+ inp_thread.join(10000);
+ terminate(null);
+ }
+ catch (Exception x) {
+ terminate(x);
+ }
+ }
+
+ public void terminate(final Throwable error) {
+ assert Protocol.isDispatchThread();
+ sendEndOfStream();
+ if (state == STATE_CLOSED) return;
+ state = STATE_CLOSED;
+ if (error != null && remote_peer instanceof AbstractPeer) {
+ ((AbstractPeer)remote_peer).onChannelTerminated();
+ }
+ if (registered_with_trasport) {
+ registered_with_trasport = false;
+ TransportManager.channelClosed(this, error);
+ }
+ if (proxy != null) {
+ try {
+ proxy.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ if (!out_tokens.isEmpty()) {
+ Exception x = null;
+ if (error instanceof Exception) x = (Exception)error;
+ else if (error != null) x = new Exception(error);
+ else x = new IOException("Channel is closed");
+ for (Message msg : out_tokens.values()) {
+ String s = msg.toString();
+ if (s.length() > 72) s = s.substring(0, 72) + "...]";
+ IOException y = new IOException("Command " + s + " aborted");
+ y.initCause(x);
+ msg.token.getListener().terminated(msg.token, y);
+ }
+ out_tokens.clear();
+ }
+ if (channel_listeners.isEmpty()) {
+ Protocol.log("TCF channel terminated", error);
+ }
+ else {
+ listeners_array = channel_listeners.toArray(listeners_array);
+ for (IChannelListener l : listeners_array) {
+ if (l == null) break;
+ try {
+ l.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ if (trace_listeners != null) {
+ for (TraceListener l : trace_listeners) {
+ try {
+ l.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public int getCongestion() {
+ assert Protocol.isDispatchThread();
+ int level = out_tokens.size() * 100 / pending_command_limit - 100;
+ if (remote_congestion_level > level) level = remote_congestion_level;
+ if (level > 100) level = 100;
+ return level;
+ }
+
+ public IPeer getLocalPeer() {
+ assert Protocol.isDispatchThread();
+ return local_peer;
+ }
+
+ public IPeer getRemotePeer() {
+ assert Protocol.isDispatchThread();
+ return remote_peer;
+ }
+
+ public Collection<String> getLocalServices() {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return local_service_by_name.keySet();
+ }
+
+ public Collection<String> getRemoteServices() {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return remote_service_by_name.keySet();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V extends IService> V getLocalService(Class<V> cls) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return (V)local_service_by_class.get(cls);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V extends IService> V getRemoteService(Class<V> cls) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return (V)remote_service_by_class.get(cls);
+ }
+
+ public <V extends IService> void setServiceProxy(Class<V> service_interface, IService service_proxy) {
+ if (!notifying_channel_opened) new Error("setServiceProxe() can be called only from channel open call-back");
+ if (!(remote_service_by_name.get(service_proxy.getName()) instanceof GenericProxy)) throw new Error("Proxy already set");
+ if (remote_service_by_class.get(service_interface) != null) throw new Error("Proxy already set");
+ remote_service_by_class.put(service_interface, service_proxy);
+ }
+
+ public IService getLocalService(String service_name) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return local_service_by_name.get(service_name);
+ }
+
+ public IService getRemoteService(String service_name) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENNING;
+ return remote_service_by_name.get(service_name);
+ }
+
+ public void setProxy(Proxy proxy, Collection<String> services) throws IOException {
+ this.proxy = proxy;
+ sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(new Object[]{ services }));
+ local_service_by_class.clear();
+ local_service_by_name.clear();
+ }
+
+ private void addToOutQueue(Message msg) {
+ msg.trace = trace_listeners;
+ synchronized (out_queue) {
+ out_queue.add(msg);
+ out_queue.notify();
+ }
+ }
+
+ public IToken sendCommand(IService service, String name, byte[] args, ICommandListener listener) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_OPENNING) throw new Error("Channel is waiting for Hello message");
+ if (state == STATE_CLOSED) throw new Error("Channel is closed");
+ final Message msg = new Message('C');
+ msg.service = service.getName();
+ msg.name = name;
+ msg.data = args;
+ Token token = new Token(listener) {
+ @Override
+ public boolean cancel() {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) return false;
+ synchronized (out_queue) {
+ if (msg.is_sent) return false;
+ msg.is_canceled = true;
+ }
+ out_tokens.remove(msg.token.getID());
+ return true;
+ }
+ };
+ msg.token = token;
+ out_tokens.put(token.getID(), msg);
+ addToOutQueue(msg);
+ return token;
+ }
+
+ public void sendProgress(IToken token, byte[] results) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('P');
+ msg.data = results;
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void sendResult(IToken token, byte[] results) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('R');
+ msg.data = results;
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void rejectCommand(IToken token) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('N');
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void sendEvent(IService service, String name, byte[] args) {
+ assert Protocol.isDispatchThread();
+ if (!(state == STATE_OPEN || state == STATE_OPENNING && service instanceof ILocator)) {
+ throw new Error("Channel is closed");
+ }
+ Message msg = new Message('E');
+ msg.service = service.getName();
+ msg.name = name;
+ msg.data = args;
+ addToOutQueue(msg);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleInput(Message msg) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_CLOSED) return;
+ if (trace_listeners != null) {
+ for (TraceListener l : trace_listeners) {
+ try {
+ l.onMessageReceived(msg.type,
+ msg.token != null ? msg.token.getID() : null,
+ msg.service, msg.name, msg.data);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in trace listener", x);
+ }
+ }
+ }
+ try {
+ Token token = null;
+ switch (msg.type) {
+ case 'C':
+ if (state == STATE_OPENNING) {
+ throw new IOException("Received command " + msg.service + "." + msg.name + " before Hello message");
+ }
+ if (proxy != null) {
+ proxy.onCommand(msg.token, msg.service, msg.name, msg.data);
+ }
+ else {
+ token = msg.token;
+ IChannel.ICommandServer cmds = command_servers.get(msg.service);
+ if (cmds != null) {
+ cmds.command(token, msg.name, msg.data);
+ }
+ else {
+ rejectCommand(token);
+ }
+ }
+ break;
+ case 'P':
+ token = out_tokens.get(msg.token.getID()).token;
+ token.getListener().progress(token, msg.data);
+ sendCongestionLevel();
+ break;
+ case 'R':
+ token = out_tokens.remove(msg.token.getID()).token;
+ token.getListener().result(token, msg.data);
+ sendCongestionLevel();
+ break;
+ case 'N':
+ token = out_tokens.remove(msg.token.getID()).token;
+ token.getListener().terminated(token, new ErrorReport(
+ "Command is not recognized", IErrorReport.TCF_ERROR_INV_COMMAND));
+ break;
+ case 'E':
+ boolean hello = msg.service.equals(ILocator.NAME) && msg.name.equals("Hello");
+ if (hello) {
+ remote_service_by_name.clear();
+ remote_service_by_class.clear();
+ ServiceManager.onChannelOpened(this, (Collection<String>)JSON.parseSequence(msg.data)[0], remote_service_by_name);
+ makeServiceByClassMap(remote_service_by_name, remote_service_by_class);
+ }
+ if (proxy != null && state == STATE_OPEN) {
+ proxy.onEvent(msg.service, msg.name, msg.data);
+ }
+ else if (hello) {
+ assert state == STATE_OPENNING;
+ state = STATE_OPEN;
+ assert redirect_command == null;
+ if (redirect_queue.size() > 0) {
+ redirect(redirect_queue.removeFirst());
+ }
+ else {
+ notifying_channel_opened = true;
+ if (!registered_with_trasport) {
+ TransportManager.channelOpened(this);
+ registered_with_trasport = true;
+ }
+ listeners_array = channel_listeners.toArray(listeners_array);
+ for (IChannelListener l : listeners_array) {
+ if (l == null) break;
+ try {
+ l.onChannelOpened();
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ notifying_channel_opened = false;
+ }
+ }
+ else {
+ IChannel.IEventListener[] list = event_listeners.get(msg.service);
+ if (list != null) {
+ for (int i = 0; i < list.length; i++) {
+ list[i].event(msg.name, msg.data);
+ }
+ }
+ sendCongestionLevel();
+ }
+ break;
+ case 'F':
+ int len = msg.data.length;
+ if (len > 0 && msg.data[len - 1] == 0) len--;
+ remote_congestion_level = Integer.parseInt(new String(msg.data, 0, len, "ASCII"));
+ break;
+ default:
+ assert false;
+ break;
+ }
+ }
+ catch (Throwable x) {
+ terminate(x);
+ }
+ }
+
+ private void sendCongestionLevel() throws IOException {
+ if (++local_congestion_cnt < 8) return;
+ local_congestion_cnt = 0;
+ if (state != STATE_OPEN) return;
+ long time = System.currentTimeMillis();
+ if (time - local_congestion_time < 500) return;
+ assert Protocol.isDispatchThread();
+ int level = Protocol.getCongestionLevel();
+ if (level == local_congestion_level) return;
+ int i = (level - local_congestion_level) / 8;
+ if (i != 0) level = local_congestion_level + i;
+ local_congestion_time = time;
+ synchronized (out_queue) {
+ Message msg = out_queue.isEmpty() ? null : out_queue.get(0);
+ if (msg == null || msg.type != 'F') {
+ msg = new Message('F');
+ out_queue.add(0, msg);
+ out_queue.notify();
+ }
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(local_congestion_level);
+ buffer.append((char)0); // 0 terminate
+ msg.data = buffer.toString().getBytes("ASCII");
+ msg.trace = trace_listeners;
+ local_congestion_level = level;
+ }
+ }
+
+ /**
+ * Read one byte from the channel input stream.
+ * @return next data byte or EOS (-1) if end of stream is reached,
+ * or EOM (-2) if end of message is reached.
+ * @throws IOException
+ */
+ protected abstract int read() throws IOException;
+
+ /**
+ * Write one byte into the channel output stream.
+ * The method argument can be one of two special values:
+ * EOS (-1) end of stream marker;
+ * EOM (-2) end of message marker.
+ * The stream can put the byte into a buffer instead of transmitting it right away.
+ * @param n - the data byte.
+ * @throws IOException
+ */
+ protected abstract void write(int n) throws IOException;
+
+ /**
+ * Flush the channel output stream.
+ * All buffered data should be transmitted immediately.
+ * @throws IOException
+ */
+ protected abstract void flush() throws IOException;
+
+ /**
+ * Stop (close) channel underlying streams.
+ * If a thread is blocked by read() or write(), it should be
+ * resumed (or interrupted).
+ * @throws IOException
+ */
+ protected abstract void stop() throws IOException;
+
+ /**
+ * Write array of bytes into the channel output stream.
+ * The stream can put bytes into a buffer instead of transmitting it right away.
+ * @param buf
+ * @throws IOException
+ */
+ protected void write(byte[] buf) throws IOException {
+ assert Thread.currentThread() == out_thread;
+ for (int i = 0; i < buf.length; i++) write(buf[i]);
+ }
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java
new file mode 100644
index 000000000..1b857356b
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java
@@ -0,0 +1,186 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.eclipse.tm.internal.tcf.core.TransportManager;
+import org.eclipse.tm.internal.tcf.services.local.LocatorService;
+import org.eclipse.tm.tcf.protocol.IChannel;
+import org.eclipse.tm.tcf.protocol.IPeer;
+import org.eclipse.tm.tcf.protocol.JSON;
+import org.eclipse.tm.tcf.protocol.Protocol;
+import org.eclipse.tm.tcf.services.ILocator;
+import org.eclipse.tm.tcf.services.ILocator.LocatorListener;
+
+/**
+ * Abstract implementation of IPeer interface.
+ */
+public abstract class AbstractPeer implements IPeer {
+
+ private final Map<String, String> ro_attrs;
+ private final Map<String, String> rw_attrs;
+
+ private long last_heart_beat_time;
+
+ public AbstractPeer(Map<String,String> attrs) {
+ assert Protocol.isDispatchThread();
+ if (attrs != null) {
+ rw_attrs = new HashMap<String, String>(attrs);
+ }
+ else {
+ rw_attrs = new HashMap<String, String>();
+ }
+ ro_attrs = Collections.unmodifiableMap(rw_attrs);
+ assert getID() != null;
+ LocatorService.addPeer(this);
+ }
+
+ void onChannelTerminated() {
+ // A channel to this peer was terminated:
+ // not delaying next heart beat helps client to recover much faster.
+ last_heart_beat_time = 0;
+ }
+
+ public void updateAttributes(Map<String,String> attrs) {
+ boolean equ = true;
+ assert attrs.get(ATTR_ID).equals(rw_attrs.get(ATTR_ID));
+ for (Iterator<String> i = rw_attrs.keySet().iterator(); i.hasNext();) {
+ String key = i.next();
+ if (!rw_attrs.get(key).equals(attrs.get(key))) {
+ equ = false;
+ break;
+ }
+ }
+ for (Iterator<String> i = attrs.keySet().iterator(); i.hasNext();) {
+ String key = i.next();
+ if (!attrs.get(key).equals(rw_attrs.get(key))) {
+ equ = false;
+ break;
+ }
+ }
+ long time = System.currentTimeMillis();
+ if (!equ) {
+ rw_attrs.clear();
+ rw_attrs.putAll(attrs);
+ for (LocatorListener l : LocatorService.getListeners()) {
+ try {
+ l.peerChanged(this);
+ }
+ catch (Throwable x) {
+ Protocol.log("Unhandled exception in Locator listener", x);
+ }
+ }
+ try {
+ Object[] args = { rw_attrs };
+ Protocol.sendEvent(ILocator.NAME, "peerChanged", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ Protocol.log("Locator: failed to send 'peerChanged' event", x);
+ }
+ last_heart_beat_time = time;
+ }
+ else if (last_heart_beat_time + ILocator.DATA_RETENTION_PERIOD / 4 < time) {
+ for (LocatorListener l : LocatorService.getListeners()) {
+ try {
+ l.peerHeartBeat(attrs.get(ATTR_ID));
+ }
+ catch (Throwable x) {
+ Protocol.log("Unhandled exception in Locator listener", x);
+ }
+ }
+ try {
+ Object[] args = { rw_attrs.get(ATTR_ID) };
+ Protocol.sendEvent(ILocator.NAME, "peerHeartBeat", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ Protocol.log("Locator: failed to send 'peerHeartBeat' event", x);
+ }
+ last_heart_beat_time = time;
+ }
+ }
+
+ public void sendPeerAddedEvent() {
+ for (LocatorListener l : LocatorService.getListeners()) {
+ try {
+ l.peerAdded(this);
+ }
+ catch (Throwable x) {
+ Protocol.log("Unhandled exception in Locator listener", x);
+ }
+ }
+ try {
+ Object[] args = { rw_attrs };
+ Protocol.sendEvent(ILocator.NAME, "peerAdded", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ Protocol.log("Locator: failed to send 'peerAdded' event", x);
+ }
+ last_heart_beat_time = System.currentTimeMillis();
+ }
+
+ public void sendPeerRemovedEvent() {
+ for (LocatorListener l : LocatorService.getListeners()) {
+ try {
+ l.peerRemoved(rw_attrs.get(ATTR_ID));
+ }
+ catch (Throwable x) {
+ Protocol.log("Unhandled exception in Locator listener", x);
+ }
+ }
+ try {
+ Object[] args = { rw_attrs.get(ATTR_ID) };
+ Protocol.sendEvent(ILocator.NAME, "peerRemoved", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ Protocol.log("Locator: failed to send 'peerRemoved' event", x);
+ }
+ }
+
+ public void dispose() {
+ assert Protocol.isDispatchThread();
+ TransportManager.peerDisposed(this);
+ LocatorService.removePeer(this);
+ }
+
+ public Map<String,String> getAttributes() {
+ assert Protocol.isDispatchThread();
+ return ro_attrs;
+ }
+
+ public String getID() {
+ assert Protocol.isDispatchThread();
+ return ro_attrs.get(ATTR_ID);
+ }
+
+ public String getName() {
+ assert Protocol.isDispatchThread();
+ return ro_attrs.get(ATTR_NAME);
+ }
+
+ public String getOSName() {
+ assert Protocol.isDispatchThread();
+ return ro_attrs.get(ATTR_OS_NAME);
+ }
+
+ public String getTransportName() {
+ assert Protocol.isDispatchThread();
+ return ro_attrs.get(ATTR_TRANSPORT_NAME);
+ }
+
+ public IChannel openChannel() {
+ return TransportManager.openChannel(this);
+ }
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java
new file mode 100644
index 000000000..2a8ea72fa
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java
@@ -0,0 +1,148 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+/**
+ * Methods for translating Base64 encoded strings to byte arrays and back.
+ * @noextend This class is not intended to be subclassed by clients.
+ * @noinstantiate This class is not intended to be instantiated by clients.
+ */
+public final class Base64 {
+
+ public static char[] toBase64(byte[] buf, int pos, int len) {
+ char[] out_buf = new char[4 * ((len + 2) / 3)];
+ int end = pos + len;
+ int out_pos = 0;
+ while (pos < end) {
+ int byte0 = buf[pos++] & 0xff;
+ out_buf[out_pos++] = int2char[byte0 >> 2];
+ if (pos == end) {
+ out_buf[out_pos++] = int2char[(byte0 << 4) & 0x3f];
+ out_buf[out_pos++] = '=';
+ out_buf[out_pos++] = '=';
+ }
+ else {
+ int byte1 = buf[pos++] & 0xff;
+ out_buf[out_pos++] = int2char[(byte0 << 4) & 0x3f | (byte1 >> 4)];
+ if (pos == end) {
+ out_buf[out_pos++] = int2char[(byte1 << 2) & 0x3f];
+ out_buf[out_pos++] = '=';
+ }
+ else {
+ int byte2 = buf[pos++] & 0xff;
+ out_buf[out_pos++] = int2char[(byte1 << 2) & 0x3f | (byte2 >> 6)];
+ out_buf[out_pos++] = int2char[byte2 & 0x3f];
+ }
+ }
+ }
+ assert out_pos == out_buf.length;
+ return out_buf;
+ }
+
+ public static void toByteArray(byte[] buf, int offs, int size, char[] inp) {
+ int out_pos = offs;
+ if (inp != null) {
+ int inp_len = inp.length;
+ if (inp_len % 4 != 0) {
+ throw new IllegalArgumentException(
+ "BASE64 string length must be a multiple of four.");
+ }
+ int out_len = inp_len / 4 * 3;
+ if (inp_len > 0 && inp[inp_len - 1] == '=') {
+ out_len--;
+ if (inp[inp_len - 2] == '=') {
+ out_len--;
+ }
+ }
+ if (out_len > size) {
+ throw new IllegalArgumentException(
+ "BASE64 data array is longer then destination buffer.");
+ }
+ int inp_pos = 0;
+ while (inp_pos < inp_len) {
+ int n0, n1, n2, n3;
+ char ch0 = inp[inp_pos++];
+ char ch1 = inp[inp_pos++];
+ char ch2 = inp[inp_pos++];
+ char ch3 = inp[inp_pos++];
+ if (ch0 >= char2int.length || (n0 = char2int[ch0]) < 0) {
+ throw new IllegalArgumentException("Illegal character " + ch0);
+ }
+ if (ch1 >= char2int.length || (n1 = char2int[ch1]) < 0) {
+ throw new IllegalArgumentException("Illegal character " + ch1);
+ }
+ buf[out_pos++] = (byte)((n0 << 2) | (n1 >> 4));
+ if (ch2 == '=') break;
+ if (ch2 >= char2int.length || (n2 = char2int[ch2]) < 0) {
+ throw new IllegalArgumentException("Illegal character " + ch2);
+ }
+ buf[out_pos++] = (byte)((n1 << 4) | (n2 >> 2));
+ if (ch3 == '=') break;
+ if (ch3 >= char2int.length || (n3 = char2int[ch3]) < 0) {
+ throw new IllegalArgumentException("Illegal character " + ch3);
+ }
+ buf[out_pos++] = (byte)((n2 << 6) | n3);
+ }
+ assert out_pos == offs + out_len;
+ }
+ while (out_pos < offs + size) buf[out_pos++] = 0;
+ }
+
+ public static byte[] toByteArray(char[] inp) {
+ int inp_len = inp.length;
+ int out_len = inp_len / 4 * 3;
+ if (inp_len > 0 && inp[inp_len - 1] == '=') {
+ out_len--;
+ if (inp[inp_len - 2] == '=') {
+ out_len--;
+ }
+ }
+ byte[] buf = new byte[out_len];
+ toByteArray(buf, 0, buf.length, inp);
+ return buf;
+ }
+
+ /*
+ * See RFC 2045.
+ */
+ private static final char int2char[] = {
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
+ 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+ 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
+ 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
+ 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
+ 'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
+ 'w', 'x', 'y', 'z', '0', '1', '2', '3',
+ '4', '5', '6', '7', '8', '9', '+', '/'
+ };
+
+ /*
+ * See RFC 2045
+ */
+ private static final byte char2int[] = {
+ -1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, 62, -1, -1, -1, 63,
+ 52, 53, 54, 55, 56, 57, 58, 59,
+ 60, 61, -1, -1, -1, -1, -1, -1,
+ -1, 0, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 11, 12, 13, 14,
+ 15, 16, 17, 18, 19, 20, 21, 22,
+ 23, 24, 25, -1, -1, -1, -1, -1,
+ -1, 26, 27, 28, 29, 30, 31, 32,
+ 33, 34, 35, 36, 37, 38, 39, 40,
+ 41, 42, 43, 44, 45, 46, 47, 48,
+ 49, 50, 51
+ };
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java
new file mode 100644
index 000000000..fc4b0d5ae
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.eclipse.tm.tcf.protocol.IPeer;
+import org.eclipse.tm.tcf.protocol.Protocol;
+
+/**
+ * ChannelTCP is a IChannel implementation that works on top of TCP sockets as a transport.
+ */
+public class ChannelTCP extends StreamChannel {
+
+ private Socket socket;
+ private InputStream inp;
+ private OutputStream out;
+ private boolean closed;
+
+ public ChannelTCP(IPeer remote_peer, final String host, final int port) {
+ super(remote_peer);
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ socket = new Socket(host, port);
+ socket.setTcpNoDelay(true);
+ inp = new BufferedInputStream(socket.getInputStream());
+ out = new BufferedOutputStream(socket.getOutputStream());
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ ChannelTCP.this.start();
+ }
+ });
+ }
+ catch (final IOException x) {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ ChannelTCP.this.terminate(x);
+ }
+ });
+ }
+ }
+ };
+ thread.setName("TCF Socket Connect");
+ thread.start();
+ }
+
+ public ChannelTCP(IPeer local_peer, IPeer remote_peer, Socket socket) throws IOException {
+ super(local_peer, remote_peer);
+ this.socket = socket;
+ socket.setTcpNoDelay(true);
+ inp = new BufferedInputStream(socket.getInputStream());
+ out = new BufferedOutputStream(socket.getOutputStream());
+ start();
+ }
+
+ @Override
+ protected final int get() throws IOException {
+ try {
+ if (closed) return -1;
+ return inp.read();
+ }
+ catch (SocketException x) {
+ if (closed) return -1;
+ throw x;
+ }
+ }
+
+ @Override
+ protected final void put(int b) throws IOException {
+ assert b >= 0 && b <= 0xff;
+ if (closed) return;
+ out.write(b);
+ }
+
+ @Override
+ protected final void flush() throws IOException {
+ if (closed) return;
+ out.flush();
+ }
+
+ @Override
+ protected void stop() throws IOException {
+ closed = true;
+ socket.close();
+ out.close();
+ inp.close();
+ }
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java
new file mode 100644
index 000000000..7748e15c7
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java
@@ -0,0 +1,232 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+
+import org.eclipse.tm.internal.tcf.core.Token;
+import org.eclipse.tm.tcf.protocol.IChannel;
+import org.eclipse.tm.tcf.protocol.IErrorReport;
+import org.eclipse.tm.tcf.protocol.IService;
+import org.eclipse.tm.tcf.protocol.IToken;
+import org.eclipse.tm.tcf.protocol.JSON;
+import org.eclipse.tm.tcf.protocol.Protocol;
+
+
+/**
+ * This is utility class that helps to implement sending a command and receiving
+ * command result over TCF communication channel. The class uses JSON to encode
+ * command arguments and to decode result data.
+ *
+ * The class also provides support for TCF standard error report encoding.
+ *
+ * Clients are expected to subclass <code>Command</code> and override <code>done</code> method.
+ *
+ * Note: most clients don't need to handle protocol commands directly and
+ * can use service APIs instead. Service API does all command encoding/decoding
+ * for a client.
+ *
+ * Typical usage example:
+ *
+ * public IToken getContext(String id, final DoneGetContext done) {
+ * return new Command(channel, IService.this, "getContext", new Object[]{ id }) {
+ * @Override
+ * public void done(Exception error, Object[] args) {
+ * Context ctx = null;
+ * if (error == null) {
+ * assert args.length == 2;
+ * error = toError(args[0]);
+ * if (args[1] != null) ctx = new Context(args[1]);
+ * }
+ * done.doneGetContext(token, error, ctx);
+ * }
+ * }.token;
+ * }
+ */
+public abstract class Command implements IChannel.ICommandListener {
+
+ private final IService service;
+ private final String command;
+ private final Object[] args;
+
+ public final IToken token;
+
+ private boolean done;
+
+ private static final SimpleDateFormat timestamp_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+ public Command(IChannel channel, IService service, String command, Object[] args) {
+ this.service = service;
+ this.command = command;
+ this.args = args;
+ IToken t = null;
+ try {
+ t = channel.sendCommand(service, command, JSON.toJSONSequence(args), this);
+ }
+ catch (Throwable y) {
+ t = new Token();
+ final Exception x = y instanceof Exception ? (Exception)y : new Exception(y);
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ assert !done;
+ done = true;
+ done(x, null);
+ }
+ });
+ }
+ token = t;
+ }
+
+ public void progress(IToken token, byte[] data) {
+ assert this.token == token;
+ }
+
+ public void result(IToken token, byte[] data) {
+ assert this.token == token;
+ Exception error = null;
+ Object[] args = null;
+ try {
+ args = JSON.parseSequence(data);
+ }
+ catch (Exception e) {
+ error = e;
+ }
+ assert !done;
+ done = true;
+ done(error, args);
+ }
+
+ public void terminated(IToken token, Exception error) {
+ assert this.token == token;
+ assert !done;
+ done = true;
+ done(error, null);
+ }
+
+ public abstract void done(Exception error, Object[] args);
+
+ public String getCommandString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(service.getName());
+ buf.append(' ');
+ buf.append(command);
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ buf.append(i == 0 ? " " : ", ");
+ try {
+ buf.append(JSON.toJSON(args[i]));
+ }
+ catch (IOException x) {
+ buf.append("***");
+ buf.append(x.getMessage());
+ buf.append("***");
+ }
+ }
+ }
+ return buf.toString();
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ public static String toErrorString(Object data) {
+ if (data == null) return null;
+ Map<String,Object> map = (Map<String,Object>)data;
+ String fmt = (String)map.get(IErrorReport.ERROR_FORMAT);
+ if (fmt != null) {
+ Collection<Object> c = (Collection<Object>)map.get(IErrorReport.ERROR_PARAMS);
+ if (c != null) return new MessageFormat(fmt).format(c.toArray());
+ return fmt;
+ }
+ Number code = (Number)map.get(IErrorReport.ERROR_CODE);
+ if (code != null) {
+ if (code.intValue() == IErrorReport.TCF_ERROR_OTHER) {
+ String alt_org = (String)map.get(IErrorReport.ERROR_ALT_ORG);
+ Number alt_code = (Number)map.get(IErrorReport.ERROR_ALT_CODE);
+ if (alt_org != null && alt_code != null) {
+ return alt_org + " Error " + alt_code;
+ }
+ }
+ return "TCF Error " + code;
+ }
+ return "Invalid error report format";
+ }
+
+ static void appendErrorProps(StringBuffer bf, Map<String,Object> map) {
+ Number time = (Number)map.get(IErrorReport.ERROR_TIME);
+ Number code = (Number)map.get(IErrorReport.ERROR_CODE);
+ String service = (String)map.get(IErrorReport.ERROR_SERVICE);
+ Number severity = (Number)map.get(IErrorReport.ERROR_SEVERITY);
+ Number alt_code = (Number)map.get(IErrorReport.ERROR_ALT_CODE);
+ String alt_org = (String)map.get(IErrorReport.ERROR_ALT_ORG);
+ if (time != null) {
+ bf.append('\n');
+ bf.append("Time: ");
+ bf.append(timestamp_format.format(new Date(time.longValue())));
+ }
+ if (severity != null) {
+ bf.append('\n');
+ bf.append("Severity: ");
+ bf.append(toErrorString(map));
+ switch (severity.intValue()) {
+ case IErrorReport.SEVERITY_ERROR: bf.append("Error");
+ case IErrorReport.SEVERITY_FATAL: bf.append("Fatal");
+ case IErrorReport.SEVERITY_WARNING: bf.append("Warning");
+ default: bf.append("Unknown");
+ }
+ }
+ bf.append('\n');
+ bf.append("Error text: ");
+ bf.append(toErrorString(map));
+ bf.append('\n');
+ bf.append("Error code: ");
+ bf.append(code);
+ if (service != null) {
+ bf.append('\n');
+ bf.append("Service: ");
+ bf.append(service);
+ }
+ if (alt_code != null) {
+ bf.append('\n');
+ bf.append("Alt code: ");
+ bf.append(alt_code);
+ if (alt_org != null) {
+ bf.append('\n');
+ bf.append("Alt org: ");
+ bf.append(alt_org);
+ }
+ }
+ }
+
+ public Exception toError(Object data) {
+ return toError(data, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Exception toError(Object data, boolean include_command_text) {
+ if (data == null) return null;
+ Map<String,Object> map = (Map<String,Object>)data;
+ StringBuffer bf = new StringBuffer();
+ bf.append("TCF error report:");
+ bf.append('\n');
+ if (include_command_text) {
+ String cmd = getCommandString();
+ if (cmd.length() > 72) cmd = cmd.substring(0, 72) + "...";
+ bf.append("Command: ");
+ bf.append(cmd);
+ }
+ appendErrorProps(bf, map);
+ return new ErrorReport(bf.toString(), map);
+ }
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java
new file mode 100644
index 000000000..56dbca80e
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Copyright (c) 2007-2009 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.tm.tcf.protocol.IErrorReport;
+
+class ErrorReport extends Exception implements IErrorReport {
+
+ private static final long serialVersionUID = 3687543884858739977L;
+ private final Map<String,Object> attrs;
+
+ @SuppressWarnings("unchecked")
+ ErrorReport(String msg, Map<String,Object> attrs) {
+ super(msg);
+ this.attrs = attrs;
+ Object caused_by = attrs.get(IErrorReport.ERROR_CAUSED_BY);
+ if (caused_by != null) {
+ Map<String,Object> map = (Map<String,Object>)caused_by;
+ StringBuffer bf = new StringBuffer();
+ bf.append("TCF error report:");
+ bf.append('\n');
+ Command.appendErrorProps(bf, map);
+ initCause(new ErrorReport(bf.toString(), map));
+ }
+ }
+
+ ErrorReport(String msg, int code) {
+ super(msg);
+ attrs = new HashMap<String,Object>();
+ attrs.put(ERROR_CODE, code);
+ attrs.put(ERROR_TIME, System.currentTimeMillis());
+ attrs.put(ERROR_FORMAT, msg);
+ attrs.put(ERROR_SEVERITY, SEVERITY_ERROR);
+ }
+
+ public int getErrorCode() {
+ Number n = (Number)attrs.get(ERROR_CODE);
+ if (n == null) return 0;
+ return n.intValue();
+ }
+
+ public int getAltCode() {
+ Number n = (Number)attrs.get(ERROR_ALT_CODE);
+ if (n == null) return 0;
+ return n.intValue();
+ }
+
+ public String getAltOrg() {
+ return (String)attrs.get(ERROR_ALT_ORG);
+ }
+
+ public Map<String, Object> getAttributes() {
+ return attrs;
+ }
+} \ No newline at end of file
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java
new file mode 100644
index 000000000..70d14924b
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * Copyright (c) 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.eclipse.tm.tcf.protocol.IPeer;
+import org.eclipse.tm.tcf.protocol.Protocol;
+
+/**
+ * ServerTCP is a TCP server that is listening for incoming connection requests
+ * and creates TCF communication channels over TCP sockets for such requests.
+ *
+ * Clients may create objects of this class to become a TCF server.
+ */
+public class ServerTCP extends ServerSocket {
+
+ private static class ServerPeer extends AbstractPeer {
+ ServerPeer(Map<String,String> attrs) {
+ super(attrs);
+ }
+ }
+
+ private static class RemotePeer extends AbstractPeer {
+ RemotePeer(Map<String,String> attrs) {
+ super(attrs);
+ }
+ }
+
+ private final String name;
+ private List<ServerPeer> peers;
+ private Thread thread;
+
+ public ServerTCP(String name, int port) throws IOException {
+ super(port);
+ this.name = name;
+ peers = new ArrayList<ServerPeer>();
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+ NetworkInterface f = e.nextElement();
+ Enumeration<InetAddress> n = f.getInetAddresses();
+ while (n.hasMoreElements()) {
+ peers.add(getLocalPeer(n.nextElement().getHostAddress()));
+ }
+ }
+ thread = new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ final Socket socket = accept();
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ try {
+ new ChannelTCP(getLocalPeer(socket), getRemotePeer(socket), socket);
+ }
+ catch (final Throwable x) {
+ Protocol.log("TCF Server: failed to create a channel", x);
+ }
+ }
+ });
+ }
+ catch (final Throwable x) {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ Protocol.log("TCF Server thread aborted", x);
+ }
+ });
+ break;
+ }
+ }
+ }
+ };
+ thread.setName(name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ private ServerPeer getLocalPeer(String addr) {
+ for (ServerPeer p : peers) {
+ if (addr.equals(p.getAttributes().get(IPeer.ATTR_IP_HOST))) return p;
+ }
+ Map<String,String> attrs = new HashMap<String,String>();
+ attrs.put(IPeer.ATTR_ID, "TCP:" + addr + ":" + getLocalPort());
+ attrs.put(IPeer.ATTR_NAME, name);
+ attrs.put(IPeer.ATTR_OS_NAME, System.getProperty("os.name"));
+ attrs.put(IPeer.ATTR_TRANSPORT_NAME, "TCP");
+ attrs.put(IPeer.ATTR_IP_HOST, addr);
+ attrs.put(IPeer.ATTR_IP_PORT, Integer.toString(getLocalPort()));
+ attrs.put(IPeer.ATTR_PROXY, "");
+ ServerPeer p = new ServerPeer(attrs);
+ peers.add(p);
+ return p;
+ }
+
+ private IPeer getLocalPeer(Socket socket) {
+ return getLocalPeer(socket.getLocalAddress().getHostAddress());
+ }
+
+ private IPeer getRemotePeer(Socket socket) {
+ String addr = socket.getInetAddress().getHostAddress();
+ for (IPeer p : Protocol.getLocator().getPeers().values()) {
+ if (addr.equals(p.getAttributes().get(IPeer.ATTR_IP_HOST))) return p;
+ }
+ Map<String,String> attrs = new HashMap<String,String>();
+ attrs.put(IPeer.ATTR_ID, "TCP:" + addr + ":");
+ attrs.put(IPeer.ATTR_TRANSPORT_NAME, "TCP");
+ attrs.put(IPeer.ATTR_IP_HOST, addr);
+ return new RemotePeer(attrs);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (peers != null) {
+ for (ServerPeer s : peers) s.dispose();
+ peers = null;
+ }
+ super.close();
+ if (thread != null) {
+ try {
+ thread.join();
+ thread = null;
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+}
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java
new file mode 100644
index 000000000..b9020729a
--- /dev/null
+++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.tcf.core;
+
+import java.io.IOException;
+
+import org.eclipse.tm.tcf.protocol.IPeer;
+
+/**
+ * Abstract implementation of IChannel interface for stream oriented transport protocols.
+ *
+ * StreamChannel implements communication link connecting two end points (peers).
+ * The channel asynchronously transmits messages: commands, results and events.
+ *
+ * StreamChannel uses escape sequences to represent End-Of-Message and End-Of-Stream markers.
+ *
+ * Clients can subclass StreamChannel to support particular stream oriented transport (wire) protocol.
+ * Also, see ChannelTCP for a concrete IChannel implementation that works on top of TCP sockets as a transport.
+ */
+public abstract class StreamChannel extends AbstractChannel {
+
+ public static final int ESC = 3;
+
+ public StreamChannel(IPeer remote_peer) {
+ super(remote_peer);
+ }
+
+ public StreamChannel(IPeer local_peer, IPeer remote_peer) {
+ super(local_peer, remote_peer);
+ }
+
+ protected abstract int get() throws IOException;
+ protected abstract void put(int n) throws IOException;
+
+ @Override
+ protected final int read() throws IOException {
+ int res = get();
+ if (res < 0) return EOS;
+ assert res >= 0 && res <= 0xff;
+ if (res != ESC) return res;
+ int n = get();
+ switch (n) {
+ case 0: return ESC;
+ case 1: return EOM;
+ case 2: return EOS;
+ default:
+ if (n < 0) return EOS;
+ assert false;
+ return 0;
+ }
+ }
+
+ @Override
+ protected final void write(int n) throws IOException {
+ switch (n) {
+ case ESC: put(ESC); put(0); break;
+ case EOM: put(ESC); put(1); break;
+ case EOS: put(ESC); put(2); break;
+ default:
+ assert n >= 0 && n <= 0xff;
+ put(n);
+ }
+ }
+
+ @Override
+ protected void write(byte[] buf) throws IOException {
+ for (int i = 0; i < buf.length; i++) {
+ int n = buf[i] & 0xff;
+ put(n);
+ if (n == ESC) put(0);
+ }
+ }
+}

Back to the top