Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java')
-rw-r--r--plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java1060
1 files changed, 1060 insertions, 0 deletions
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java
new file mode 100644
index 000000000..4d982807e
--- /dev/null
+++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java
@@ -0,0 +1,1060 @@
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tcf.core;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.eclipse.tcf.internal.core.ServiceManager;
+import org.eclipse.tcf.internal.core.Token;
+import org.eclipse.tcf.internal.core.TransportManager;
+import org.eclipse.tcf.internal.services.local.LocatorService;
+import org.eclipse.tcf.internal.services.remote.GenericProxy;
+import org.eclipse.tcf.protocol.IChannel;
+import org.eclipse.tcf.protocol.IErrorReport;
+import org.eclipse.tcf.protocol.IPeer;
+import org.eclipse.tcf.protocol.IService;
+import org.eclipse.tcf.protocol.IToken;
+import org.eclipse.tcf.protocol.JSON;
+import org.eclipse.tcf.protocol.Protocol;
+import org.eclipse.tcf.services.ILocator;
+
+/**
+ * Abstract implementation of IChannel interface.
+ *
+ * AbstractChannel implements communication link connecting two end points (peers).
+ * The channel asynchronously transmits messages: commands, results and events.
+ *
+ * Clients can subclass AbstractChannel to support particular transport (wire) protocol.
+ * Also, see StreamChannel for stream oriented transport protocols.
+ */
+public abstract class AbstractChannel implements IChannel {
+
+ public interface TraceListener {
+
+ public void onMessageReceived(char type, String token,
+ String service, String name, byte[] data);
+
+ public void onMessageSent(char type, String token,
+ String service, String name, byte[] data);
+
+ public void onChannelClosed(Throwable error);
+ }
+
+ public interface Proxy {
+
+ public void onCommand(IToken token, String service, String name, byte[] data);
+
+ public void onEvent(String service, String name, byte[] data);
+
+ public void onChannelClosed(Throwable error);
+ }
+
+ private static class Message {
+ final char type;
+ Token token;
+ String service;
+ String name;
+ byte[] data;
+
+ boolean is_sent;
+ boolean is_canceled;
+
+ Collection<TraceListener> trace;
+
+ Message(char type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ StringBuffer bf = new StringBuffer();
+ bf.append('[');;
+ bf.append(type);
+ if (token != null) {
+ bf.append(' ');
+ bf.append(token.getID());
+ }
+ if (service != null) {
+ bf.append(' ');
+ bf.append(service);
+ }
+ if (name != null) {
+ bf.append(' ');
+ bf.append(name);
+ }
+ if (data != null) {
+ int i = 0;
+ while (i < data.length) {
+ int j = i;
+ while (j < data.length && data[j] != 0) j++;
+ bf.append(' ');
+ bf.append(new String(data, i, j - i, "UTF8"));
+ if (j < data.length && data[j] == 0) j++;
+ i = j;
+ }
+ }
+ bf.append(']');
+ return bf.toString();
+ }
+ catch (Exception x) {
+ return x.toString();
+ }
+ }
+ }
+
+ private final LinkedList<Map<String,String>> redirect_queue = new LinkedList<Map<String,String>>();
+ private final Map<Class<?>,IService> local_service_by_class = new HashMap<Class<?>,IService>();
+ private final Map<Class<?>,IService> remote_service_by_class = new HashMap<Class<?>,IService>();
+ private final Map<String,IService> local_service_by_name = new HashMap<String,IService>();
+ private final Map<String,IService> remote_service_by_name = new HashMap<String,IService>();
+ private final LinkedList<Message> out_queue = new LinkedList<Message>();
+ private final Collection<IChannelListener> channel_listeners = new ArrayList<IChannelListener>();
+ private final Map<String,IChannel.IEventListener[]> event_listeners = new HashMap<String,IChannel.IEventListener[]>();
+ private final Map<String,IChannel.ICommandServer> command_servers = new HashMap<String,IChannel.ICommandServer>();
+ private final Map<String,Message> out_tokens = new HashMap<String,Message>();
+ private final Thread inp_thread;
+ private final Thread out_thread;
+ private boolean notifying_channel_opened;
+ private boolean registered_with_trasport;
+ private int state = STATE_OPENING;
+ private IToken redirect_command;
+ private final IPeer local_peer;
+ private IPeer remote_peer;
+ private Proxy proxy;
+ private boolean zero_copy;
+
+ private static final int pending_command_limit = 32;
+ private int local_congestion_level = -100;
+ private int remote_congestion_level = -100;
+ private long local_congestion_time;
+ private int local_congestion_cnt;
+ private Collection<TraceListener> trace_listeners;
+
+ public static final int
+ EOS = -1, // End Of Stream
+ EOM = -2; // End Of Message
+
+ protected AbstractChannel(IPeer remote_peer) {
+ this(LocatorService.getLocalPeer(), remote_peer);
+ }
+
+ protected AbstractChannel(IPeer local_peer, IPeer remote_peer) {
+ assert Protocol.isDispatchThread();
+ this.remote_peer = remote_peer;
+ this.local_peer = local_peer;
+
+ inp_thread = new Thread() {
+
+ final byte[] empty_byte_array = new byte[0];
+ byte[] buf = new byte[1024];
+ byte[] eos_err_report;
+
+ private void error() throws IOException {
+ throw new IOException("Protocol syntax error");
+ }
+
+ private byte[] readBytes(int end) throws IOException {
+ int len = 0;
+ for (;;) {
+ int n = read();
+ if (n <= 0) {
+ if (n == end) break;
+ if (n == EOM) throw new IOException("Unexpected end of message");
+ if (n < 0) throw new IOException("Communication channel is closed by remote peer");
+ }
+ if (len >= buf.length) {
+ byte[] tmp = new byte[buf.length * 2];
+ System.arraycopy(buf, 0, tmp, 0, len);
+ buf = tmp;
+ }
+ buf[len++] = (byte)n;
+ }
+ if (len == 0) return empty_byte_array;
+ byte[] res = new byte[len];
+ System.arraycopy(buf, 0, res, 0, len);
+ return res;
+ }
+
+ private String readString() throws IOException {
+ int len = 0;
+ for (;;) {
+ int n = read();
+ if (n <= 0) {
+ if (n == 0) break;
+ if (n == EOM) throw new IOException("Unexpected end of message");
+ if (n < 0) throw new IOException("Communication channel is closed by remote peer");
+ }
+ if (len >= buf.length) {
+ byte[] tmp = new byte[buf.length * 2];
+ System.arraycopy(buf, 0, tmp, 0, len);
+ buf = tmp;
+ }
+ buf[len++] = (byte)n;
+ }
+ return new String(buf, 0, len, "UTF8");
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ int n = read();
+ if (n == EOM) continue;
+ if (n == EOS) {
+ try {
+ eos_err_report = readBytes(EOM);
+ if (eos_err_report.length == 0 || eos_err_report.length == 1 && eos_err_report[0] == 0) eos_err_report = null;
+ }
+ catch (Exception x) {
+ }
+ break;
+ }
+ final Message msg = new Message((char)n);
+ if (read() != 0) error();
+ switch (msg.type) {
+ case 'C':
+ msg.token = new Token(readBytes(0));
+ msg.service = readString();
+ msg.name = readString();
+ msg.data = readBytes(EOM);
+ break;
+ case 'P':
+ case 'R':
+ case 'N':
+ msg.token = new Token(readBytes(0));
+ msg.data = readBytes(EOM);
+ break;
+ case 'E':
+ msg.service = readString();
+ msg.name = readString();
+ msg.data = readBytes(EOM);
+ break;
+ case 'F':
+ msg.data = readBytes(EOM);
+ break;
+ default:
+ error();
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ handleInput(msg);
+ }
+ });
+ int delay = local_congestion_level;
+ if (delay > 0) sleep(delay);
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ if (out_tokens.isEmpty() && eos_err_report == null && state != STATE_OPENING) {
+ close();
+ }
+ else {
+ IOException x = new IOException("Communication channel is closed by remote peer");
+ if (eos_err_report != null) {
+ try {
+ Object[] args = JSON.parseSequence(eos_err_report);
+ if (args.length > 0 && args[0] != null) {
+ x.initCause(new Exception(Command.toErrorString(args[0])));
+ }
+ }
+ catch (IOException e) {
+ }
+ }
+ terminate(x);
+ }
+ }
+ });
+ }
+ catch (final Throwable x) {
+ try {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ terminate(x);
+ }
+ });
+ }
+ catch (IllegalStateException y) {
+ // TCF event dispatcher has shut down
+ }
+ }
+ }
+ };
+
+ out_thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Message msg = null;
+ boolean last = false;
+ synchronized (out_queue) {
+ while (out_queue.size() == 0) out_queue.wait();
+ msg = out_queue.removeFirst();
+ if (msg == null) break;
+ last = out_queue.isEmpty();
+ if (msg.is_canceled) {
+ if (last) flush();
+ continue;
+ }
+ msg.is_sent = true;
+ }
+ if (msg.trace != null) {
+ final Message m = msg;
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ for (TraceListener l : m.trace) {
+ try {
+ l.onMessageSent(m.type, m.token == null ? null : m.token.getID(),
+ m.service, m.name, m.data);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ });
+ }
+ write(msg.type);
+ write(0);
+ if (msg.token != null) {
+ write(msg.token.getBytes());
+ write(0);
+ }
+ if (msg.service != null) {
+ write(msg.service.getBytes("UTF8"));
+ write(0);
+ }
+ if (msg.name != null) {
+ write(msg.name.getBytes("UTF8"));
+ write(0);
+ }
+ if (msg.data != null) {
+ write(msg.data);
+ }
+ write(EOM);
+ int delay = 0;
+ int level = remote_congestion_level;
+ if (level > 0) delay = level * 10;
+ if (last || delay > 0) flush();
+ if (delay > 0) sleep(delay);
+ else yield();
+ }
+ write(EOS);
+ write(EOM);
+ flush();
+ }
+ catch (final Throwable x) {
+ try {
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ terminate(x);
+ }
+ });
+ }
+ catch (IllegalStateException y) {
+ // TCF event dispatcher has shut down
+ }
+ }
+ }
+ };
+ inp_thread.setName("TCF Channel Receiver");
+ out_thread.setName("TCF Channel Transmitter");
+ }
+
+ protected void start() {
+ assert Protocol.isDispatchThread();
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ try {
+ if (proxy != null) return;
+ if (state == STATE_CLOSED) return;
+ ServiceManager.onChannelCreated(AbstractChannel.this, local_service_by_name);
+ makeServiceByClassMap(local_service_by_name, local_service_by_class);
+ Object[] args = new Object[]{ local_service_by_name.keySet() };
+ sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(args));
+ }
+ catch (IOException x) {
+ terminate(x);
+ }
+ }
+ });
+ inp_thread.start();
+ out_thread.start();
+ }
+
+ /**
+ * Redirect this channel to given peer using this channel remote peer locator service as a proxy.
+ * @param peer_id - peer that will become new remote communication endpoint of this channel
+ */
+ public void redirect(String peer_id) {
+ Map<String,String> map = new HashMap<String,String>();
+ map.put(IPeer.ATTR_ID, peer_id);
+ redirect(map);
+ }
+
+ /**
+ * Redirect this channel to given peer using this channel remote peer locator service as a proxy.
+ * @param peer_attrs - peer that will become new remote communication endpoint of this channel
+ */
+ public void redirect(final Map<String,String> peer_attrs) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_OPENING) {
+ redirect_queue.add(peer_attrs);
+ }
+ else {
+ assert state == STATE_OPEN;
+ assert redirect_command == null;
+ try {
+ final ILocator l = (ILocator)remote_service_by_class.get(ILocator.class);
+ if (l == null) throw new IOException("Cannot redirect channel: peer " +
+ remote_peer.getID() + " has no locator service");
+ final String peer_id = peer_attrs.get(IPeer.ATTR_ID);
+ if (peer_id != null && peer_attrs.size() == 1) {
+ final IPeer peer = l.getPeers().get(peer_id);
+ if (peer == null) {
+ // Peer not found, must wait for a while until peer is discovered or time out
+ final boolean[] found = new boolean[1];
+ Protocol.invokeLater(ILocator.DATA_RETENTION_PERIOD / 3, new Runnable() {
+ public void run() {
+ if (found[0]) return;
+ terminate(new Exception("Peer " + peer_id + " not found"));
+ }
+ });
+ l.addListener(new ILocator.LocatorListener() {
+ public void peerAdded(IPeer peer) {
+ if (peer.getID().equals(peer_id)) {
+ found[0] = true;
+ state = STATE_OPEN;
+ l.removeListener(this);
+ redirect(peer_id);
+ }
+ }
+ public void peerChanged(IPeer peer) {
+ }
+
+ public void peerHeartBeat(String id) {
+ }
+
+ public void peerRemoved(String id) {
+ }
+ });
+ }
+ else {
+ redirect_command = l.redirect(peer_id, new ILocator.DoneRedirect() {
+ public void doneRedirect(IToken token, Exception x) {
+ assert redirect_command == token;
+ redirect_command = null;
+ if (state != STATE_OPENING) return;
+ if (x != null) terminate(x);
+ remote_peer = peer;
+ remote_service_by_class.clear();
+ remote_service_by_name.clear();
+ event_listeners.clear();
+ }
+ });
+ }
+ }
+ else {
+ redirect_command = l.redirect(peer_attrs, new ILocator.DoneRedirect() {
+ public void doneRedirect(IToken token, Exception x) {
+ assert redirect_command == token;
+ redirect_command = null;
+ if (state != STATE_OPENING) return;
+ if (x != null) terminate(x);
+ final IPeer parent = remote_peer;
+ remote_peer = new TransientPeer(peer_attrs) {
+ public IChannel openChannel() {
+ IChannel c = parent.openChannel();
+ c.redirect(peer_attrs);
+ return c;
+ }
+ };
+ remote_service_by_class.clear();
+ remote_service_by_name.clear();
+ event_listeners.clear();
+ }
+ });
+ }
+ state = STATE_OPENING;
+ }
+ catch (Throwable x) {
+ terminate(x);
+ }
+ }
+ }
+
+ private void makeServiceByClassMap(Map<String,IService> by_name, Map<Class<?>,IService> by_class) {
+ for (IService service : by_name.values()) {
+ for (Class<?> fs : service.getClass().getInterfaces()) {
+ if (fs.equals(IService.class)) continue;
+ if (!IService.class.isAssignableFrom(fs)) continue;
+ by_class.put(fs, service);
+ }
+ }
+ }
+
+ public final int getState() {
+ return state;
+ }
+
+ public void addChannelListener(IChannelListener listener) {
+ assert Protocol.isDispatchThread();
+ assert listener != null;
+ channel_listeners.add(listener);
+ }
+
+ public void removeChannelListener(IChannelListener listener) {
+ assert Protocol.isDispatchThread();
+ channel_listeners.remove(listener);
+ }
+
+ public void addTraceListener(TraceListener listener) {
+ if (trace_listeners == null) {
+ trace_listeners = new ArrayList<TraceListener>();
+ }
+ else {
+ trace_listeners = new ArrayList<TraceListener>(trace_listeners);
+ }
+ trace_listeners.add(listener);
+ }
+
+ public void removeTraceListener(TraceListener listener) {
+ trace_listeners = new ArrayList<TraceListener>(trace_listeners);
+ trace_listeners.remove(listener);
+ if (trace_listeners.isEmpty()) trace_listeners = null;
+ }
+
+ public void addEventListener(IService service, IChannel.IEventListener listener) {
+ assert Protocol.isDispatchThread();
+ IChannel.IEventListener[] list = event_listeners.get(service.getName());
+ IChannel.IEventListener[] next = new IChannel.IEventListener[list == null ? 1 : list.length + 1];
+ if (list != null) System.arraycopy(list, 0, next, 0, list.length);
+ next[next.length - 1] = listener;
+ event_listeners.put(service.getName(), next);
+ }
+
+ public void removeEventListener(IService service, IChannel.IEventListener listener) {
+ assert Protocol.isDispatchThread();
+ IChannel.IEventListener[] list = event_listeners.get(service.getName());
+ for (int i = 0; i < list.length; i++) {
+ if (list[i] == listener) {
+ if (list.length == 1) {
+ event_listeners.remove(service.getName());
+ }
+ else {
+ IChannel.IEventListener[] next = new IChannel.IEventListener[list.length - 1];
+ System.arraycopy(list, 0, next, 0, i);
+ System.arraycopy(list, i + 1, next, i, next.length - i);
+ event_listeners.put(service.getName(), next);
+ }
+ return;
+ }
+ }
+ }
+
+ public void addCommandServer(IService service, IChannel.ICommandServer listener) {
+ assert Protocol.isDispatchThread();
+ if (command_servers.put(service.getName(), listener) != null) {
+ throw new Error("Only one command server per service is allowed");
+ }
+ }
+
+ public void removeCommandServer(IService service, IChannel.ICommandServer listener) {
+ assert Protocol.isDispatchThread();
+ if (command_servers.remove(service.getName()) != listener) {
+ throw new Error("Invalid command server");
+ }
+ }
+
+ public void close() {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_CLOSED) return;
+ try {
+ sendEndOfStream(10000);
+ close(null);
+ }
+ catch (Exception x) {
+ close(x);
+ }
+ }
+
+ public void terminate(Throwable error) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_CLOSED) return;
+ try {
+ sendEndOfStream(500);
+ close(error);
+ }
+ catch (Exception x) {
+ if (error == null) error = x;
+ close(error);
+ }
+ }
+
+ private void sendEndOfStream(long timeout) throws Exception {
+ synchronized (out_queue) {
+ out_queue.clear();
+ out_queue.add(null);
+ out_queue.notify();
+ }
+ out_thread.join(timeout);
+ }
+
+ private void close(final Throwable error) {
+ assert state != STATE_CLOSED;
+ state = STATE_CLOSED;
+ // Closing channel underlying streams can block for a long time,
+ // so it needs to be done by a background thread.
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ AbstractChannel.this.stop();
+ }
+ catch (Exception x) {
+ Protocol.log("Cannot close channel streams", x);
+ }
+ }
+ };
+ thread.setName("TCF Channel Cleanup");
+ thread.setDaemon(true);
+ thread.start();
+ if (error != null && remote_peer instanceof AbstractPeer) {
+ ((AbstractPeer)remote_peer).onChannelTerminated();
+ }
+ if (registered_with_trasport) {
+ registered_with_trasport = false;
+ TransportManager.channelClosed(this, error);
+ }
+ if (proxy != null) {
+ try {
+ proxy.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ Protocol.invokeLater(new Runnable() {
+ public void run() {
+ if (!out_tokens.isEmpty()) {
+ Exception x = null;
+ if (error instanceof Exception) x = (Exception)error;
+ else if (error != null) x = new Exception(error);
+ else x = new IOException("Channel is closed");
+ for (Message msg : out_tokens.values()) {
+ try {
+ String s = msg.toString();
+ if (s.length() > 72) s = s.substring(0, 72) + "...]";
+ IOException y = new IOException("Command " + s + " aborted");
+ y.initCause(x);
+ msg.token.getListener().terminated(msg.token, y);
+ }
+ catch (Throwable e) {
+ Protocol.log("Exception in command listener", e);
+ }
+ }
+ out_tokens.clear();
+ }
+ if (channel_listeners.size() > 0) {
+ for (IChannelListener l : channel_listeners.toArray(
+ new IChannelListener[channel_listeners.size()])) {
+ try {
+ l.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ else if (error != null) {
+ Protocol.log("TCF channel terminated", error);
+ }
+ if (trace_listeners != null) {
+ for (TraceListener l : trace_listeners) {
+ try {
+ l.onChannelClosed(error);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public int getCongestion() {
+ assert Protocol.isDispatchThread();
+ int level = out_tokens.size() * 100 / pending_command_limit - 100;
+ if (remote_congestion_level > level) level = remote_congestion_level;
+ if (level > 100) level = 100;
+ return level;
+ }
+
+ public IPeer getLocalPeer() {
+ assert Protocol.isDispatchThread();
+ return local_peer;
+ }
+
+ public IPeer getRemotePeer() {
+ assert Protocol.isDispatchThread();
+ return remote_peer;
+ }
+
+ public Collection<String> getLocalServices() {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return local_service_by_name.keySet();
+ }
+
+ public Collection<String> getRemoteServices() {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return remote_service_by_name.keySet();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V extends IService> V getLocalService(Class<V> cls) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return (V)local_service_by_class.get(cls);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V extends IService> V getRemoteService(Class<V> cls) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return (V)remote_service_by_class.get(cls);
+ }
+
+ public <V extends IService> void setServiceProxy(Class<V> service_interface, IService service_proxy) {
+ if (!notifying_channel_opened) throw new Error("setServiceProxe() can be called only from channel open call-back");
+ if (!(remote_service_by_name.get(service_proxy.getName()) instanceof GenericProxy)) throw new Error("Proxy already set");
+ if (remote_service_by_class.get(service_interface) != null) throw new Error("Proxy already set");
+ remote_service_by_class.put(service_interface, service_proxy);
+ remote_service_by_name.put(service_proxy.getName(), service_proxy);
+ }
+
+ public IService getLocalService(String service_name) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return local_service_by_name.get(service_name);
+ }
+
+ public IService getRemoteService(String service_name) {
+ assert Protocol.isDispatchThread();
+ assert state != STATE_OPENING;
+ return remote_service_by_name.get(service_name);
+ }
+
+ public void setProxy(Proxy proxy, Collection<String> services) throws IOException {
+ this.proxy = proxy;
+ sendEvent(Protocol.getLocator(), "Hello", JSON.toJSONSequence(new Object[]{ services }));
+ local_service_by_class.clear();
+ local_service_by_name.clear();
+ }
+
+ private void addToOutQueue(Message msg) {
+ msg.trace = trace_listeners;
+ synchronized (out_queue) {
+ out_queue.add(msg);
+ out_queue.notify();
+ }
+ }
+
+ public IToken sendCommand(IService service, String name, byte[] args, ICommandListener listener) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_OPENING) throw new Error("Channel is waiting for Hello message");
+ if (state == STATE_CLOSED) throw new Error("Channel is closed");
+ final Message msg = new Message('C');
+ msg.service = service.getName();
+ msg.name = name;
+ msg.data = args;
+ Token token = new Token(listener) {
+ @Override
+ public boolean cancel() {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) return false;
+ synchronized (out_queue) {
+ if (msg.is_sent) return false;
+ msg.is_canceled = true;
+ }
+ out_tokens.remove(msg.token.getID());
+ return true;
+ }
+ };
+ msg.token = token;
+ out_tokens.put(token.getID(), msg);
+ addToOutQueue(msg);
+ return token;
+ }
+
+ public void sendProgress(IToken token, byte[] results) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('P');
+ msg.data = results;
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void sendResult(IToken token, byte[] results) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('R');
+ msg.data = results;
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void rejectCommand(IToken token) {
+ assert Protocol.isDispatchThread();
+ if (state != STATE_OPEN) throw new Error("Channel is closed");
+ Message msg = new Message('N');
+ msg.token = (Token)token;
+ addToOutQueue(msg);
+ }
+
+ public void sendEvent(IService service, String name, byte[] args) {
+ assert Protocol.isDispatchThread();
+ if (!(state == STATE_OPEN || state == STATE_OPENING && service instanceof ILocator)) {
+ throw new Error("Channel is closed");
+ }
+ Message msg = new Message('E');
+ msg.service = service.getName();
+ msg.name = name;
+ msg.data = args;
+ addToOutQueue(msg);
+ }
+
+ public boolean isZeroCopySupported() {
+ return zero_copy;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleInput(Message msg) {
+ assert Protocol.isDispatchThread();
+ if (state == STATE_CLOSED) return;
+ if (trace_listeners != null) {
+ for (TraceListener l : trace_listeners) {
+ try {
+ l.onMessageReceived(msg.type,
+ msg.token != null ? msg.token.getID() : null,
+ msg.service, msg.name, msg.data);
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in trace listener", x);
+ }
+ }
+ }
+ try {
+ Message cmd = null;
+ Token token = null;
+ switch (msg.type) {
+ case 'P':
+ case 'R':
+ case 'N':
+ String token_id = msg.token.getID();
+ cmd = msg.type == 'P' ? out_tokens.get(token_id) : out_tokens.remove(token_id);
+ if (cmd == null) throw new Exception("Invalid token received: " + token_id);
+ token = cmd.token;
+ break;
+ }
+ switch (msg.type) {
+ case 'C':
+ if (state == STATE_OPENING) {
+ throw new IOException("Received command " + msg.service + "." + msg.name + " before Hello message");
+ }
+ if (proxy != null) {
+ proxy.onCommand(msg.token, msg.service, msg.name, msg.data);
+ }
+ else {
+ token = msg.token;
+ IChannel.ICommandServer cmds = command_servers.get(msg.service);
+ if (cmds != null) {
+ cmds.command(token, msg.name, msg.data);
+ }
+ else {
+ rejectCommand(token);
+ }
+ }
+ break;
+ case 'P':
+ token.getListener().progress(token, msg.data);
+ sendCongestionLevel();
+ break;
+ case 'R':
+ token.getListener().result(token, msg.data);
+ sendCongestionLevel();
+ break;
+ case 'N':
+ {
+ String s = null;
+ if (remote_service_by_name.get(cmd.service) == null) {
+ s = "No such service: " + cmd.service;
+ }
+ else {
+ s = "Command is not recognized: " + cmd.service + "." + cmd.name;
+ }
+ token.getListener().terminated(token, new ErrorReport(s, IErrorReport.TCF_ERROR_INV_COMMAND));
+ }
+ break;
+ case 'E':
+ boolean hello = msg.service.equals(ILocator.NAME) && msg.name.equals("Hello");
+ if (hello) {
+ remote_service_by_name.clear();
+ remote_service_by_class.clear();
+ ServiceManager.onChannelOpened(this, (Collection<String>)JSON.parseSequence(msg.data)[0], remote_service_by_name);
+ makeServiceByClassMap(remote_service_by_name, remote_service_by_class);
+ zero_copy = remote_service_by_name.containsKey("ZeroCopy");
+ }
+ if (proxy != null && state == STATE_OPEN) {
+ proxy.onEvent(msg.service, msg.name, msg.data);
+ }
+ else if (hello) {
+ assert state == STATE_OPENING;
+ state = STATE_OPEN;
+ assert redirect_command == null;
+ if (redirect_queue.size() > 0) {
+ redirect(redirect_queue.removeFirst());
+ }
+ else {
+ notifying_channel_opened = true;
+ if (!registered_with_trasport) {
+ TransportManager.channelOpened(this);
+ registered_with_trasport = true;
+ }
+ for (IChannelListener l : channel_listeners.toArray(
+ new IChannelListener[channel_listeners.size()])) {
+ try {
+ l.onChannelOpened();
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ notifying_channel_opened = false;
+ }
+ }
+ else {
+ IChannel.IEventListener[] list = event_listeners.get(msg.service);
+ if (list != null) {
+ for (int i = 0; i < list.length; i++) {
+ list[i].event(msg.name, msg.data);
+ }
+ }
+ sendCongestionLevel();
+ }
+ break;
+ case 'F':
+ int len = msg.data.length;
+ if (len > 0 && msg.data[len - 1] == 0) len--;
+ remote_congestion_level = Integer.parseInt(new String(msg.data, 0, len, "ASCII"));
+ for (IChannelListener l : channel_listeners.toArray(
+ new IChannelListener[channel_listeners.size()])) {
+ try {
+ l.congestionLevel(getCongestion());
+ }
+ catch (Throwable x) {
+ Protocol.log("Exception in channel listener", x);
+ }
+ }
+ break;
+ default:
+ assert false;
+ break;
+ }
+ }
+ catch (Throwable x) {
+ terminate(x);
+ }
+ }
+
+ private void sendCongestionLevel() throws IOException {
+ if (++local_congestion_cnt < 8) return;
+ local_congestion_cnt = 0;
+ if (state != STATE_OPEN) return;
+ long time = System.currentTimeMillis();
+ if (time - local_congestion_time < 500) return;
+ assert Protocol.isDispatchThread();
+ int level = Protocol.getCongestionLevel();
+ if (level == local_congestion_level) return;
+ int i = (level - local_congestion_level) / 8;
+ if (i != 0) level = local_congestion_level + i;
+ local_congestion_time = time;
+ synchronized (out_queue) {
+ Message msg = out_queue.isEmpty() ? null : out_queue.get(0);
+ if (msg == null || msg.type != 'F') {
+ msg = new Message('F');
+ out_queue.add(0, msg);
+ out_queue.notify();
+ }
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(local_congestion_level);
+ buffer.append((char)0); // 0 terminate
+ msg.data = buffer.toString().getBytes("ASCII");
+ msg.trace = trace_listeners;
+ local_congestion_level = level;
+ }
+ }
+
+ /**
+ * Read one byte from the channel input stream.
+ * @return next data byte or EOS (-1) if end of stream is reached,
+ * or EOM (-2) if end of message is reached.
+ * @throws IOException
+ */
+ protected abstract int read() throws IOException;
+
+ /**
+ * Write one byte into the channel output stream.
+ * The method argument can be one of two special values:
+ * EOS (-1) end of stream marker;
+ * EOM (-2) end of message marker.
+ * The stream can put the byte into a buffer instead of transmitting it right away.
+ * @param n - the data byte.
+ * @throws IOException
+ */
+ protected abstract void write(int n) throws IOException;
+
+ /**
+ * Flush the channel output stream.
+ * All buffered data should be transmitted immediately.
+ * @throws IOException
+ */
+ protected abstract void flush() throws IOException;
+
+ /**
+ * Stop (close) channel underlying streams.
+ * If a thread is blocked by read() or write(), it should be
+ * resumed (or interrupted).
+ * @throws IOException
+ */
+ protected abstract void stop() throws IOException;
+
+ /**
+ * Write array of bytes into the channel output stream.
+ * The stream can put bytes into a buffer instead of transmitting it right away.
+ * @param buf
+ * @throws IOException
+ */
+ protected void write(byte[] buf) throws IOException {
+ assert Thread.currentThread() == out_thread;
+ for (int i = 0; i < buf.length; i++) write(buf[i] & 0xff);
+ }
+}

Back to the top