diff options
author | Eike Stepper | 2008-05-26 08:36:19 +0000 |
---|---|---|
committer | Eike Stepper | 2008-05-26 08:36:19 +0000 |
commit | 44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a (patch) | |
tree | f974285e1da57f260173f428896d528e7b298103 | |
parent | 538dd73622c329ec58145d4f44b07992b2dc722c (diff) | |
download | cdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.tar.gz cdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.tar.xz cdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.zip |
[232648] Provide an HTTPConnector
https://bugs.eclipse.org/bugs/show_bug.cgi?id=232648
4 files changed, 123 insertions, 112 deletions
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java index a079825399..5bc5ec94a3 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java @@ -182,12 +182,8 @@ public class HTTPAcceptor extends Acceptor implements IHTTPAcceptor, INet4jTrans throw new IllegalArgumentException("Invalid connectorID: " + connectorID); } - short channelIndex = in.readShort(); - while (channelIndex != -1) - { - connector.handleBufferFromMultiplexer(channelIndex, in); - channelIndex = in.readShort(); - } + connector.readInputBuffers(in); + connector.writeOutputBuffers(out); } @Override diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java index 79b21fa878..1736fa1004 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java @@ -76,61 +76,6 @@ public class HTTPClientConnector extends HTTPConnector } @Override - public void multiplexChannel(final IChannel channel) - { - Queue<IBuffer> localQueue = ((InternalChannel)channel).getSendQueue(); - final IBuffer buffer = localQueue.poll(); - if (TRACER.isEnabled()) - { - TRACER.trace("Multiplexing " + ((Buffer)buffer).formatContent(true)); - } - - try - { - request(new IOHandler() - { - public void handleOut(ExtendedDataOutputStream out) throws IOException - { - out.writeByte(INet4jTransportServlet.OPCODE_BUFFERS); - out.writeString(getConnectorID()); - out.writeShort(channel.getChannelIndex()); - - buffer.flip(); - ByteBuffer byteBuffer = buffer.getByteBuffer(); - byteBuffer.position(IBuffer.HEADER_SIZE); - int length = byteBuffer.limit() - byteBuffer.position(); - out.writeShort(length); - for (int i = 0; i < length; i++) - { - byte b = byteBuffer.get(); - System.out.println("Payload: " + b); - out.writeByte(b); - } - - buffer.release(); - } - - public void handleIn(ExtendedDataInputStream in) throws IOException - { - boolean ok = in.readBoolean(); - if (!ok) - { - throw new ConnectorException("Could not send buffer"); - } - } - }); - } - catch (RuntimeException ex) - { - throw ex; - } - catch (IOException ex) - { - throw new IORuntimeException(ex); - } - } - - @Override public String toString() { if (getUserID() == null) diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java index b4029dbc1c..548a79a944 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java @@ -15,12 +15,16 @@ import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.http.IHTTPConnector; import org.eclipse.net4j.internal.http.bundle.OM; import org.eclipse.net4j.internal.util.om.trace.ContextTracer; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; import org.eclipse.net4j.util.security.INegotiationContext; import org.eclipse.internal.net4j.buffer.Buffer; import org.eclipse.internal.net4j.channel.Channel; import org.eclipse.internal.net4j.connector.Connector; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -29,14 +33,19 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public abstract class HTTPConnector extends Connector implements IHTTPConnector { + public static final short NO_MORE_BUFFERS = -1; + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, HTTPConnector.class); private String connectorID; - private Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>(); + private transient Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>(); + + private transient long lastTraffic; public HTTPConnector() { + markLastTraffic(); } public String getConnectorID() @@ -54,6 +63,16 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector return outputQueue; } + public long getLastTraffic() + { + return lastTraffic; + } + + private void markLastTraffic() + { + lastTraffic = System.currentTimeMillis(); + } + public void multiplexChannel(IChannel channel) { IBuffer buffer; @@ -76,6 +95,102 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector outputQueue.add(new QueuedBuffer(buffer, outputBufferCount)); } + public void writeOutputBuffers(ExtendedDataOutputStream out) throws IOException + { + do + { + QueuedBuffer queuedBuffer = outputQueue.poll(); + if (queuedBuffer == null) + { + break; + } + + IBuffer buffer = queuedBuffer.getBuffer(); + short channelIndex = buffer.getChannelIndex(); + out.writeShort(channelIndex); + + long channelCount = queuedBuffer.getChannelCount(); + out.writeLong(channelCount); + + buffer.flip(); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.position(IBuffer.HEADER_SIZE); + int length = byteBuffer.limit() - byteBuffer.position(); + out.writeShort(length); + for (int i = 0; i < length; i++) + { + byte b = byteBuffer.get(); + System.out.println("Payload: " + b); + out.writeByte(b); + } + + buffer.release(); + markLastTraffic(); + } while (writeMoreBuffers()); + + out.writeShort(NO_MORE_BUFFERS); + } + + public void readInputBuffers(ExtendedDataInputStream in) throws IOException + { + short channelIndex = in.readShort(); + while (channelIndex != NO_MORE_BUFFERS) + { + HTTPChannel channel = (HTTPChannel)getChannel(channelIndex); + if (channel == null) + { + throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex); + } + + long bufferCount = in.readLong(); + int length = in.readShort(); + + IBuffer buffer = getBufferProvider().provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(channelIndex); + for (int i = 0; i < length; i++) + { + byte b = in.readByte(); + System.out.println("Payload: " + b); + byteBuffer.put(b); + } + + buffer.flip(); + handleInputBuffer(channel, bufferCount, buffer); + markLastTraffic(); + channelIndex = in.readShort(); + } + } + + private void handleInputBuffer(HTTPChannel channel, long bufferCount, IBuffer buffer) + { + synchronized (channel) + { + while (bufferCount < channel.getInputBufferCount()) + { + IBuffer quarantinedBuffer = channel.getQuarantinedInputBuffer(channel.getInputBufferCount()); + if (quarantinedBuffer != null) + { + channel.handleBufferFromMultiplexer(buffer); + channel.increaseInputBufferCount(); + } + else + { + break; + } + } + + if (bufferCount == channel.getInputBufferCount()) + { + channel.handleBufferFromMultiplexer(buffer); + channel.increaseInputBufferCount(); + } + else + { + channel.quarantineInputBuffer(bufferCount, buffer); + } + } + } + @Override protected INegotiationContext createNegotiationContext() { @@ -87,4 +202,9 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector { return new HTTPChannel(); } + + protected boolean writeMoreBuffers() + { + return true; + } } diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java index 73002b70f4..57f451c221 100644 --- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java +++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java @@ -10,17 +10,10 @@ **************************************************************************/ package org.eclipse.net4j.internal.http; -import org.eclipse.net4j.buffer.IBuffer; -import org.eclipse.net4j.channel.IChannel; import org.eclipse.net4j.connector.ConnectorException; import org.eclipse.net4j.connector.ConnectorLocation; import org.eclipse.net4j.protocol.IProtocol; -import org.eclipse.net4j.util.io.ExtendedDataInputStream; -import org.eclipse.internal.net4j.channel.InternalChannel; - -import java.io.IOException; -import java.nio.ByteBuffer; import java.text.MessageFormat; /** @@ -30,12 +23,9 @@ public class HTTPServerConnector extends HTTPConnector { private HTTPAcceptor acceptor; - private long lastTraffic; - public HTTPServerConnector(HTTPAcceptor acceptor) { this.acceptor = acceptor; - markLastTraffic(); } public HTTPAcceptor getAcceptor() @@ -58,46 +48,6 @@ public class HTTPServerConnector extends HTTPConnector return acceptor.getMaxIdleTime(); } - public long getLastTraffic() - { - return lastTraffic; - } - - private void markLastTraffic() - { - lastTraffic = System.currentTimeMillis(); - } - - @Override - public void multiplexChannel(IChannel channel) - { - throw new UnsupportedOperationException(); - } - - public void handleBufferFromMultiplexer(short channelIndex, ExtendedDataInputStream in) throws IOException - { - InternalChannel channel = getChannel(channelIndex); - if (channel == null) - { - throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex); - } - - int length = in.readShort(); - - IBuffer buffer = getBufferProvider().provideBuffer(); - ByteBuffer byteBuffer = buffer.startPutting(channelIndex); - for (int i = 0; i < length; i++) - { - byte b = in.readByte(); - System.out.println("Payload: " + b); - byteBuffer.put(b); - } - - buffer.flip(); - channel.handleBufferFromMultiplexer(buffer); - markLastTraffic(); - } - @Override public String toString() { |