Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/remote/StreamsProxy.java')
-rw-r--r--plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/remote/StreamsProxy.java172
1 files changed, 172 insertions, 0 deletions
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/remote/StreamsProxy.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/remote/StreamsProxy.java
new file mode 100644
index 000000000..dea9b6f90
--- /dev/null
+++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/internal/services/remote/StreamsProxy.java
@@ -0,0 +1,172 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2010 Wind River Systems, Inc. and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Wind River Systems - initial API and implementation
+ *******************************************************************************/
+package org.eclipse.tm.internal.tcf.services.remote;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.tm.tcf.core.Command;
+import org.eclipse.tm.tcf.protocol.IChannel;
+import org.eclipse.tm.tcf.protocol.IToken;
+import org.eclipse.tm.tcf.protocol.JSON;
+import org.eclipse.tm.tcf.services.IStreams;
+
+public class StreamsProxy implements IStreams {
+
+ private final IChannel channel;
+ private final Map<String,IChannel.IEventListener> listeners =
+ new HashMap<String,IChannel.IEventListener>();
+
+ public StreamsProxy(IChannel channel) {
+ this.channel = channel;
+ }
+
+ public IToken connect(String stream_id, final DoneConnect done) {
+ return new Command(channel, this, "connect", new Object[]{ stream_id }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ done.doneConnect(token, error);
+ }
+ }.token;
+ }
+
+ public IToken disconnect(String stream_id, final DoneDisconnect done) {
+ return new Command(channel, this, "disconnect", new Object[]{ stream_id }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ done.doneDisconnect(token, error);
+ }
+ }.token;
+ }
+
+ public IToken eos(String stream_id, final DoneEOS done) {
+ return new Command(channel, this, "eos", new Object[]{ stream_id }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ done.doneEOS(token, error);
+ }
+ }.token;
+ }
+
+ public IToken read(String stream_id, int size, final DoneRead done) {
+ return new Command(channel, this, "read", new Object[]{ stream_id, size }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ int lost_size = 0;
+ byte data[] = null;
+ boolean eos = false;
+ if (error == null) {
+ assert args.length == 4;
+ data = JSON.toByteArray(args[0]);
+ error = toError(args[1]);
+ lost_size = ((Number)args[2]).intValue();
+ eos = ((Boolean)args[3]).booleanValue();
+ }
+ done.doneRead(token, error, lost_size, data, eos);
+ }
+ }.token;
+ }
+
+ public IToken subscribe(final String stream_type, final StreamsListener listener, final DoneSubscribe done) {
+ return new Command(channel, this, "subscribe", new Object[]{ stream_type }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ if (error == null) {
+ IChannel.IEventListener l = new IChannel.IEventListener() {
+
+ public void event(String name, byte[] data) {
+ try {
+ assert listeners.get(stream_type) == this;
+ Object[] args = JSON.parseSequence(data);
+ if (stream_type.equals(args[0])) {
+ if (name.equals("created")) {
+ if (args.length == 3) {
+ listener.created((String)args[0], (String)args[1], (String)args[2]);
+ }
+ else {
+ assert args.length == 2;
+ listener.created((String)args[0], (String)args[1], null);
+ }
+ }
+ else if (name.equals("disposed")) {
+ assert args.length == 2;
+ listener.disposed((String)args[0], (String)args[1]);
+ }
+ else {
+ throw new IOException("Streams service: unknown event: " + name);
+ }
+ }
+ }
+ catch (Throwable x) {
+ channel.terminate(x);
+ }
+ }
+ };
+ assert listeners.get(stream_type) == null;
+ listeners.put(stream_type, l);
+ channel.addEventListener(StreamsProxy.this, l);
+ }
+ done.doneSubscribe(token, error);
+ }
+ }.token;
+ }
+
+ public IToken unsubscribe(final String stream_type, final StreamsListener listener, final DoneUnsubscribe done) {
+ return new Command(channel, this, "unsubscribe", new Object[]{ stream_type }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ if (error == null) {
+ IChannel.IEventListener l = listeners.remove(stream_type);
+ if (l != null) channel.removeEventListener(StreamsProxy.this, l);
+ }
+ done.doneUnsubscribe(token, error);
+ }
+ }.token;
+ }
+
+ public IToken write(String stream_id, byte[] buf, int offset, int size, final DoneWrite done) {
+ return new Command(channel, this, "write", new Object[]{ stream_id, size, new JSON.Binary(buf, offset, size) }) {
+ @Override
+ public void done(Exception error, Object[] args) {
+ if (error == null) {
+ assert args.length == 1;
+ error = toError(args[0]);
+ }
+ done.doneWrite(token, error);
+ }
+ }.token;
+ }
+
+ public String getName() {
+ return NAME;
+ }
+}

Back to the top