diff options
author | jmisinco | 2017-09-07 15:18:09 +0000 |
---|---|---|
committer | jmisinco | 2017-09-07 19:18:16 +0000 |
commit | f11355aa268bc95508b059e2b134c427926628fb (patch) | |
tree | b18a83df30d9f8e5a01dbdd22d3b5ade7f866cc2 | |
parent | 33813d41e1c1d78c0e9a35f569bca982774ae3e4 (diff) | |
download | org.eclipse.osee-OTE_05_23_2018.tar.gz org.eclipse.osee-OTE_05_23_2018.tar.xz org.eclipse.osee-OTE_05_23_2018.zip |
feature: Add compression to endpointsOTE_10_12_2017OTE_05_23_2018OTE_04_27_2018OTE_04_13_2018OTE_03_06_2018
Change-Id: I88faa1f5e88ad92468489c25d458f89aa676cd6d
3 files changed, 71 insertions, 3 deletions
diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java index 7ea00ea2206..7e1ec2f33df 100644 --- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java +++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java @@ -5,7 +5,7 @@ import java.nio.ByteBuffer; public class AddressBuffer { - private final ByteBuffer buffer; + private ByteBuffer buffer; private InetSocketAddress address; public AddressBuffer(){ @@ -16,6 +16,10 @@ public class AddressBuffer { return buffer; } + public void setBytes(byte[] bytes) { + buffer = ByteBuffer.wrap(bytes); + } + public InetSocketAddress getAddress(){ return address; } diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java index 514e276c438..ed2e6be4c30 100644 --- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java +++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java @@ -1,5 +1,6 @@ package org.eclipse.osee.ote.endpoint; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.net.BindException; @@ -11,7 +12,8 @@ import java.nio.channels.DatagramChannel; import java.util.Date; import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; - +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; import org.eclipse.osee.framework.logging.OseeLog; import org.eclipse.osee.ote.OTEException; import org.eclipse.osee.ote.message.event.OteEventMessage; @@ -28,6 +30,8 @@ public class OteEndpointReceiveRunnable implements Runnable { private volatile boolean debugOutput = false; private Class<OteEndpointReceiveRunnable> logger = OteEndpointReceiveRunnable.class; private final InetSocketAddress address; + private static final int MAGIC_NUMBER = ByteBuffer.wrap(OteEndpointSendRunnable.MAGIC_NUMBER).getInt(); + private final Inflater inflater = new Inflater(); private CopyOnWriteArrayList<EndpointDataProcessor> dataProcessors = new CopyOnWriteArrayList<>(); @@ -99,6 +103,31 @@ public class OteEndpointReceiveRunnable implements Runnable { } private void processBuffer(ByteBuffer buffer) { + int magicNumber = 0; + if(buffer.remaining() > 4) { + magicNumber = buffer.getInt(0); + } + // compressed stream + if(magicNumber == MAGIC_NUMBER) { + inflater.reset(); + inflater.setInput(buffer.array(), 4, buffer.remaining() - 4); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(buffer.remaining() - 4); + byte[] tempBuf = new byte[1024]; + try { + while (!inflater.finished()) { + int count = inflater.inflate(tempBuf); + outputStream.write(tempBuf, 0, count); + } + outputStream.close(); + buffer = ByteBuffer.wrap(outputStream.toByteArray()); + } + catch (DataFormatException e) { + OseeLog.log(getClass(), Level.SEVERE, e); + } + catch (IOException e) { + OseeLog.log(getClass(), Level.SEVERE, e); + } + } int typeId = buffer.getShort(0) & 0xFFFF; if(typeId == OteEventMessageHeader.MARKER_VALUE){ byte[] data = new byte[buffer.remaining()]; diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java index 65d9af4d16c..ea9a7dcd0c2 100644 --- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java +++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java @@ -1,5 +1,6 @@ package org.eclipse.osee.ote.endpoint; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedByInterruptException; @@ -10,7 +11,7 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Level; - +import java.util.zip.Deflater; import org.eclipse.osee.framework.jdk.core.util.Lib; import org.eclipse.osee.framework.logging.OseeLog; import org.eclipse.osee.ote.collections.ObjectPool; @@ -18,9 +19,15 @@ import org.eclipse.osee.ote.collections.ObjectPool; final class OteEndpointSendRunnable implements Runnable { private static final int SEND_BUFFER_SIZE = 1024 * 512; + /** + * This is the header for a ZIP file + */ + public static final byte[] MAGIC_NUMBER = {0x04, 0x03, 0x4b, 0x50}; + private static final int MAX_DGRAM_SIZE = 65500; private final ArrayBlockingQueue<AddressBuffer> toSend; private final ObjectPool<AddressBuffer> buffers; + private final Deflater compressor = new Deflater(); private boolean debug = false; @@ -28,6 +35,7 @@ final class OteEndpointSendRunnable implements Runnable { this.toSend = toSend; this.buffers = buffers; this.debug = debug; + compressor.setLevel(Deflater.BEST_SPEED); } @Override @@ -64,6 +72,33 @@ final class OteEndpointSendRunnable implements Runnable { keepRunning = false; break; } + + int bufSize = data.getBuffer().remaining(); + if(bufSize > MAX_DGRAM_SIZE) { + compressor.reset(); + compressor.setInput(data.getBuffer().array(), 0, bufSize); + compressor.finish(); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(bufSize); + + byte[] buf = new byte[1024]; + while (!compressor.finished()) { + int count = compressor.deflate(buf); + bos.write(buf, 0, count); + } + try { + bos.close(); + byte[] dataBytes = new byte[bos.size() + 4]; + System.arraycopy(MAGIC_NUMBER, 0, dataBytes, 0, 4); + System.arraycopy(bos.toByteArray(), 0, dataBytes, 4, bos.size()); + data.setBytes(dataBytes); + } + catch (IOException e) { + OseeLog.log(getClass(), Level.SEVERE, "Error trying to compress data", e); + continue; + } + } + threadChannel.send(data.getBuffer(), data.getAddress()); } } catch (ClosedByInterruptException ex){ |