diff options
Diffstat (limited to 'plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java')
-rw-r--r-- | plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java | 122 |
1 files changed, 121 insertions, 1 deletions
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java index b4029dbc1c..548a79a944 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java @@ -15,12 +15,16 @@ import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.http.IHTTPConnector; import org.eclipse.net4j.internal.http.bundle.OM; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.net4j.util.security.INegotiationContext; import org.eclipse.internal.net4j.buffer.Buffer; import org.eclipse.internal.net4j.channel.Channel; import org.eclipse.internal.net4j.connector.Connector; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -29,14 +33,19 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public abstract class HTTPConnector extends Connector implements IHTTPConnector { + public static final short NO_MORE_BUFFERS = -1; + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, HTTPConnector.class); private String connectorID; - private Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>(); + private transient Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>(); + + private transient long lastTraffic; public HTTPConnector() { + markLastTraffic(); } public String getConnectorID() @@ -54,6 +63,16 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector return outputQueue; } + public long getLastTraffic() + { + return lastTraffic; + } + + private void markLastTraffic() + { + lastTraffic = System.currentTimeMillis(); + } + public void multiplexChannel(IChannel channel) { IBuffer buffer; @@ -76,6 +95,102 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector outputQueue.add(new QueuedBuffer(buffer, outputBufferCount)); } + public void writeOutputBuffers(ExtendedDataOutputStream out) throws IOException + { + do + { + QueuedBuffer queuedBuffer = outputQueue.poll(); + if (queuedBuffer == null) + { + break; + } + + IBuffer buffer = queuedBuffer.getBuffer(); + short channelIndex = buffer.getChannelIndex(); + out.writeShort(channelIndex); + + long channelCount = queuedBuffer.getChannelCount(); + out.writeLong(channelCount); + + buffer.flip(); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.position(IBuffer.HEADER_SIZE); + int length = byteBuffer.limit() - byteBuffer.position(); + out.writeShort(length); + for (int i = 0; i < length; i++) + { + byte b = byteBuffer.get(); + System.out.println("Payload: " + b); + out.writeByte(b); + } + + buffer.release(); + markLastTraffic(); + } while (writeMoreBuffers()); + + out.writeShort(NO_MORE_BUFFERS); + } + + public void readInputBuffers(ExtendedDataInputStream in) throws IOException + { + short channelIndex = in.readShort(); + while (channelIndex != NO_MORE_BUFFERS) + { + HTTPChannel channel = (HTTPChannel)getChannel(channelIndex); + if (channel == null) + { + throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex); + } + + long bufferCount = in.readLong(); + int length = in.readShort(); + + IBuffer buffer = getBufferProvider().provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(channelIndex); + for (int i = 0; i < length; i++) + { + byte b = in.readByte(); + System.out.println("Payload: " + b); + byteBuffer.put(b); + } + + buffer.flip(); + handleInputBuffer(channel, bufferCount, buffer); + markLastTraffic(); + channelIndex = in.readShort(); + } + } + + private void handleInputBuffer(HTTPChannel channel, long bufferCount, IBuffer buffer) + { + synchronized (channel) + { + while (bufferCount < channel.getInputBufferCount()) + { + IBuffer quarantinedBuffer = channel.getQuarantinedInputBuffer(channel.getInputBufferCount()); + if (quarantinedBuffer != null) + { + channel.handleBufferFromMultiplexer(buffer); + channel.increaseInputBufferCount(); + } + else + { + break; + } + } + + if (bufferCount == channel.getInputBufferCount()) + { + channel.handleBufferFromMultiplexer(buffer); + channel.increaseInputBufferCount(); + } + else + { + channel.quarantineInputBuffer(bufferCount, buffer); + } + } + } + @Override protected INegotiationContext createNegotiationContext() { @@ -87,4 +202,9 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector { return new HTTPChannel(); } + + protected boolean writeMoreBuffers() + { + return true; + } } |