summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2008-05-26 04:36:19 (EDT)
committerEike Stepper2008-05-26 04:36:19 (EDT)
commit44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a (patch)
treef974285e1da57f260173f428896d528e7b298103
parent538dd73622c329ec58145d4f44b07992b2dc722c (diff)
downloadcdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.zip
cdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.tar.gz
cdo-44e6dc9c8ebac4806fa479a4b2828c70d1b88c5a.tar.bz2
[232648] Provide an HTTPConnector
https://bugs.eclipse.org/bugs/show_bug.cgi?id=232648
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java8
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java55
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java122
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java50
4 files changed, 123 insertions, 112 deletions
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
index a079825..5bc5ec9 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
@@ -182,12 +182,8 @@ public class HTTPAcceptor extends Acceptor implements IHTTPAcceptor, INet4jTrans
throw new IllegalArgumentException("Invalid connectorID: " + connectorID);
}
- short channelIndex = in.readShort();
- while (channelIndex != -1)
- {
- connector.handleBufferFromMultiplexer(channelIndex, in);
- channelIndex = in.readShort();
- }
+ connector.readInputBuffers(in);
+ connector.writeOutputBuffers(out);
}
@Override
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
index 79b21fa..1736fa1 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
@@ -76,61 +76,6 @@ public class HTTPClientConnector extends HTTPConnector
}
@Override
- public void multiplexChannel(final IChannel channel)
- {
- Queue<IBuffer> localQueue = ((InternalChannel)channel).getSendQueue();
- final IBuffer buffer = localQueue.poll();
- if (TRACER.isEnabled())
- {
- TRACER.trace("Multiplexing " + ((Buffer)buffer).formatContent(true));
- }
-
- try
- {
- request(new IOHandler()
- {
- public void handleOut(ExtendedDataOutputStream out) throws IOException
- {
- out.writeByte(INet4jTransportServlet.OPCODE_BUFFERS);
- out.writeString(getConnectorID());
- out.writeShort(channel.getChannelIndex());
-
- buffer.flip();
- ByteBuffer byteBuffer = buffer.getByteBuffer();
- byteBuffer.position(IBuffer.HEADER_SIZE);
- int length = byteBuffer.limit() - byteBuffer.position();
- out.writeShort(length);
- for (int i = 0; i < length; i++)
- {
- byte b = byteBuffer.get();
- System.out.println("Payload: " + b);
- out.writeByte(b);
- }
-
- buffer.release();
- }
-
- public void handleIn(ExtendedDataInputStream in) throws IOException
- {
- boolean ok = in.readBoolean();
- if (!ok)
- {
- throw new ConnectorException("Could not send buffer");
- }
- }
- });
- }
- catch (RuntimeException ex)
- {
- throw ex;
- }
- catch (IOException ex)
- {
- throw new IORuntimeException(ex);
- }
- }
-
- @Override
public String toString()
{
if (getUserID() == null)
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
index b4029db..548a79a 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
@@ -15,12 +15,16 @@ import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.http.IHTTPConnector;
import org.eclipse.net4j.internal.http.bundle.OM;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
+import org.eclipse.net4j.util.io.ExtendedDataInputStream;
+import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.security.INegotiationContext;
import org.eclipse.internal.net4j.buffer.Buffer;
import org.eclipse.internal.net4j.channel.Channel;
import org.eclipse.internal.net4j.connector.Connector;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,14 +33,19 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public abstract class HTTPConnector extends Connector implements IHTTPConnector
{
+ public static final short NO_MORE_BUFFERS = -1;
+
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, HTTPConnector.class);
private String connectorID;
- private Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>();
+ private transient Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>();
+
+ private transient long lastTraffic;
public HTTPConnector()
{
+ markLastTraffic();
}
public String getConnectorID()
@@ -54,6 +63,16 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
return outputQueue;
}
+ public long getLastTraffic()
+ {
+ return lastTraffic;
+ }
+
+ private void markLastTraffic()
+ {
+ lastTraffic = System.currentTimeMillis();
+ }
+
public void multiplexChannel(IChannel channel)
{
IBuffer buffer;
@@ -76,6 +95,102 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
outputQueue.add(new QueuedBuffer(buffer, outputBufferCount));
}
+ public void writeOutputBuffers(ExtendedDataOutputStream out) throws IOException
+ {
+ do
+ {
+ QueuedBuffer queuedBuffer = outputQueue.poll();
+ if (queuedBuffer == null)
+ {
+ break;
+ }
+
+ IBuffer buffer = queuedBuffer.getBuffer();
+ short channelIndex = buffer.getChannelIndex();
+ out.writeShort(channelIndex);
+
+ long channelCount = queuedBuffer.getChannelCount();
+ out.writeLong(channelCount);
+
+ buffer.flip();
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ byteBuffer.position(IBuffer.HEADER_SIZE);
+ int length = byteBuffer.limit() - byteBuffer.position();
+ out.writeShort(length);
+ for (int i = 0; i < length; i++)
+ {
+ byte b = byteBuffer.get();
+ System.out.println("Payload: " + b);
+ out.writeByte(b);
+ }
+
+ buffer.release();
+ markLastTraffic();
+ } while (writeMoreBuffers());
+
+ out.writeShort(NO_MORE_BUFFERS);
+ }
+
+ public void readInputBuffers(ExtendedDataInputStream in) throws IOException
+ {
+ short channelIndex = in.readShort();
+ while (channelIndex != NO_MORE_BUFFERS)
+ {
+ HTTPChannel channel = (HTTPChannel)getChannel(channelIndex);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex);
+ }
+
+ long bufferCount = in.readLong();
+ int length = in.readShort();
+
+ IBuffer buffer = getBufferProvider().provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(channelIndex);
+ for (int i = 0; i < length; i++)
+ {
+ byte b = in.readByte();
+ System.out.println("Payload: " + b);
+ byteBuffer.put(b);
+ }
+
+ buffer.flip();
+ handleInputBuffer(channel, bufferCount, buffer);
+ markLastTraffic();
+ channelIndex = in.readShort();
+ }
+ }
+
+ private void handleInputBuffer(HTTPChannel channel, long bufferCount, IBuffer buffer)
+ {
+ synchronized (channel)
+ {
+ while (bufferCount < channel.getInputBufferCount())
+ {
+ IBuffer quarantinedBuffer = channel.getQuarantinedInputBuffer(channel.getInputBufferCount());
+ if (quarantinedBuffer != null)
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputBufferCount();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (bufferCount == channel.getInputBufferCount())
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputBufferCount();
+ }
+ else
+ {
+ channel.quarantineInputBuffer(bufferCount, buffer);
+ }
+ }
+ }
+
@Override
protected INegotiationContext createNegotiationContext()
{
@@ -87,4 +202,9 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
{
return new HTTPChannel();
}
+
+ protected boolean writeMoreBuffers()
+ {
+ return true;
+ }
}
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java
index 73002b7..57f451c 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPServerConnector.java
@@ -10,17 +10,10 @@
**************************************************************************/
package org.eclipse.net4j.internal.http;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.connector.ConnectorException;
import org.eclipse.net4j.connector.ConnectorLocation;
import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.internal.net4j.channel.InternalChannel;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.text.MessageFormat;
/**
@@ -30,12 +23,9 @@ public class HTTPServerConnector extends HTTPConnector
{
private HTTPAcceptor acceptor;
- private long lastTraffic;
-
public HTTPServerConnector(HTTPAcceptor acceptor)
{
this.acceptor = acceptor;
- markLastTraffic();
}
public HTTPAcceptor getAcceptor()
@@ -58,46 +48,6 @@ public class HTTPServerConnector extends HTTPConnector
return acceptor.getMaxIdleTime();
}
- public long getLastTraffic()
- {
- return lastTraffic;
- }
-
- private void markLastTraffic()
- {
- lastTraffic = System.currentTimeMillis();
- }
-
- @Override
- public void multiplexChannel(IChannel channel)
- {
- throw new UnsupportedOperationException();
- }
-
- public void handleBufferFromMultiplexer(short channelIndex, ExtendedDataInputStream in) throws IOException
- {
- InternalChannel channel = getChannel(channelIndex);
- if (channel == null)
- {
- throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex);
- }
-
- int length = in.readShort();
-
- IBuffer buffer = getBufferProvider().provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(channelIndex);
- for (int i = 0; i < length; i++)
- {
- byte b = in.readByte();
- System.out.println("Payload: " + b);
- byteBuffer.put(b);
- }
-
- buffer.flip();
- channel.handleBufferFromMultiplexer(buffer);
- markLastTraffic();
- }
-
@Override
public String toString()
{