diff options
Diffstat (limited to 'plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core')
8 files changed, 1916 insertions, 0 deletions
diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java new file mode 100644 index 000000000..fe8825f55 --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractChannel.java @@ -0,0 +1,953 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +import org.eclipse.tm.internal.tcf.core.ServiceManager; +import org.eclipse.tm.internal.tcf.core.Token; +import org.eclipse.tm.internal.tcf.core.TransportManager; +import org.eclipse.tm.internal.tcf.services.local.LocatorService; +import org.eclipse.tm.internal.tcf.services.remote.GenericProxy; +import org.eclipse.tm.tcf.protocol.IChannel; +import org.eclipse.tm.tcf.protocol.IErrorReport; +import org.eclipse.tm.tcf.protocol.IPeer; +import org.eclipse.tm.tcf.protocol.IService; +import org.eclipse.tm.tcf.protocol.IToken; +import org.eclipse.tm.tcf.protocol.JSON; +import org.eclipse.tm.tcf.protocol.Protocol; +import org.eclipse.tm.tcf.services.ILocator; + +/** + * Abstract implementation of IChannel interface. + * + * AbstractChannel implements communication link connecting two end points (peers). + * The channel asynchronously transmits messages: commands, results and events. + * + * Clients can subclass AbstractChannel to support particular transport (wire) protocol. + * Also, see StreamChannel for stream oriented transport protocols. + */ +public abstract class AbstractChannel implements IChannel { + + public interface TraceListener { + + public void onMessageReceived(char type, String token, + String service, String name, byte[] data); + + public void onMessageSent(char type, String token, + String service, String name, byte[] data); + + public void onChannelClosed(Throwable error); + } + + public interface Proxy { + + public void onCommand(IToken token, String service, String name, byte[] data); + + public void onEvent(String service, String name, byte[] data); + + public void onChannelClosed(Throwable error); + } + + private static class Message { + final char type; + Token token; + String service; + String name; + byte[] data; + + boolean is_sent; + boolean is_canceled; + + Collection<TraceListener> trace; + + Message(char type) { + this.type = type; + } + + @Override + public String toString() { + try { + StringBuffer bf = new StringBuffer(); + bf.append('[');; + bf.append(type); + if (token != null) { + bf.append(' '); + bf.append(token.getID()); + } + if (service != null) { + bf.append(' '); + bf.append(service); + } + if (name != null) { + bf.append(' '); + bf.append(name); + } + if (data != null) { + int i = 0; + while (i < data.length) { + int j = i; + while (j < data.length && data[j] != 0) j++; + bf.append(' '); + bf.append(new String(data, i, j - i, "UTF8")); + if (j < data.length && data[j] == 0) j++; + i = j; + } + } + bf.append(']'); + return bf.toString(); + } + catch (Exception x) { + return x.toString(); + } + } + } + + private static IChannelListener[] listeners_array = new IChannelListener[4]; + + private final LinkedList<String> redirect_queue = new LinkedList<String>(); + private final Map<Class<?>,IService> local_service_by_class = new HashMap<Class<?>,IService>(); + private final Map<Class<?>,IService> remote_service_by_class = new HashMap<Class<?>,IService>(); + private final Map<String,IService> local_service_by_name = new HashMap<String,IService>(); + private final Map<String,IService> remote_service_by_name = new HashMap<String,IService>(); + private final LinkedList<Message> out_queue = new LinkedList<Message>(); + private final Collection<IChannelListener> channel_listeners = new ArrayList<IChannelListener>(); + private final Map<String,IChannel.IEventListener[]> event_listeners = new HashMap<String,IChannel.IEventListener[]>(); + private final Map<String,IChannel.ICommandServer> command_servers = new HashMap<String,IChannel.ICommandServer>(); + private final Map<String,Message> out_tokens = new HashMap<String,Message>(); + private final Thread inp_thread; + private final Thread out_thread; + private boolean notifying_channel_opened; + private boolean registered_with_trasport; + private boolean shutdown; + private int state = STATE_OPENNING; + private IToken redirect_command; + private final IPeer local_peer; + private IPeer remote_peer; + private Proxy proxy; + + private static final int pending_command_limit = 32; + private int local_congestion_level = -100; + private int remote_congestion_level = -100; + private long local_congestion_time; + private int local_congestion_cnt; + private Collection<TraceListener> trace_listeners; + + public static final int + EOS = -1, // End Of Stream + EOM = -2; // End Of Message + + protected AbstractChannel(IPeer remote_peer) { + this(LocatorService.getLocalPeer(), remote_peer); + } + + protected AbstractChannel(IPeer local_peer, IPeer remote_peer) { + assert Protocol.isDispatchThread(); + this.remote_peer = remote_peer; + this.local_peer = local_peer; + + inp_thread = new Thread() { + + final byte[] empty_byte_array = new byte[0]; + byte[] buf = new byte[1024]; + byte[] eos; + + private void error() throws IOException { + throw new IOException("Protocol syntax error"); + } + + private byte[] readBytes(int end) throws IOException { + int len = 0; + for (;;) { + int n = read(); + if (n <= 0) { + if (n == end) break; + if (n == EOM) throw new IOException("Unexpected end of message"); + if (n < 0) throw new IOException("Communication channel is closed by remote peer"); + } + if (len >= buf.length) { + byte[] tmp = new byte[buf.length * 2]; + System.arraycopy(buf, 0, tmp, 0, len); + buf = tmp; + } + buf[len++] = (byte)n; + } + if (len == 0) return empty_byte_array; + byte[] res = new byte[len]; + System.arraycopy(buf, 0, res, 0, len); + return res; + } + + private String readString() throws IOException { + int len = 0; + for (;;) { + int n = read(); + if (n <= 0) { + if (n == 0) break; + if (n == EOM) throw new IOException("Unexpected end of message"); + if (n < 0) throw new IOException("Communication channel is closed by remote peer"); + } + if (len >= buf.length) { + byte[] tmp = new byte[buf.length * 2]; + System.arraycopy(buf, 0, tmp, 0, len); + buf = tmp; + } + buf[len++] = (byte)n; + } + return new String(buf, 0, len, "UTF8"); + } + + @Override + public void run() { + try { + while (true) { + int n = read(); + if (n == EOM) continue; + if (n == EOS) { + eos = readBytes(EOM); + break; + } + final Message msg = new Message((char)n); + if (read() != 0) error(); + switch (msg.type) { + case 'C': + msg.token = new Token(readBytes(0)); + msg.service = readString(); + msg.name = readString(); + msg.data = readBytes(EOM); + break; + case 'P': + case 'R': + case 'N': + msg.token = new Token(readBytes(0)); + msg.data = readBytes(EOM); + break; + case 'E': + msg.service = readString(); + msg.name = readString(); + msg.data = readBytes(EOM); + break; + case 'F': + msg.data = readBytes(EOM); + break; + default: + error(); + } + Protocol.invokeLater(new Runnable() { + public void run() { + handleInput(msg); + } + }); + int delay = local_congestion_level; + if (delay > 0) sleep(delay); + } + Protocol.invokeLater(new Runnable() { + public void run() { + if (out_tokens.isEmpty()) { + close(); + } + else { + IOException x = new IOException("Connection reset by peer"); + try { + Object[] args = JSON.parseSequence(eos); + if (args.length > 0 && args[0] != null) { + x = new IOException(Command.toErrorString(args[0])); + } + } + catch (IOException e) { + x = e; + } + terminate(x); + } + } + }); + } + catch (final Throwable x) { + Protocol.invokeLater(new Runnable() { + public void run() { + terminate(x); + } + }); + } + } + }; + + out_thread = new Thread() { + + @Override + public void run() { + try { + while (true) { + Message msg = null; + boolean last = false; + synchronized (out_queue) { + while (out_queue.isEmpty()) out_queue.wait(); + msg = out_queue.removeFirst(); + if (msg == null) break; + last = out_queue.isEmpty(); + if (msg.is_canceled) { + if (last) flush(); + continue; + } + msg.is_sent = true; + } + if (msg.trace != null) { + final Message m = msg; + Protocol.invokeLater(new Runnable() { + public void run() { + for (TraceListener l : m.trace) { + try { + l.onMessageSent(m.type, m.token == null ? null : m.token.getID(), + m.service, m.name, m.data); + } + catch (Throwable x) { + Protocol.log("Exception in channel listener", x); + } + } + } + }); + } + write(msg.type); + write(0); + if (msg.token != null) { + write(msg.token.getBytes()); + write(0); + } + if (msg.service != null) { + write(msg.service.getBytes("UTF8")); + write(0); + } + if (msg.name != null) { + write(msg.name.getBytes("UTF8")); + write(0); + } + if (msg.data != null) { + write(msg.data); + } + write(EOM); + int delay = 0; + int level = remote_congestion_level; + if (level > 0) delay = level * 10; + if (last || delay > 0) flush(); + if (delay > 0) sleep(delay); + else yield(); + } + write(EOS); + write(EOM); + flush(); + } + catch (final Throwable x) { + Protocol.invokeLater(new Runnable() { + public void run() { + terminate(x); + } + }); + } + } + }; + inp_thread.setName("TCF Channel Receiver"); + out_thread.setName("TCF Channel Transmitter"); + } + + protected void start() { + assert Protocol.isDispatchThread(); + Protocol.invokeLater(new Runnable() { + public void run() { + try { + if (proxy != null) return; + ServiceManager.onChannelCreated(AbstractChannel.this, local_service_by_name); + makeServiceByClassMap(local_service_by_name, local_service_by_class); + Object[] args = new Object[]{ local_service_by_name.keySet() }; + sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(args)); + } + catch (IOException x) { + terminate(x); + } + } + }); + inp_thread.start(); + out_thread.start(); + } + + /** + * Redirect this channel to given peer using this channel remote peer locator service as a proxy. + * @param peer_id - peer that will become new remote communication endpoint of this channel + */ + public void redirect(final String peer_id) { + assert Protocol.isDispatchThread(); + if (state == STATE_OPENNING) { + redirect_queue.add(peer_id); + } + else { + assert state == STATE_OPEN; + assert redirect_command == null; + try { + final ILocator l = (ILocator)remote_service_by_class.get(ILocator.class); + if (l == null) throw new IOException("Cannot redirect channel: peer " + + remote_peer.getID() + " has no locator service"); + final IPeer peer = l.getPeers().get(peer_id); + if (peer == null) { + // Peer not found, must wait for a while until peer is discovered or time out + final boolean[] found = new boolean[1]; + Protocol.invokeLater(ILocator.DATA_RETENTION_PERIOD / 3, new Runnable() { + public void run() { + if (found[0]) return; + terminate(new Exception("Peer " + peer_id + " not found")); + } + }); + l.addListener(new ILocator.LocatorListener() { + public void peerAdded(IPeer peer) { + if (peer.getID().equals(peer_id)) { + found[0] = true; + state = STATE_OPEN; + l.removeListener(this); + redirect(peer_id); + } + } + public void peerChanged(IPeer peer) { + } + + public void peerHeartBeat(String id) { + } + + public void peerRemoved(String id) { + } + }); + } + else { + redirect_command = l.redirect(peer_id, new ILocator.DoneRedirect() { + public void doneRedirect(IToken token, Exception x) { + assert redirect_command == token; + redirect_command = null; + if (state != STATE_OPENNING) return; + if (x != null) terminate(x); + remote_peer = peer; + remote_service_by_class.clear(); + remote_service_by_name.clear(); + event_listeners.clear(); + } + }); + } + state = STATE_OPENNING; + } + catch (Throwable x) { + terminate(x); + } + } + } + + private void makeServiceByClassMap(Map<String,IService> by_name, Map<Class<?>,IService> by_class) { + for (IService service : by_name.values()) { + for (Class<?> fs : service.getClass().getInterfaces()) { + if (fs.equals(IService.class)) continue; + if (!IService.class.isAssignableFrom(fs)) continue; + by_class.put(fs, service); + } + } + } + + public final int getState() { + return state; + } + + public void addChannelListener(IChannelListener listener) { + assert Protocol.isDispatchThread(); + assert listener != null; + channel_listeners.add(listener); + } + + public void removeChannelListener(IChannelListener listener) { + assert Protocol.isDispatchThread(); + channel_listeners.remove(listener); + } + + public void addTraceListener(TraceListener listener) { + if (trace_listeners == null) { + trace_listeners = new ArrayList<TraceListener>(); + } + else { + trace_listeners = new ArrayList<TraceListener>(trace_listeners); + } + trace_listeners.add(listener); + } + + public void removeTraceListener(TraceListener listener) { + trace_listeners = new ArrayList<TraceListener>(trace_listeners); + trace_listeners.remove(listener); + if (trace_listeners.isEmpty()) trace_listeners = null; + } + + public void addEventListener(IService service, IChannel.IEventListener listener) { + assert Protocol.isDispatchThread(); + IChannel.IEventListener[] list = event_listeners.get(service.getName()); + IChannel.IEventListener[] next = new IChannel.IEventListener[list == null ? 1 : list.length + 1]; + if (list != null) System.arraycopy(list, 0, next, 0, list.length); + next[next.length - 1] = listener; + event_listeners.put(service.getName(), next); + } + + public void removeEventListener(IService service, IChannel.IEventListener listener) { + assert Protocol.isDispatchThread(); + IChannel.IEventListener[] list = event_listeners.get(service.getName()); + for (int i = 0; i < list.length; i++) { + if (list[i] == listener) { + if (list.length == 1) { + event_listeners.remove(service.getName()); + } + else { + IChannel.IEventListener[] next = new IChannel.IEventListener[list.length - 1]; + System.arraycopy(list, 0, next, 0, i); + System.arraycopy(list, i + 1, next, i, next.length - i); + event_listeners.put(service.getName(), next); + } + return; + } + } + } + + public void addCommandServer(IService service, IChannel.ICommandServer listener) { + assert Protocol.isDispatchThread(); + if (command_servers.put(service.getName(), listener) != null) { + throw new Error("Only one command server per service is allowed"); + } + } + + public void removeCommandServer(IService service, IChannel.ICommandServer listener) { + assert Protocol.isDispatchThread(); + if (command_servers.remove(service.getName()) != listener) { + throw new Error("Invalid command server"); + } + } + + private void sendEndOfStream() { + if (shutdown) return; + shutdown = true; + synchronized (out_queue) { + out_queue.clear(); + out_queue.add(0, null); + out_queue.notify(); + } + } + + public void close() { + assert Protocol.isDispatchThread(); + try { + sendEndOfStream(); + out_thread.join(10000); + stop(); + inp_thread.join(10000); + terminate(null); + } + catch (Exception x) { + terminate(x); + } + } + + public void terminate(final Throwable error) { + assert Protocol.isDispatchThread(); + sendEndOfStream(); + if (state == STATE_CLOSED) return; + state = STATE_CLOSED; + if (error != null && remote_peer instanceof AbstractPeer) { + ((AbstractPeer)remote_peer).onChannelTerminated(); + } + if (registered_with_trasport) { + registered_with_trasport = false; + TransportManager.channelClosed(this, error); + } + if (proxy != null) { + try { + proxy.onChannelClosed(error); + } + catch (Throwable x) { + Protocol.log("Exception in channel listener", x); + } + } + Protocol.invokeLater(new Runnable() { + public void run() { + if (!out_tokens.isEmpty()) { + Exception x = null; + if (error instanceof Exception) x = (Exception)error; + else if (error != null) x = new Exception(error); + else x = new IOException("Channel is closed"); + for (Message msg : out_tokens.values()) { + String s = msg.toString(); + if (s.length() > 72) s = s.substring(0, 72) + "...]"; + IOException y = new IOException("Command " + s + " aborted"); + y.initCause(x); + msg.token.getListener().terminated(msg.token, y); + } + out_tokens.clear(); + } + if (channel_listeners.isEmpty()) { + Protocol.log("TCF channel terminated", error); + } + else { + listeners_array = channel_listeners.toArray(listeners_array); + for (IChannelListener l : listeners_array) { + if (l == null) break; + try { + l.onChannelClosed(error); + } + catch (Throwable x) { + Protocol.log("Exception in channel listener", x); + } + } + } + if (trace_listeners != null) { + for (TraceListener l : trace_listeners) { + try { + l.onChannelClosed(error); + } + catch (Throwable x) { + Protocol.log("Exception in channel listener", x); + } + } + } + } + }); + } + + public int getCongestion() { + assert Protocol.isDispatchThread(); + int level = out_tokens.size() * 100 / pending_command_limit - 100; + if (remote_congestion_level > level) level = remote_congestion_level; + if (level > 100) level = 100; + return level; + } + + public IPeer getLocalPeer() { + assert Protocol.isDispatchThread(); + return local_peer; + } + + public IPeer getRemotePeer() { + assert Protocol.isDispatchThread(); + return remote_peer; + } + + public Collection<String> getLocalServices() { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return local_service_by_name.keySet(); + } + + public Collection<String> getRemoteServices() { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return remote_service_by_name.keySet(); + } + + @SuppressWarnings("unchecked") + public <V extends IService> V getLocalService(Class<V> cls) { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return (V)local_service_by_class.get(cls); + } + + @SuppressWarnings("unchecked") + public <V extends IService> V getRemoteService(Class<V> cls) { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return (V)remote_service_by_class.get(cls); + } + + public <V extends IService> void setServiceProxy(Class<V> service_interface, IService service_proxy) { + if (!notifying_channel_opened) new Error("setServiceProxe() can be called only from channel open call-back"); + if (!(remote_service_by_name.get(service_proxy.getName()) instanceof GenericProxy)) throw new Error("Proxy already set"); + if (remote_service_by_class.get(service_interface) != null) throw new Error("Proxy already set"); + remote_service_by_class.put(service_interface, service_proxy); + } + + public IService getLocalService(String service_name) { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return local_service_by_name.get(service_name); + } + + public IService getRemoteService(String service_name) { + assert Protocol.isDispatchThread(); + assert state != STATE_OPENNING; + return remote_service_by_name.get(service_name); + } + + public void setProxy(Proxy proxy, Collection<String> services) throws IOException { + this.proxy = proxy; + sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(new Object[]{ services })); + local_service_by_class.clear(); + local_service_by_name.clear(); + } + + private void addToOutQueue(Message msg) { + msg.trace = trace_listeners; + synchronized (out_queue) { + out_queue.add(msg); + out_queue.notify(); + } + } + + public IToken sendCommand(IService service, String name, byte[] args, ICommandListener listener) { + assert Protocol.isDispatchThread(); + if (state == STATE_OPENNING) throw new Error("Channel is waiting for Hello message"); + if (state == STATE_CLOSED) throw new Error("Channel is closed"); + final Message msg = new Message('C'); + msg.service = service.getName(); + msg.name = name; + msg.data = args; + Token token = new Token(listener) { + @Override + public boolean cancel() { + assert Protocol.isDispatchThread(); + if (state != STATE_OPEN) return false; + synchronized (out_queue) { + if (msg.is_sent) return false; + msg.is_canceled = true; + } + out_tokens.remove(msg.token.getID()); + return true; + } + }; + msg.token = token; + out_tokens.put(token.getID(), msg); + addToOutQueue(msg); + return token; + } + + public void sendProgress(IToken token, byte[] results) { + assert Protocol.isDispatchThread(); + if (state != STATE_OPEN) throw new Error("Channel is closed"); + Message msg = new Message('P'); + msg.data = results; + msg.token = (Token)token; + addToOutQueue(msg); + } + + public void sendResult(IToken token, byte[] results) { + assert Protocol.isDispatchThread(); + if (state != STATE_OPEN) throw new Error("Channel is closed"); + Message msg = new Message('R'); + msg.data = results; + msg.token = (Token)token; + addToOutQueue(msg); + } + + public void rejectCommand(IToken token) { + assert Protocol.isDispatchThread(); + if (state != STATE_OPEN) throw new Error("Channel is closed"); + Message msg = new Message('N'); + msg.token = (Token)token; + addToOutQueue(msg); + } + + public void sendEvent(IService service, String name, byte[] args) { + assert Protocol.isDispatchThread(); + if (!(state == STATE_OPEN || state == STATE_OPENNING && service instanceof ILocator)) { + throw new Error("Channel is closed"); + } + Message msg = new Message('E'); + msg.service = service.getName(); + msg.name = name; + msg.data = args; + addToOutQueue(msg); + } + + @SuppressWarnings("unchecked") + private void handleInput(Message msg) { + assert Protocol.isDispatchThread(); + if (state == STATE_CLOSED) return; + if (trace_listeners != null) { + for (TraceListener l : trace_listeners) { + try { + l.onMessageReceived(msg.type, + msg.token != null ? msg.token.getID() : null, + msg.service, msg.name, msg.data); + } + catch (Throwable x) { + Protocol.log("Exception in trace listener", x); + } + } + } + try { + Token token = null; + switch (msg.type) { + case 'C': + if (state == STATE_OPENNING) { + throw new IOException("Received command " + msg.service + "." + msg.name + " before Hello message"); + } + if (proxy != null) { + proxy.onCommand(msg.token, msg.service, msg.name, msg.data); + } + else { + token = msg.token; + IChannel.ICommandServer cmds = command_servers.get(msg.service); + if (cmds != null) { + cmds.command(token, msg.name, msg.data); + } + else { + rejectCommand(token); + } + } + break; + case 'P': + token = out_tokens.get(msg.token.getID()).token; + token.getListener().progress(token, msg.data); + sendCongestionLevel(); + break; + case 'R': + token = out_tokens.remove(msg.token.getID()).token; + token.getListener().result(token, msg.data); + sendCongestionLevel(); + break; + case 'N': + token = out_tokens.remove(msg.token.getID()).token; + token.getListener().terminated(token, new ErrorReport( + "Command is not recognized", IErrorReport.TCF_ERROR_INV_COMMAND)); + break; + case 'E': + boolean hello = msg.service.equals(ILocator.NAME) && msg.name.equals("Hello"); + if (hello) { + remote_service_by_name.clear(); + remote_service_by_class.clear(); + ServiceManager.onChannelOpened(this, (Collection<String>)JSON.parseSequence(msg.data)[0], remote_service_by_name); + makeServiceByClassMap(remote_service_by_name, remote_service_by_class); + } + if (proxy != null && state == STATE_OPEN) { + proxy.onEvent(msg.service, msg.name, msg.data); + } + else if (hello) { + assert state == STATE_OPENNING; + state = STATE_OPEN; + assert redirect_command == null; + if (redirect_queue.size() > 0) { + redirect(redirect_queue.removeFirst()); + } + else { + notifying_channel_opened = true; + if (!registered_with_trasport) { + TransportManager.channelOpened(this); + registered_with_trasport = true; + } + listeners_array = channel_listeners.toArray(listeners_array); + for (IChannelListener l : listeners_array) { + if (l == null) break; + try { + l.onChannelOpened(); + } + catch (Throwable x) { + Protocol.log("Exception in channel listener", x); + } + } + notifying_channel_opened = false; + } + } + else { + IChannel.IEventListener[] list = event_listeners.get(msg.service); + if (list != null) { + for (int i = 0; i < list.length; i++) { + list[i].event(msg.name, msg.data); + } + } + sendCongestionLevel(); + } + break; + case 'F': + int len = msg.data.length; + if (len > 0 && msg.data[len - 1] == 0) len--; + remote_congestion_level = Integer.parseInt(new String(msg.data, 0, len, "ASCII")); + break; + default: + assert false; + break; + } + } + catch (Throwable x) { + terminate(x); + } + } + + private void sendCongestionLevel() throws IOException { + if (++local_congestion_cnt < 8) return; + local_congestion_cnt = 0; + if (state != STATE_OPEN) return; + long time = System.currentTimeMillis(); + if (time - local_congestion_time < 500) return; + assert Protocol.isDispatchThread(); + int level = Protocol.getCongestionLevel(); + if (level == local_congestion_level) return; + int i = (level - local_congestion_level) / 8; + if (i != 0) level = local_congestion_level + i; + local_congestion_time = time; + synchronized (out_queue) { + Message msg = out_queue.isEmpty() ? null : out_queue.get(0); + if (msg == null || msg.type != 'F') { + msg = new Message('F'); + out_queue.add(0, msg); + out_queue.notify(); + } + StringBuilder buffer = new StringBuilder(); + buffer.append(local_congestion_level); + buffer.append((char)0); // 0 terminate + msg.data = buffer.toString().getBytes("ASCII"); + msg.trace = trace_listeners; + local_congestion_level = level; + } + } + + /** + * Read one byte from the channel input stream. + * @return next data byte or EOS (-1) if end of stream is reached, + * or EOM (-2) if end of message is reached. + * @throws IOException + */ + protected abstract int read() throws IOException; + + /** + * Write one byte into the channel output stream. + * The method argument can be one of two special values: + * EOS (-1) end of stream marker; + * EOM (-2) end of message marker. + * The stream can put the byte into a buffer instead of transmitting it right away. + * @param n - the data byte. + * @throws IOException + */ + protected abstract void write(int n) throws IOException; + + /** + * Flush the channel output stream. + * All buffered data should be transmitted immediately. + * @throws IOException + */ + protected abstract void flush() throws IOException; + + /** + * Stop (close) channel underlying streams. + * If a thread is blocked by read() or write(), it should be + * resumed (or interrupted). + * @throws IOException + */ + protected abstract void stop() throws IOException; + + /** + * Write array of bytes into the channel output stream. + * The stream can put bytes into a buffer instead of transmitting it right away. + * @param buf + * @throws IOException + */ + protected void write(byte[] buf) throws IOException { + assert Thread.currentThread() == out_thread; + for (int i = 0; i < buf.length; i++) write(buf[i]); + } +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java new file mode 100644 index 000000000..1b857356b --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/AbstractPeer.java @@ -0,0 +1,186 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.eclipse.tm.internal.tcf.core.TransportManager; +import org.eclipse.tm.internal.tcf.services.local.LocatorService; +import org.eclipse.tm.tcf.protocol.IChannel; +import org.eclipse.tm.tcf.protocol.IPeer; +import org.eclipse.tm.tcf.protocol.JSON; +import org.eclipse.tm.tcf.protocol.Protocol; +import org.eclipse.tm.tcf.services.ILocator; +import org.eclipse.tm.tcf.services.ILocator.LocatorListener; + +/** + * Abstract implementation of IPeer interface. + */ +public abstract class AbstractPeer implements IPeer { + + private final Map<String, String> ro_attrs; + private final Map<String, String> rw_attrs; + + private long last_heart_beat_time; + + public AbstractPeer(Map<String,String> attrs) { + assert Protocol.isDispatchThread(); + if (attrs != null) { + rw_attrs = new HashMap<String, String>(attrs); + } + else { + rw_attrs = new HashMap<String, String>(); + } + ro_attrs = Collections.unmodifiableMap(rw_attrs); + assert getID() != null; + LocatorService.addPeer(this); + } + + void onChannelTerminated() { + // A channel to this peer was terminated: + // not delaying next heart beat helps client to recover much faster. + last_heart_beat_time = 0; + } + + public void updateAttributes(Map<String,String> attrs) { + boolean equ = true; + assert attrs.get(ATTR_ID).equals(rw_attrs.get(ATTR_ID)); + for (Iterator<String> i = rw_attrs.keySet().iterator(); i.hasNext();) { + String key = i.next(); + if (!rw_attrs.get(key).equals(attrs.get(key))) { + equ = false; + break; + } + } + for (Iterator<String> i = attrs.keySet().iterator(); i.hasNext();) { + String key = i.next(); + if (!attrs.get(key).equals(rw_attrs.get(key))) { + equ = false; + break; + } + } + long time = System.currentTimeMillis(); + if (!equ) { + rw_attrs.clear(); + rw_attrs.putAll(attrs); + for (LocatorListener l : LocatorService.getListeners()) { + try { + l.peerChanged(this); + } + catch (Throwable x) { + Protocol.log("Unhandled exception in Locator listener", x); + } + } + try { + Object[] args = { rw_attrs }; + Protocol.sendEvent(ILocator.NAME, "peerChanged", JSON.toJSONSequence(args)); + } + catch (IOException x) { + Protocol.log("Locator: failed to send 'peerChanged' event", x); + } + last_heart_beat_time = time; + } + else if (last_heart_beat_time + ILocator.DATA_RETENTION_PERIOD / 4 < time) { + for (LocatorListener l : LocatorService.getListeners()) { + try { + l.peerHeartBeat(attrs.get(ATTR_ID)); + } + catch (Throwable x) { + Protocol.log("Unhandled exception in Locator listener", x); + } + } + try { + Object[] args = { rw_attrs.get(ATTR_ID) }; + Protocol.sendEvent(ILocator.NAME, "peerHeartBeat", JSON.toJSONSequence(args)); + } + catch (IOException x) { + Protocol.log("Locator: failed to send 'peerHeartBeat' event", x); + } + last_heart_beat_time = time; + } + } + + public void sendPeerAddedEvent() { + for (LocatorListener l : LocatorService.getListeners()) { + try { + l.peerAdded(this); + } + catch (Throwable x) { + Protocol.log("Unhandled exception in Locator listener", x); + } + } + try { + Object[] args = { rw_attrs }; + Protocol.sendEvent(ILocator.NAME, "peerAdded", JSON.toJSONSequence(args)); + } + catch (IOException x) { + Protocol.log("Locator: failed to send 'peerAdded' event", x); + } + last_heart_beat_time = System.currentTimeMillis(); + } + + public void sendPeerRemovedEvent() { + for (LocatorListener l : LocatorService.getListeners()) { + try { + l.peerRemoved(rw_attrs.get(ATTR_ID)); + } + catch (Throwable x) { + Protocol.log("Unhandled exception in Locator listener", x); + } + } + try { + Object[] args = { rw_attrs.get(ATTR_ID) }; + Protocol.sendEvent(ILocator.NAME, "peerRemoved", JSON.toJSONSequence(args)); + } + catch (IOException x) { + Protocol.log("Locator: failed to send 'peerRemoved' event", x); + } + } + + public void dispose() { + assert Protocol.isDispatchThread(); + TransportManager.peerDisposed(this); + LocatorService.removePeer(this); + } + + public Map<String,String> getAttributes() { + assert Protocol.isDispatchThread(); + return ro_attrs; + } + + public String getID() { + assert Protocol.isDispatchThread(); + return ro_attrs.get(ATTR_ID); + } + + public String getName() { + assert Protocol.isDispatchThread(); + return ro_attrs.get(ATTR_NAME); + } + + public String getOSName() { + assert Protocol.isDispatchThread(); + return ro_attrs.get(ATTR_OS_NAME); + } + + public String getTransportName() { + assert Protocol.isDispatchThread(); + return ro_attrs.get(ATTR_TRANSPORT_NAME); + } + + public IChannel openChannel() { + return TransportManager.openChannel(this); + } +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java new file mode 100644 index 000000000..2a8ea72fa --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Base64.java @@ -0,0 +1,148 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +/** + * Methods for translating Base64 encoded strings to byte arrays and back. + * @noextend This class is not intended to be subclassed by clients. + * @noinstantiate This class is not intended to be instantiated by clients. + */ +public final class Base64 { + + public static char[] toBase64(byte[] buf, int pos, int len) { + char[] out_buf = new char[4 * ((len + 2) / 3)]; + int end = pos + len; + int out_pos = 0; + while (pos < end) { + int byte0 = buf[pos++] & 0xff; + out_buf[out_pos++] = int2char[byte0 >> 2]; + if (pos == end) { + out_buf[out_pos++] = int2char[(byte0 << 4) & 0x3f]; + out_buf[out_pos++] = '='; + out_buf[out_pos++] = '='; + } + else { + int byte1 = buf[pos++] & 0xff; + out_buf[out_pos++] = int2char[(byte0 << 4) & 0x3f | (byte1 >> 4)]; + if (pos == end) { + out_buf[out_pos++] = int2char[(byte1 << 2) & 0x3f]; + out_buf[out_pos++] = '='; + } + else { + int byte2 = buf[pos++] & 0xff; + out_buf[out_pos++] = int2char[(byte1 << 2) & 0x3f | (byte2 >> 6)]; + out_buf[out_pos++] = int2char[byte2 & 0x3f]; + } + } + } + assert out_pos == out_buf.length; + return out_buf; + } + + public static void toByteArray(byte[] buf, int offs, int size, char[] inp) { + int out_pos = offs; + if (inp != null) { + int inp_len = inp.length; + if (inp_len % 4 != 0) { + throw new IllegalArgumentException( + "BASE64 string length must be a multiple of four."); + } + int out_len = inp_len / 4 * 3; + if (inp_len > 0 && inp[inp_len - 1] == '=') { + out_len--; + if (inp[inp_len - 2] == '=') { + out_len--; + } + } + if (out_len > size) { + throw new IllegalArgumentException( + "BASE64 data array is longer then destination buffer."); + } + int inp_pos = 0; + while (inp_pos < inp_len) { + int n0, n1, n2, n3; + char ch0 = inp[inp_pos++]; + char ch1 = inp[inp_pos++]; + char ch2 = inp[inp_pos++]; + char ch3 = inp[inp_pos++]; + if (ch0 >= char2int.length || (n0 = char2int[ch0]) < 0) { + throw new IllegalArgumentException("Illegal character " + ch0); + } + if (ch1 >= char2int.length || (n1 = char2int[ch1]) < 0) { + throw new IllegalArgumentException("Illegal character " + ch1); + } + buf[out_pos++] = (byte)((n0 << 2) | (n1 >> 4)); + if (ch2 == '=') break; + if (ch2 >= char2int.length || (n2 = char2int[ch2]) < 0) { + throw new IllegalArgumentException("Illegal character " + ch2); + } + buf[out_pos++] = (byte)((n1 << 4) | (n2 >> 2)); + if (ch3 == '=') break; + if (ch3 >= char2int.length || (n3 = char2int[ch3]) < 0) { + throw new IllegalArgumentException("Illegal character " + ch3); + } + buf[out_pos++] = (byte)((n2 << 6) | n3); + } + assert out_pos == offs + out_len; + } + while (out_pos < offs + size) buf[out_pos++] = 0; + } + + public static byte[] toByteArray(char[] inp) { + int inp_len = inp.length; + int out_len = inp_len / 4 * 3; + if (inp_len > 0 && inp[inp_len - 1] == '=') { + out_len--; + if (inp[inp_len - 2] == '=') { + out_len--; + } + } + byte[] buf = new byte[out_len]; + toByteArray(buf, 0, buf.length, inp); + return buf; + } + + /* + * See RFC 2045. + */ + private static final char int2char[] = { + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', + 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', + 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', + 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', + 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', + 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', + 'w', 'x', 'y', 'z', '0', '1', '2', '3', + '4', '5', '6', '7', '8', '9', '+', '/' + }; + + /* + * See RFC 2045 + */ + private static final byte char2int[] = { + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, 62, -1, -1, -1, 63, + 52, 53, 54, 55, 56, 57, 58, 59, + 60, 61, -1, -1, -1, -1, -1, -1, + -1, 0, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 11, 12, 13, 14, + 15, 16, 17, 18, 19, 20, 21, 22, + 23, 24, 25, -1, -1, -1, -1, -1, + -1, 26, 27, 28, 29, 30, 31, 32, + 33, 34, 35, 36, 37, 38, 39, 40, + 41, 42, 43, 44, 45, 46, 47, 48, + 49, 50, 51 + }; +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java new file mode 100644 index 000000000..fc4b0d5ae --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ChannelTCP.java @@ -0,0 +1,103 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketException; + +import org.eclipse.tm.tcf.protocol.IPeer; +import org.eclipse.tm.tcf.protocol.Protocol; + +/** + * ChannelTCP is a IChannel implementation that works on top of TCP sockets as a transport. + */ +public class ChannelTCP extends StreamChannel { + + private Socket socket; + private InputStream inp; + private OutputStream out; + private boolean closed; + + public ChannelTCP(IPeer remote_peer, final String host, final int port) { + super(remote_peer); + Thread thread = new Thread() { + public void run() { + try { + socket = new Socket(host, port); + socket.setTcpNoDelay(true); + inp = new BufferedInputStream(socket.getInputStream()); + out = new BufferedOutputStream(socket.getOutputStream()); + Protocol.invokeLater(new Runnable() { + public void run() { + ChannelTCP.this.start(); + } + }); + } + catch (final IOException x) { + Protocol.invokeLater(new Runnable() { + public void run() { + ChannelTCP.this.terminate(x); + } + }); + } + } + }; + thread.setName("TCF Socket Connect"); + thread.start(); + } + + public ChannelTCP(IPeer local_peer, IPeer remote_peer, Socket socket) throws IOException { + super(local_peer, remote_peer); + this.socket = socket; + socket.setTcpNoDelay(true); + inp = new BufferedInputStream(socket.getInputStream()); + out = new BufferedOutputStream(socket.getOutputStream()); + start(); + } + + @Override + protected final int get() throws IOException { + try { + if (closed) return -1; + return inp.read(); + } + catch (SocketException x) { + if (closed) return -1; + throw x; + } + } + + @Override + protected final void put(int b) throws IOException { + assert b >= 0 && b <= 0xff; + if (closed) return; + out.write(b); + } + + @Override + protected final void flush() throws IOException { + if (closed) return; + out.flush(); + } + + @Override + protected void stop() throws IOException { + closed = true; + socket.close(); + out.close(); + inp.close(); + } +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java new file mode 100644 index 000000000..7748e15c7 --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/Command.java @@ -0,0 +1,232 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.IOException; +import java.text.MessageFormat; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.Map; + +import org.eclipse.tm.internal.tcf.core.Token; +import org.eclipse.tm.tcf.protocol.IChannel; +import org.eclipse.tm.tcf.protocol.IErrorReport; +import org.eclipse.tm.tcf.protocol.IService; +import org.eclipse.tm.tcf.protocol.IToken; +import org.eclipse.tm.tcf.protocol.JSON; +import org.eclipse.tm.tcf.protocol.Protocol; + + +/** + * This is utility class that helps to implement sending a command and receiving + * command result over TCF communication channel. The class uses JSON to encode + * command arguments and to decode result data. + * + * The class also provides support for TCF standard error report encoding. + * + * Clients are expected to subclass <code>Command</code> and override <code>done</code> method. + * + * Note: most clients don't need to handle protocol commands directly and + * can use service APIs instead. Service API does all command encoding/decoding + * for a client. + * + * Typical usage example: + * + * public IToken getContext(String id, final DoneGetContext done) { + * return new Command(channel, IService.this, "getContext", new Object[]{ id }) { + * @Override + * public void done(Exception error, Object[] args) { + * Context ctx = null; + * if (error == null) { + * assert args.length == 2; + * error = toError(args[0]); + * if (args[1] != null) ctx = new Context(args[1]); + * } + * done.doneGetContext(token, error, ctx); + * } + * }.token; + * } + */ +public abstract class Command implements IChannel.ICommandListener { + + private final IService service; + private final String command; + private final Object[] args; + + public final IToken token; + + private boolean done; + + private static final SimpleDateFormat timestamp_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + public Command(IChannel channel, IService service, String command, Object[] args) { + this.service = service; + this.command = command; + this.args = args; + IToken t = null; + try { + t = channel.sendCommand(service, command, JSON.toJSONSequence(args), this); + } + catch (Throwable y) { + t = new Token(); + final Exception x = y instanceof Exception ? (Exception)y : new Exception(y); + Protocol.invokeLater(new Runnable() { + public void run() { + assert !done; + done = true; + done(x, null); + } + }); + } + token = t; + } + + public void progress(IToken token, byte[] data) { + assert this.token == token; + } + + public void result(IToken token, byte[] data) { + assert this.token == token; + Exception error = null; + Object[] args = null; + try { + args = JSON.parseSequence(data); + } + catch (Exception e) { + error = e; + } + assert !done; + done = true; + done(error, args); + } + + public void terminated(IToken token, Exception error) { + assert this.token == token; + assert !done; + done = true; + done(error, null); + } + + public abstract void done(Exception error, Object[] args); + + public String getCommandString() { + StringBuffer buf = new StringBuffer(); + buf.append(service.getName()); + buf.append(' '); + buf.append(command); + if (args != null) { + for (int i = 0; i < args.length; i++) { + buf.append(i == 0 ? " " : ", "); + try { + buf.append(JSON.toJSON(args[i])); + } + catch (IOException x) { + buf.append("***"); + buf.append(x.getMessage()); + buf.append("***"); + } + } + } + return buf.toString(); + } + + @SuppressWarnings({ "unchecked" }) + public static String toErrorString(Object data) { + if (data == null) return null; + Map<String,Object> map = (Map<String,Object>)data; + String fmt = (String)map.get(IErrorReport.ERROR_FORMAT); + if (fmt != null) { + Collection<Object> c = (Collection<Object>)map.get(IErrorReport.ERROR_PARAMS); + if (c != null) return new MessageFormat(fmt).format(c.toArray()); + return fmt; + } + Number code = (Number)map.get(IErrorReport.ERROR_CODE); + if (code != null) { + if (code.intValue() == IErrorReport.TCF_ERROR_OTHER) { + String alt_org = (String)map.get(IErrorReport.ERROR_ALT_ORG); + Number alt_code = (Number)map.get(IErrorReport.ERROR_ALT_CODE); + if (alt_org != null && alt_code != null) { + return alt_org + " Error " + alt_code; + } + } + return "TCF Error " + code; + } + return "Invalid error report format"; + } + + static void appendErrorProps(StringBuffer bf, Map<String,Object> map) { + Number time = (Number)map.get(IErrorReport.ERROR_TIME); + Number code = (Number)map.get(IErrorReport.ERROR_CODE); + String service = (String)map.get(IErrorReport.ERROR_SERVICE); + Number severity = (Number)map.get(IErrorReport.ERROR_SEVERITY); + Number alt_code = (Number)map.get(IErrorReport.ERROR_ALT_CODE); + String alt_org = (String)map.get(IErrorReport.ERROR_ALT_ORG); + if (time != null) { + bf.append('\n'); + bf.append("Time: "); + bf.append(timestamp_format.format(new Date(time.longValue()))); + } + if (severity != null) { + bf.append('\n'); + bf.append("Severity: "); + bf.append(toErrorString(map)); + switch (severity.intValue()) { + case IErrorReport.SEVERITY_ERROR: bf.append("Error"); + case IErrorReport.SEVERITY_FATAL: bf.append("Fatal"); + case IErrorReport.SEVERITY_WARNING: bf.append("Warning"); + default: bf.append("Unknown"); + } + } + bf.append('\n'); + bf.append("Error text: "); + bf.append(toErrorString(map)); + bf.append('\n'); + bf.append("Error code: "); + bf.append(code); + if (service != null) { + bf.append('\n'); + bf.append("Service: "); + bf.append(service); + } + if (alt_code != null) { + bf.append('\n'); + bf.append("Alt code: "); + bf.append(alt_code); + if (alt_org != null) { + bf.append('\n'); + bf.append("Alt org: "); + bf.append(alt_org); + } + } + } + + public Exception toError(Object data) { + return toError(data, true); + } + + @SuppressWarnings("unchecked") + public Exception toError(Object data, boolean include_command_text) { + if (data == null) return null; + Map<String,Object> map = (Map<String,Object>)data; + StringBuffer bf = new StringBuffer(); + bf.append("TCF error report:"); + bf.append('\n'); + if (include_command_text) { + String cmd = getCommandString(); + if (cmd.length() > 72) cmd = cmd.substring(0, 72) + "..."; + bf.append("Command: "); + bf.append(cmd); + } + appendErrorProps(bf, map); + return new ErrorReport(bf.toString(), map); + } +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java new file mode 100644 index 000000000..56dbca80e --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ErrorReport.java @@ -0,0 +1,66 @@ +/******************************************************************************* + * Copyright (c) 2007-2009 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.util.HashMap; +import java.util.Map; + +import org.eclipse.tm.tcf.protocol.IErrorReport; + +class ErrorReport extends Exception implements IErrorReport { + + private static final long serialVersionUID = 3687543884858739977L; + private final Map<String,Object> attrs; + + @SuppressWarnings("unchecked") + ErrorReport(String msg, Map<String,Object> attrs) { + super(msg); + this.attrs = attrs; + Object caused_by = attrs.get(IErrorReport.ERROR_CAUSED_BY); + if (caused_by != null) { + Map<String,Object> map = (Map<String,Object>)caused_by; + StringBuffer bf = new StringBuffer(); + bf.append("TCF error report:"); + bf.append('\n'); + Command.appendErrorProps(bf, map); + initCause(new ErrorReport(bf.toString(), map)); + } + } + + ErrorReport(String msg, int code) { + super(msg); + attrs = new HashMap<String,Object>(); + attrs.put(ERROR_CODE, code); + attrs.put(ERROR_TIME, System.currentTimeMillis()); + attrs.put(ERROR_FORMAT, msg); + attrs.put(ERROR_SEVERITY, SEVERITY_ERROR); + } + + public int getErrorCode() { + Number n = (Number)attrs.get(ERROR_CODE); + if (n == null) return 0; + return n.intValue(); + } + + public int getAltCode() { + Number n = (Number)attrs.get(ERROR_ALT_CODE); + if (n == null) return 0; + return n.intValue(); + } + + public String getAltOrg() { + return (String)attrs.get(ERROR_ALT_ORG); + } + + public Map<String, Object> getAttributes() { + return attrs; + } +}
\ No newline at end of file diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java new file mode 100644 index 000000000..70d14924b --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/ServerTCP.java @@ -0,0 +1,147 @@ +/******************************************************************************* + * Copyright (c) 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.eclipse.tm.tcf.protocol.IPeer; +import org.eclipse.tm.tcf.protocol.Protocol; + +/** + * ServerTCP is a TCP server that is listening for incoming connection requests + * and creates TCF communication channels over TCP sockets for such requests. + * + * Clients may create objects of this class to become a TCF server. + */ +public class ServerTCP extends ServerSocket { + + private static class ServerPeer extends AbstractPeer { + ServerPeer(Map<String,String> attrs) { + super(attrs); + } + } + + private static class RemotePeer extends AbstractPeer { + RemotePeer(Map<String,String> attrs) { + super(attrs); + } + } + + private final String name; + private List<ServerPeer> peers; + private Thread thread; + + public ServerTCP(String name, int port) throws IOException { + super(port); + this.name = name; + peers = new ArrayList<ServerPeer>(); + Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); + while (e.hasMoreElements()) { + NetworkInterface f = e.nextElement(); + Enumeration<InetAddress> n = f.getInetAddresses(); + while (n.hasMoreElements()) { + peers.add(getLocalPeer(n.nextElement().getHostAddress())); + } + } + thread = new Thread() { + @Override + public void run() { + while (true) { + try { + final Socket socket = accept(); + Protocol.invokeLater(new Runnable() { + public void run() { + try { + new ChannelTCP(getLocalPeer(socket), getRemotePeer(socket), socket); + } + catch (final Throwable x) { + Protocol.log("TCF Server: failed to create a channel", x); + } + } + }); + } + catch (final Throwable x) { + Protocol.invokeLater(new Runnable() { + public void run() { + Protocol.log("TCF Server thread aborted", x); + } + }); + break; + } + } + } + }; + thread.setName(name); + thread.setDaemon(true); + thread.start(); + } + + private ServerPeer getLocalPeer(String addr) { + for (ServerPeer p : peers) { + if (addr.equals(p.getAttributes().get(IPeer.ATTR_IP_HOST))) return p; + } + Map<String,String> attrs = new HashMap<String,String>(); + attrs.put(IPeer.ATTR_ID, "TCP:" + addr + ":" + getLocalPort()); + attrs.put(IPeer.ATTR_NAME, name); + attrs.put(IPeer.ATTR_OS_NAME, System.getProperty("os.name")); + attrs.put(IPeer.ATTR_TRANSPORT_NAME, "TCP"); + attrs.put(IPeer.ATTR_IP_HOST, addr); + attrs.put(IPeer.ATTR_IP_PORT, Integer.toString(getLocalPort())); + attrs.put(IPeer.ATTR_PROXY, ""); + ServerPeer p = new ServerPeer(attrs); + peers.add(p); + return p; + } + + private IPeer getLocalPeer(Socket socket) { + return getLocalPeer(socket.getLocalAddress().getHostAddress()); + } + + private IPeer getRemotePeer(Socket socket) { + String addr = socket.getInetAddress().getHostAddress(); + for (IPeer p : Protocol.getLocator().getPeers().values()) { + if (addr.equals(p.getAttributes().get(IPeer.ATTR_IP_HOST))) return p; + } + Map<String,String> attrs = new HashMap<String,String>(); + attrs.put(IPeer.ATTR_ID, "TCP:" + addr + ":"); + attrs.put(IPeer.ATTR_TRANSPORT_NAME, "TCP"); + attrs.put(IPeer.ATTR_IP_HOST, addr); + return new RemotePeer(attrs); + } + + @Override + public void close() throws IOException { + if (peers != null) { + for (ServerPeer s : peers) s.dispose(); + peers = null; + } + super.close(); + if (thread != null) { + try { + thread.join(); + thread = null; + } + catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + } +} diff --git a/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java new file mode 100644 index 000000000..b9020729a --- /dev/null +++ b/plugins/org.eclipse.tm.tcf.core/src/org/eclipse/tm/tcf/core/StreamChannel.java @@ -0,0 +1,81 @@ +/******************************************************************************* + * Copyright (c) 2007, 2008 Wind River Systems, Inc. and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Wind River Systems - initial API and implementation + *******************************************************************************/ +package org.eclipse.tm.tcf.core; + +import java.io.IOException; + +import org.eclipse.tm.tcf.protocol.IPeer; + +/** + * Abstract implementation of IChannel interface for stream oriented transport protocols. + * + * StreamChannel implements communication link connecting two end points (peers). + * The channel asynchronously transmits messages: commands, results and events. + * + * StreamChannel uses escape sequences to represent End-Of-Message and End-Of-Stream markers. + * + * Clients can subclass StreamChannel to support particular stream oriented transport (wire) protocol. + * Also, see ChannelTCP for a concrete IChannel implementation that works on top of TCP sockets as a transport. + */ +public abstract class StreamChannel extends AbstractChannel { + + public static final int ESC = 3; + + public StreamChannel(IPeer remote_peer) { + super(remote_peer); + } + + public StreamChannel(IPeer local_peer, IPeer remote_peer) { + super(local_peer, remote_peer); + } + + protected abstract int get() throws IOException; + protected abstract void put(int n) throws IOException; + + @Override + protected final int read() throws IOException { + int res = get(); + if (res < 0) return EOS; + assert res >= 0 && res <= 0xff; + if (res != ESC) return res; + int n = get(); + switch (n) { + case 0: return ESC; + case 1: return EOM; + case 2: return EOS; + default: + if (n < 0) return EOS; + assert false; + return 0; + } + } + + @Override + protected final void write(int n) throws IOException { + switch (n) { + case ESC: put(ESC); put(0); break; + case EOM: put(ESC); put(1); break; + case EOS: put(ESC); put(2); break; + default: + assert n >= 0 && n <= 0xff; + put(n); + } + } + + @Override + protected void write(byte[] buf) throws IOException { + for (int i = 0; i < buf.length; i++) { + int n = buf[i] & 0xff; + put(n); + if (n == ESC) put(0); + } + } +} |