Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java6
-rw-r--r--plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java31
-rw-r--r--plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java37
3 files changed, 71 insertions, 3 deletions
diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java
index 7ea00ea2206..7e1ec2f33df 100644
--- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java
+++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/AddressBuffer.java
@@ -5,7 +5,7 @@ import java.nio.ByteBuffer;
public class AddressBuffer {
- private final ByteBuffer buffer;
+ private ByteBuffer buffer;
private InetSocketAddress address;
public AddressBuffer(){
@@ -16,6 +16,10 @@ public class AddressBuffer {
return buffer;
}
+ public void setBytes(byte[] bytes) {
+ buffer = ByteBuffer.wrap(bytes);
+ }
+
public InetSocketAddress getAddress(){
return address;
}
diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java
index 514e276c438..ed2e6be4c30 100644
--- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java
+++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointReceiveRunnable.java
@@ -1,5 +1,6 @@
package org.eclipse.osee.ote.endpoint;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
@@ -11,7 +12,8 @@ import java.nio.channels.DatagramChannel;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
-
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.OTEException;
import org.eclipse.osee.ote.message.event.OteEventMessage;
@@ -28,6 +30,8 @@ public class OteEndpointReceiveRunnable implements Runnable {
private volatile boolean debugOutput = false;
private Class<OteEndpointReceiveRunnable> logger = OteEndpointReceiveRunnable.class;
private final InetSocketAddress address;
+ private static final int MAGIC_NUMBER = ByteBuffer.wrap(OteEndpointSendRunnable.MAGIC_NUMBER).getInt();
+ private final Inflater inflater = new Inflater();
private CopyOnWriteArrayList<EndpointDataProcessor> dataProcessors = new CopyOnWriteArrayList<>();
@@ -99,6 +103,31 @@ public class OteEndpointReceiveRunnable implements Runnable {
}
private void processBuffer(ByteBuffer buffer) {
+ int magicNumber = 0;
+ if(buffer.remaining() > 4) {
+ magicNumber = buffer.getInt(0);
+ }
+ // compressed stream
+ if(magicNumber == MAGIC_NUMBER) {
+ inflater.reset();
+ inflater.setInput(buffer.array(), 4, buffer.remaining() - 4);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(buffer.remaining() - 4);
+ byte[] tempBuf = new byte[1024];
+ try {
+ while (!inflater.finished()) {
+ int count = inflater.inflate(tempBuf);
+ outputStream.write(tempBuf, 0, count);
+ }
+ outputStream.close();
+ buffer = ByteBuffer.wrap(outputStream.toByteArray());
+ }
+ catch (DataFormatException e) {
+ OseeLog.log(getClass(), Level.SEVERE, e);
+ }
+ catch (IOException e) {
+ OseeLog.log(getClass(), Level.SEVERE, e);
+ }
+ }
int typeId = buffer.getShort(0) & 0xFFFF;
if(typeId == OteEventMessageHeader.MARKER_VALUE){
byte[] data = new byte[buffer.remaining()];
diff --git a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java
index 65d9af4d16c..ea9a7dcd0c2 100644
--- a/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java
+++ b/plugins/org.eclipse.osee.ote.core/src/org/eclipse/osee/ote/endpoint/OteEndpointSendRunnable.java
@@ -1,5 +1,6 @@
package org.eclipse.osee.ote.endpoint;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
@@ -10,7 +11,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
-
+import java.util.zip.Deflater;
import org.eclipse.osee.framework.jdk.core.util.Lib;
import org.eclipse.osee.framework.logging.OseeLog;
import org.eclipse.osee.ote.collections.ObjectPool;
@@ -18,9 +19,15 @@ import org.eclipse.osee.ote.collections.ObjectPool;
final class OteEndpointSendRunnable implements Runnable {
private static final int SEND_BUFFER_SIZE = 1024 * 512;
+ /**
+ * This is the header for a ZIP file
+ */
+ public static final byte[] MAGIC_NUMBER = {0x04, 0x03, 0x4b, 0x50};
+ private static final int MAX_DGRAM_SIZE = 65500;
private final ArrayBlockingQueue<AddressBuffer> toSend;
private final ObjectPool<AddressBuffer> buffers;
+ private final Deflater compressor = new Deflater();
private boolean debug = false;
@@ -28,6 +35,7 @@ final class OteEndpointSendRunnable implements Runnable {
this.toSend = toSend;
this.buffers = buffers;
this.debug = debug;
+ compressor.setLevel(Deflater.BEST_SPEED);
}
@Override
@@ -64,6 +72,33 @@ final class OteEndpointSendRunnable implements Runnable {
keepRunning = false;
break;
}
+
+ int bufSize = data.getBuffer().remaining();
+ if(bufSize > MAX_DGRAM_SIZE) {
+ compressor.reset();
+ compressor.setInput(data.getBuffer().array(), 0, bufSize);
+ compressor.finish();
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(bufSize);
+
+ byte[] buf = new byte[1024];
+ while (!compressor.finished()) {
+ int count = compressor.deflate(buf);
+ bos.write(buf, 0, count);
+ }
+ try {
+ bos.close();
+ byte[] dataBytes = new byte[bos.size() + 4];
+ System.arraycopy(MAGIC_NUMBER, 0, dataBytes, 0, 4);
+ System.arraycopy(bos.toByteArray(), 0, dataBytes, 4, bos.size());
+ data.setBytes(dataBytes);
+ }
+ catch (IOException e) {
+ OseeLog.log(getClass(), Level.SEVERE, "Error trying to compress data", e);
+ continue;
+ }
+ }
+
threadChannel.send(data.getBuffer(), data.getAddress());
}
} catch (ClosedByInterruptException ex){

Back to the top