Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java')
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java122
1 files changed, 121 insertions, 1 deletions
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
index b4029dbc1c..548a79a944 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
@@ -15,12 +15,16 @@ import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.http.IHTTPConnector;
import org.eclipse.net4j.internal.http.bundle.OM;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.security.INegotiationContext;
import org.eclipse.internal.net4j.buffer.Buffer;
import org.eclipse.internal.net4j.channel.Channel;
import org.eclipse.internal.net4j.connector.Connector;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,14 +33,19 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public abstract class HTTPConnector extends Connector implements IHTTPConnector
{
+ public static final short NO_MORE_BUFFERS = -1;
+
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, HTTPConnector.class);
private String connectorID;
- private Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>();
+ private transient Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>();
+
+ private transient long lastTraffic;
public HTTPConnector()
{
+ markLastTraffic();
}
public String getConnectorID()
@@ -54,6 +63,16 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
return outputQueue;
}
+ public long getLastTraffic()
+ {
+ return lastTraffic;
+ }
+
+ private void markLastTraffic()
+ {
+ lastTraffic = System.currentTimeMillis();
+ }
+
public void multiplexChannel(IChannel channel)
{
IBuffer buffer;
@@ -76,6 +95,102 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
outputQueue.add(new QueuedBuffer(buffer, outputBufferCount));
}
+ public void writeOutputBuffers(ExtendedDataOutputStream out) throws IOException
+ {
+ do
+ {
+ QueuedBuffer queuedBuffer = outputQueue.poll();
+ if (queuedBuffer == null)
+ {
+ break;
+ }
+
+ IBuffer buffer = queuedBuffer.getBuffer();
+ short channelIndex = buffer.getChannelIndex();
+ out.writeShort(channelIndex);
+
+ long channelCount = queuedBuffer.getChannelCount();
+ out.writeLong(channelCount);
+
+ buffer.flip();
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ byteBuffer.position(IBuffer.HEADER_SIZE);
+ int length = byteBuffer.limit() - byteBuffer.position();
+ out.writeShort(length);
+ for (int i = 0; i < length; i++)
+ {
+ byte b = byteBuffer.get();
+ System.out.println("Payload: " + b);
+ out.writeByte(b);
+ }
+
+ buffer.release();
+ markLastTraffic();
+ } while (writeMoreBuffers());
+
+ out.writeShort(NO_MORE_BUFFERS);
+ }
+
+ public void readInputBuffers(ExtendedDataInputStream in) throws IOException
+ {
+ short channelIndex = in.readShort();
+ while (channelIndex != NO_MORE_BUFFERS)
+ {
+ HTTPChannel channel = (HTTPChannel)getChannel(channelIndex);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex);
+ }
+
+ long bufferCount = in.readLong();
+ int length = in.readShort();
+
+ IBuffer buffer = getBufferProvider().provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(channelIndex);
+ for (int i = 0; i < length; i++)
+ {
+ byte b = in.readByte();
+ System.out.println("Payload: " + b);
+ byteBuffer.put(b);
+ }
+
+ buffer.flip();
+ handleInputBuffer(channel, bufferCount, buffer);
+ markLastTraffic();
+ channelIndex = in.readShort();
+ }
+ }
+
+ private void handleInputBuffer(HTTPChannel channel, long bufferCount, IBuffer buffer)
+ {
+ synchronized (channel)
+ {
+ while (bufferCount < channel.getInputBufferCount())
+ {
+ IBuffer quarantinedBuffer = channel.getQuarantinedInputBuffer(channel.getInputBufferCount());
+ if (quarantinedBuffer != null)
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputBufferCount();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (bufferCount == channel.getInputBufferCount())
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputBufferCount();
+ }
+ else
+ {
+ channel.quarantineInputBuffer(bufferCount, buffer);
+ }
+ }
+ }
+
@Override
protected INegotiationContext createNegotiationContext()
{
@@ -87,4 +202,9 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
{
return new HTTPChannel();
}
+
+ protected boolean writeMoreBuffers()
+ {
+ return true;
+ }
}

Back to the top