Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEugene Tarassov2013-10-04 15:33:19 -0400
committerEugene Tarassov2013-10-05 13:21:08 -0400
commit609363d7cc17b1d4e87a09280f2fc8d48f3f11b7 (patch)
tree3262d1f49de651d1389d5e3f00e990dc8847e280
parent7f544f397a6eb797402499a80305001b959c0989 (diff)
downloadorg.eclipse.tcf-609363d7cc17b1d4e87a09280f2fc8d48f3f11b7.tar.gz
org.eclipse.tcf-609363d7cc17b1d4e87a09280f2fc8d48f3f11b7.tar.xz
org.eclipse.tcf-609363d7cc17b1d4e87a09280f2fc8d48f3f11b7.zip
TCF Core: better error handling in Streams service utility classes
-rw-r--r--plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualInputStream.java11
-rw-r--r--plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualOutputStream.java91
-rw-r--r--plugins/org.eclipse.tcf.rse/src/org/eclipse/tcf/internal/rse/shells/TCFTerminalShell.java2
3 files changed, 80 insertions, 24 deletions
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualInputStream.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualInputStream.java
index 83b9f37df..dd1c673f5 100644
--- a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualInputStream.java
+++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualInputStream.java
@@ -35,6 +35,7 @@ public final class TCFVirtualInputStream extends InputStream {
boolean eof;
}
+ private final IChannel channel;
private final IStreams streams;
private final String id;
private final Runnable on_close;
@@ -46,6 +47,7 @@ public final class TCFVirtualInputStream extends InputStream {
private boolean eof;
public TCFVirtualInputStream(IChannel channel, String id, Runnable on_close) throws IOException {
+ this.channel = channel;
streams = channel.getRemoteService(IStreams.class);
if (streams == null) throw new IOException("Streams service not available"); //$NON-NLS-1$
this.id = id;
@@ -129,12 +131,13 @@ public final class TCFVirtualInputStream extends InputStream {
public void run() {
streams.disconnect(id, new IStreams.DoneDisconnect() {
public void doneDisconnect(IToken token, Exception error) {
- if (error != null) {
+ if (error != null && channel.getState() != IChannel.STATE_CLOSED) {
error(error);
- return;
}
- if (on_close != null) on_close.run();
- done(this);
+ else {
+ if (on_close != null) on_close.run();
+ done(this);
+ }
}
});
}
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualOutputStream.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualOutputStream.java
index cc71ee9ff..e5cc8535c 100644
--- a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualOutputStream.java
+++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/util/TCFVirtualOutputStream.java
@@ -12,6 +12,8 @@ package org.eclipse.tcf.util;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.LinkedList;
import org.eclipse.tcf.protocol.IChannel;
import org.eclipse.tcf.protocol.IToken;
@@ -24,16 +26,25 @@ import org.eclipse.tcf.services.IStreams;
*/
public final class TCFVirtualOutputStream extends OutputStream {
+ private static final int MAX_QUEUE = 32;
+
+ private final IChannel channel;
private final IStreams streams;
private final String id;
+ private final boolean send_eos;
private final Runnable on_close;
private final byte[] buf = new byte[1];
+ private final HashSet<IToken> queue = new HashSet<IToken>();
+ private final LinkedList<Exception> errors = new LinkedList<Exception>();
+ private final HashSet<Runnable> wait_list = new HashSet<Runnable>();
private boolean closed;
- public TCFVirtualOutputStream(IChannel channel, String id, Runnable on_close) throws IOException{
+ public TCFVirtualOutputStream(IChannel channel, String id, boolean send_eos, Runnable on_close) throws IOException {
+ this.channel = channel;
streams = channel.getRemoteService(IStreams.class);
if (streams == null) throw new IOException("Streams service not available"); //$NON-NLS-1$
this.id = id;
+ this.send_eos = send_eos;
this.on_close = on_close;
}
@@ -45,12 +56,26 @@ public final class TCFVirtualOutputStream extends OutputStream {
if (len == 0) return;
new TCFTask<Object>() {
public void run() {
- streams.write(id, b, off, len, new IStreams.DoneWrite() {
+ if (queue.size() > MAX_QUEUE) {
+ wait_list.add(this);
+ return;
+ }
+ if (errors.size() > 0) {
+ error(errors.removeFirst());
+ return;
+ }
+ queue.add(streams.write(id, b, off, len, new IStreams.DoneWrite() {
public void doneWrite(IToken token, Exception error) {
- if (error != null) error(error);
- else done(this);
+ if (error != null) errors.add(error);
+ queue.remove(token);
+ if (wait_list.size() > 0) {
+ Runnable[] list = wait_list.toArray(new Runnable[wait_list.size()]);
+ wait_list.clear();
+ for (Runnable r : list) r.run();
+ }
}
- });
+ }));
+ done(this);
}
}.getIO();
}
@@ -62,27 +87,55 @@ public final class TCFVirtualOutputStream extends OutputStream {
}
@Override
+ public void flush() throws IOException {
+ if (closed) throw new IOException("Stream is closed"); //$NON-NLS-1$
+ new TCFTask<Object>() {
+ public void run() {
+ if (queue.size() > 0) {
+ wait_list.add(this);
+ }
+ else if (errors.size() > 0) {
+ error(errors.removeFirst());
+ }
+ else {
+ done(this);
+ }
+ }
+ }.getIO();
+ }
+
+ @Override
public void close() throws IOException {
+ flush();
if (closed) return;
closed = true;
+ if (send_eos) {
+ new TCFTask<Object>() {
+ public void run() {
+ streams.eos(id, new IStreams.DoneEOS() {
+ public void doneEOS(IToken token, Exception error) {
+ if (error != null && channel.getState() != IChannel.STATE_CLOSED) {
+ error(error);
+ }
+ else {
+ done(this);
+ }
+ }
+ });
+ }
+ }.getIO();
+ }
new TCFTask<Object>() {
public void run() {
- streams.eos(id, new IStreams.DoneEOS() {
- public void doneEOS(IToken token, Exception error) {
- if (error != null) {
+ streams.disconnect(id, new IStreams.DoneDisconnect() {
+ public void doneDisconnect(IToken token, Exception error) {
+ if (error != null && channel.getState() != IChannel.STATE_CLOSED) {
error(error);
- return;
}
- streams.disconnect(id, new IStreams.DoneDisconnect() {
- public void doneDisconnect(IToken token, Exception error) {
- if (error != null) {
- error(error);
- return;
- }
- if (on_close != null) on_close.run();
- done(this);
- }
- });
+ else {
+ if (on_close != null) on_close.run();
+ done(this);
+ }
}
});
}
diff --git a/plugins/org.eclipse.tcf.rse/src/org/eclipse/tcf/internal/rse/shells/TCFTerminalShell.java b/plugins/org.eclipse.tcf.rse/src/org/eclipse/tcf/internal/rse/shells/TCFTerminalShell.java
index 29020ee92..374ce1a31 100644
--- a/plugins/org.eclipse.tcf.rse/src/org/eclipse/tcf/internal/rse/shells/TCFTerminalShell.java
+++ b/plugins/org.eclipse.tcf.rse/src/org/eclipse/tcf/internal/rse/shells/TCFTerminalShell.java
@@ -235,7 +235,7 @@ public class TCFTerminalShell extends AbstractTerminalShell {
onInputStreamClosed();
}
});
- fOutputStream = new TCFVirtualOutputStream(fChannel, out_id, new Runnable() {
+ fOutputStream = new TCFVirtualOutputStream(fChannel, out_id, true, new Runnable() {
@Override
public void run() {
onOutputStreamClosed();

Back to the top