diff options
author | Eugene Tarassov | 2015-02-20 19:07:53 +0000 |
---|---|---|
committer | Eugene Tarassov | 2015-02-20 19:07:53 +0000 |
commit | d2e45069a5f87e66ebd22caae4328c1dbfcae2d9 (patch) | |
tree | 7d2291da87f8dedd34062475337ed1f8e4aaa00b /plugins | |
parent | 217e842bbfec42df35dac6c4839fecfe9bb836ee (diff) | |
download | org.eclipse.tcf-d2e45069a5f87e66ebd22caae4328c1dbfcae2d9.tar.gz org.eclipse.tcf-d2e45069a5f87e66ebd22caae4328c1dbfcae2d9.tar.xz org.eclipse.tcf-d2e45069a5f87e66ebd22caae4328c1dbfcae2d9.zip |
TCF Core: added a buffer for output stream in AbstractChannel - improved performance
Diffstat (limited to 'plugins')
3 files changed, 144 insertions, 62 deletions
diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java index 1969f293e..e79271b1a 100644 --- a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java +++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/AbstractChannel.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2014 Wind River Systems, Inc. and others. + * Copyright (c) 2007, 2015 Wind River Systems, Inc. and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -335,29 +335,63 @@ public abstract class AbstractChannel implements IChannel { out_thread = new Thread() { + private final byte[] out_buf = new byte[0x4000]; + private int out_buf_pos; + + void writeBytes(byte[] buf) throws IOException { + if (buf.length > out_buf.length) { + write(out_buf, 0, out_buf_pos); + out_buf_pos = 0; + write(buf); + } + else { + int i = 0; + while (i < buf.length) { + if (out_buf_pos >= out_buf.length) { + write(out_buf); + out_buf_pos = 0; + } + int n = buf.length - i; + if (n > out_buf.length - out_buf_pos) n = out_buf.length - out_buf_pos; + System.arraycopy(buf, i, out_buf, out_buf_pos, n); + out_buf_pos += n; + i += n; + } + } + } + void writeString(String s) throws IOException { int l = s.length(); for (int i = 0; i < l; i++) { + if (out_buf_pos + 4 > out_buf.length) { + write(out_buf, 0, out_buf_pos); + out_buf_pos = 0; + } int ch = s.charAt(i); if (ch < 0x80) { - write(ch); + out_buf[out_buf_pos++] = (byte)ch; } else if (ch < 0x800) { - write((ch >> 6) | 0xc0); - write(ch & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)((ch >> 6) | 0xc0); + out_buf[out_buf_pos++] = (byte)(ch & 0x3f | 0x80); } else if (ch < 0x10000) { - write((ch >> 12) | 0xe0); - write((ch >> 6) & 0x3f | 0x80); - write(ch & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)((ch >> 12) | 0xe0); + out_buf[out_buf_pos++] = (byte)((ch >> 6) & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)(ch & 0x3f | 0x80); } else { - write((ch >> 18) | 0xf0); - write((ch >> 12) & 0x3f | 0x80); - write((ch >> 6) & 0x3f | 0x80); - write(ch & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)((ch >> 18) | 0xf0); + out_buf[out_buf_pos++] = (byte)((ch >> 12) & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)((ch >> 6) & 0x3f | 0x80); + out_buf[out_buf_pos++] = (byte)(ch & 0x3f | 0x80); } } + if (out_buf_pos >= out_buf.length) { + write(out_buf); + out_buf_pos = 0; + } + out_buf[out_buf_pos++] = 0; } @Override @@ -393,23 +427,14 @@ public abstract class AbstractChannel implements IChannel { } }); } - write(msg.type); - write(0); - if (msg.token != null) { - write(msg.token.getBytes()); - write(0); - } - if (msg.service != null) { - writeString(msg.service); - write(0); - } - if (msg.name != null) { - writeString(msg.name); - write(0); - } - if (msg.data != null) { - write(msg.data); - } + out_buf_pos = 0; + out_buf[out_buf_pos++] = (byte)msg.type; + out_buf[out_buf_pos++] = 0; + if (msg.token != null) writeString(msg.token.getID()); + if (msg.service != null) writeString(msg.service); + if (msg.name != null) writeString(msg.name); + if (msg.data != null) writeBytes(msg.data); + write(out_buf, 0, out_buf_pos); write(EOM); int delay = 0; int level = remote_congestion_level; @@ -1134,4 +1159,17 @@ public abstract class AbstractChannel implements IChannel { assert Thread.currentThread() == out_thread; for (int i = 0; i < buf.length; i++) write(buf[i] & 0xff); } + + /** + * Write array of bytes into the channel output stream. + * The stream can put bytes into a buffer instead of transmitting it right away. + * @param buf + * @param pos + * @param len + * @throws IOException + */ + protected void write(byte[] buf, int pos, int len) throws IOException { + assert Thread.currentThread() == out_thread; + for (int i = pos; i < pos + len; i++) write(buf[i] & 0xff); + } } diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/ChannelTCP.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/ChannelTCP.java index e5d48913b..8402fafcc 100644 --- a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/ChannelTCP.java +++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/ChannelTCP.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2011 Wind River Systems, Inc. and others. + * Copyright (c) 2007, 2015 Wind River Systems, Inc. and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -181,6 +181,12 @@ public class ChannelTCP extends StreamChannel { } @Override + protected final void put(byte[] buf, int pos, int len) throws IOException { + if (closed) return; + out.write(buf, pos, len); + } + + @Override protected final void flush() throws IOException { if (closed) return; out.flush(); diff --git a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/StreamChannel.java b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/StreamChannel.java index 149453378..7ff555326 100644 --- a/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/StreamChannel.java +++ b/plugins/org.eclipse.tcf.core/src/org/eclipse/tcf/core/StreamChannel.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2011 Wind River Systems, Inc. and others. + * Copyright (c) 2007, 2015 Wind River Systems, Inc. and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -31,9 +31,11 @@ public abstract class StreamChannel extends AbstractChannel { private int bin_data_size; - private final byte[] buf = new byte[0x1000]; - private int buf_pos; - private int buf_len; + private final byte[] esc_buf = new byte[0x1000]; + + private final byte[] inp_buf = new byte[0x4000]; + private int inp_buf_pos; + private int inp_buf_len; public StreamChannel(IPeer remote_peer) { super(remote_peer); @@ -47,6 +49,7 @@ public abstract class StreamChannel extends AbstractChannel { protected abstract void put(int n) throws IOException; protected int get(byte[] buf) throws IOException { + /* Default implementation - it is expected to be overridden */ int i = 0; while (i < buf.length) { int b = get(); @@ -61,41 +64,48 @@ public abstract class StreamChannel extends AbstractChannel { } protected void put(byte[] buf) throws IOException { + /* Default implementation - it is expected to be overridden */ for (byte b : buf) put(b & 0xff); } + protected void put(byte[] buf, int pos, int len) throws IOException { + /* Default implementation - it is expected to be overridden */ + int end = pos + len; + while (pos < end) put(buf[pos++] & 0xff); + } + @Override protected final int read() throws IOException { for (;;) { - while (buf_pos >= buf_len) { - buf_len = get(buf); - buf_pos = 0; - if (buf_len < 0) return EOS; + while (inp_buf_pos >= inp_buf_len) { + inp_buf_len = get(inp_buf); + inp_buf_pos = 0; + if (inp_buf_len < 0) return EOS; } - int res = buf[buf_pos++] & 0xff; + int res = inp_buf[inp_buf_pos++] & 0xff; if (bin_data_size > 0) { bin_data_size--; return res; } if (res != ESC) return res; - while (buf_pos >= buf_len) { - buf_len = get(buf); - buf_pos = 0; - if (buf_len < 0) return EOS; + while (inp_buf_pos >= inp_buf_len) { + inp_buf_len = get(inp_buf); + inp_buf_pos = 0; + if (inp_buf_len < 0) return EOS; } - int n = buf[buf_pos++] & 0xff; + int n = inp_buf[inp_buf_pos++] & 0xff; switch (n) { case 0: return ESC; case 1: return EOM; case 2: return EOS; case 3: for (int i = 0;; i += 7) { - while (buf_pos >= buf_len) { - buf_len = get(buf); - buf_pos = 0; - if (buf_len < 0) return EOS; + while (inp_buf_pos >= inp_buf_len) { + inp_buf_len = get(inp_buf); + inp_buf_pos = 0; + if (inp_buf_len < 0) return EOS; } - int m = buf[buf_pos++] & 0xff; + int m = inp_buf[inp_buf_pos++] & 0xff; bin_data_size |= (m & 0x7f) << i; if ((m & 0x80) == 0) break; } @@ -110,36 +120,64 @@ public abstract class StreamChannel extends AbstractChannel { @Override protected final void write(int n) throws IOException { switch (n) { - case ESC: put(ESC); put(0); break; - case EOM: put(ESC); put(1); break; - case EOS: put(ESC); put(2); break; + case ESC: + esc_buf[0] = ESC; + esc_buf[1] = 0; + put(esc_buf, 0, 2); + break; + case EOM: + esc_buf[0] = ESC; + esc_buf[1] = 1; + put(esc_buf, 0, 2); + break; + case EOS: + esc_buf[0] = ESC; + esc_buf[1] = 2; + put(esc_buf, 0, 2); + break; default: assert n >= 0 && n <= 0xff; put(n); + break; } } @Override - protected void write(byte[] buf) throws IOException { - if (buf.length > 32 && isZeroCopySupported()) { - put(ESC); put(3); - int n = buf.length; + protected final void write(byte[] buf) throws IOException { + write(buf, 0, buf.length); + } + + @Override + protected final void write(byte[] buf, int pos, int len) throws IOException { + if (len > 32 && isZeroCopySupported()) { + int n = len; + int esc_buf_pos = 0; + esc_buf[esc_buf_pos++] = ESC; + esc_buf[esc_buf_pos++] = 3; for (;;) { if (n <= 0x7f) { - put(n); + esc_buf[esc_buf_pos++] = (byte)n; break; } - put((n & 0x7f) | 0x80); + esc_buf[esc_buf_pos++] = (byte)((n & 0x7f) | 0x80); n = n >> 7; } - put(buf); + put(esc_buf, 0, esc_buf_pos); + put(buf, pos, len); } else { - for (byte b : buf) { - int n = b & 0xff; - put(n); - if (n == ESC) put(0); + int esc_buf_pos = 0; + int end = pos + len; + for (int i = pos; i < end; i++) { + if (esc_buf_pos + 2 > esc_buf.length) { + put(esc_buf, 0, esc_buf_pos); + esc_buf_pos = 0; + } + byte b = buf[i]; + esc_buf[esc_buf_pos++] = b; + if (b == ESC) esc_buf[esc_buf_pos++] = 0; } + put(esc_buf, 0, esc_buf_pos); } } } |