summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2008-05-26 10:02:42 (EDT)
committerEike Stepper2008-05-26 10:02:42 (EDT)
commited41612d6f45c9a65070a384ec02fd0d47f07737 (patch)
tree52050325376f77f2ed5f6a466a5fab265e464025
parentec0e4577efead95f52911f43e064a5045816720d (diff)
downloadcdo-ed41612d6f45c9a65070a384ec02fd0d47f07737.zip
cdo-ed41612d6f45c9a65070a384ec02fd0d47f07737.tar.gz
cdo-ed41612d6f45c9a65070a384ec02fd0d47f07737.tar.bz2
[232648] Provide an HTTPConnector
https://bugs.eclipse.org/bugs/show_bug.cgi?id=232648
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/http/INet4jTransportServlet.java16
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java30
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPChannel.java53
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java61
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java430
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/Net4jTransportServlet.java41
-rw-r--r--plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/tests/HTTPTest.java3
7 files changed, 396 insertions, 238 deletions
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/http/INet4jTransportServlet.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/http/INet4jTransportServlet.java
index 8b6ece3..25bf7ea 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/http/INet4jTransportServlet.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/http/INet4jTransportServlet.java
@@ -22,16 +22,6 @@ import java.io.IOException;
*/
public interface INet4jTransportServlet extends Servlet
{
- public static final int OPCODE_CONNECT = 1;
-
- public static final int OPCODE_DISCONNECT = 2;
-
- public static final int OPCODE_OPEN_CHANNEL = 3;
-
- public static final int OPCODE_CLOSE_CHANNEL = 4;
-
- public static final int OPCODE_BUFFERS = 5;
-
public RequestHandler getRequestHandler();
public void setRequestHandler(RequestHandler handler);
@@ -47,11 +37,7 @@ public interface INet4jTransportServlet extends Servlet
public void handleDisonnect(String connectorID);
- public void handleOpenChannel(String connectorID, short channelIndex, int channelID, String protocolType);
-
- public void handleCloseChannel(String connectorID, short channelIndex);
-
- public void handleBuffers(String connectorID, ExtendedDataInputStream in, ExtendedDataOutputStream out)
+ public void handleOperations(String connectorID, ExtendedDataInputStream in, ExtendedDataOutputStream out)
throws IOException;
}
}
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
index 1ef0f7a..5774952 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPAcceptor.java
@@ -23,7 +23,6 @@ import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.security.IRandomizer;
import org.eclipse.internal.net4j.acceptor.Acceptor;
-import org.eclipse.internal.net4j.channel.InternalChannel;
import org.eclipse.internal.net4j.connector.Connector;
import java.io.IOException;
@@ -160,29 +159,7 @@ public class HTTPAcceptor extends Acceptor implements IHTTPAcceptor, INet4jTrans
connector.deactivate();
}
- public void handleOpenChannel(String connectorID, short channelIndex, int channelID, String protocolType)
- {
- HTTPConnector connector = httpConnectors.get(connectorID);
- if (connector == null)
- {
- throw new IllegalArgumentException("Invalid connectorID: " + connectorID);
- }
-
- InternalChannel channel = connector.createChannel(channelID, channelIndex, protocolType);
- if (channel == null)
- {
- throw new IllegalStateException("Could not open channel");
- }
-
- channel.activate();
- }
-
- public void handleCloseChannel(String connectorID, short channelIndex)
- {
- throw new UnsupportedOperationException();
- }
-
- public void handleBuffers(String connectorID, ExtendedDataInputStream in, ExtendedDataOutputStream out)
+ public void handleOperations(String connectorID, ExtendedDataInputStream in, ExtendedDataOutputStream out)
throws IOException
{
HTTPServerConnector connector = httpConnectors.get(connectorID);
@@ -191,8 +168,8 @@ public class HTTPAcceptor extends Acceptor implements IHTTPAcceptor, INet4jTrans
throw new IllegalArgumentException("Invalid connectorID: " + connectorID);
}
- connector.readInputBuffers(in);
- connector.writeOutputBuffers(out);
+ connector.readInputOperations(in);
+ connector.writeOutputOperations(out);
}
@Override
@@ -230,6 +207,7 @@ public class HTTPAcceptor extends Acceptor implements IHTTPAcceptor, INet4jTrans
protected void doActivate() throws Exception
{
super.doActivate();
+ cleaner.setDaemon(true);
cleaner.activate();
}
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPChannel.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPChannel.java
index a13e3ac..f845b43 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPChannel.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPChannel.java
@@ -16,61 +16,80 @@ import org.eclipse.internal.net4j.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
/**
* @author Eike Stepper
*/
public class HTTPChannel extends Channel
{
- private long outputBufferCount;
+ private long outputOperationCount = 1;// Open channel was 0 implicitely
- private long inputBufferCount;
+ private long inputOperationCount;
- private Map<Long, IBuffer> inputBufferQuarantine = new ConcurrentHashMap<Long, IBuffer>();
+ private Map<Long, IBuffer> inputOperationQuarantine = new ConcurrentHashMap<Long, IBuffer>();
+
+ private CountDownLatch openAck = new CountDownLatch(1);
public HTTPChannel()
{
}
- public long getOutputBufferCount()
+ public long getOutputOperationCount()
+ {
+ return outputOperationCount;
+ }
+
+ public void increaseOutputOperationCount()
{
- return outputBufferCount;
+ ++outputOperationCount;
}
- public void increaseOutputBufferCount()
+ public long getInputOperationCount()
{
- ++outputBufferCount;
+ return inputOperationCount;
}
- public long getInputBufferCount()
+ public void increaseInputOperationCount()
{
- return inputBufferCount;
+ ++inputOperationCount;
}
- public void increaseInputBufferCount()
+ public void quarantineInputOperation(long count, IBuffer buffer)
{
- ++inputBufferCount;
+ inputOperationQuarantine.put(count, buffer);
}
- public void quarantineInputBuffer(long count, IBuffer buffer)
+ public IBuffer getQuarantinedInputOperation(long count)
{
- inputBufferQuarantine.put(count, buffer);
+ return inputOperationQuarantine.remove(count);
}
- public IBuffer getQuarantinedInputBuffer(long count)
+ public void openAck()
{
- return inputBufferQuarantine.remove(count);
+ openAck.countDown();
+ }
+
+ public void waitForOpenAck()
+ {
+ try
+ {
+ openAck.await();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
@Override
protected void doDeactivate() throws Exception
{
- for (IBuffer buffer : inputBufferQuarantine.values())
+ for (IBuffer buffer : inputOperationQuarantine.values())
{
buffer.release();
}
- inputBufferQuarantine.clear();
+ inputOperationQuarantine.clear();
super.doDeactivate();
}
}
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
index df2b077..9775e05 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPClientConnector.java
@@ -11,11 +11,8 @@
package org.eclipse.net4j.internal.http;
import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.connector.ConnectorException;
import org.eclipse.net4j.connector.ConnectorLocation;
-import org.eclipse.net4j.http.INet4jTransportServlet;
import org.eclipse.net4j.internal.util.lifecycle.Worker;
-import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.io.IOAdapter;
@@ -54,7 +51,7 @@ public class HTTPClientConnector extends HTTPConnector
@Override
protected void work(WorkContext context) throws Exception
{
- boolean moreBuffers = tryBuffersRequest();
+ boolean moreBuffers = tryOperationsRequest();
context.nextWork(moreBuffers ? 0 : 1000);
}
};
@@ -97,7 +94,7 @@ public class HTTPClientConnector extends HTTPConnector
public void multiplexChannel(IChannel channel)
{
super.multiplexChannel(channel);
- tryBuffersRequest();
+ tryOperationsRequest();
}
@Override
@@ -122,6 +119,7 @@ public class HTTPClientConnector extends HTTPConnector
protected void doActivate() throws Exception
{
super.doActivate();
+ poller.setDaemon(true);
poller.activate();
httpClient = createHTTPClient();
doConnect();
@@ -146,50 +144,13 @@ public class HTTPClientConnector extends HTTPConnector
return new PostMethod(url);
}
- @Override
- protected void registerChannelWithPeer(final int channelID, final short channelIndex, final IProtocol protocol)
- throws ConnectorException
- {
- try
- {
- request(new IOHandler()
- {
- public void handleOut(ExtendedDataOutputStream out) throws IOException
- {
- out.writeByte(INet4jTransportServlet.OPCODE_OPEN_CHANNEL);
- out.writeString(getConnectorID());
- out.writeInt(channelID);
- out.writeShort(channelIndex);
- out.writeString(protocol.getType());
- }
-
- public void handleIn(ExtendedDataInputStream in) throws IOException
- {
- boolean ok = in.readBoolean();
- if (!ok)
- {
- throw new ConnectorException("Could not open channel");
- }
- }
- });
- }
- catch (RuntimeException ex)
- {
- throw ex;
- }
- catch (IOException ex)
- {
- throw new ConnectorException(ex);
- }
- }
-
private void doConnect() throws IOException
{
request(new IOHandler()
{
public void handleOut(ExtendedDataOutputStream out) throws IOException
{
- out.writeByte(INet4jTransportServlet.OPCODE_CONNECT);
+ out.writeByte(Net4jTransportServlet.OPCODE_CONNECT);
out.writeString(getUserID());
}
@@ -211,7 +172,7 @@ public class HTTPClientConnector extends HTTPConnector
@Override
public void handleOut(ExtendedDataOutputStream out) throws IOException
{
- out.writeByte(INet4jTransportServlet.OPCODE_DISCONNECT);
+ out.writeByte(Net4jTransportServlet.OPCODE_DISCONNECT);
out.writeString(getConnectorID());
}
});
@@ -234,7 +195,7 @@ public class HTTPClientConnector extends HTTPConnector
method.releaseConnection();
}
- private boolean tryBuffersRequest()
+ private boolean tryOperationsRequest()
{
synchronized (poller)
{
@@ -253,24 +214,24 @@ public class HTTPClientConnector extends HTTPConnector
try
{
- final boolean moreBuffers[] = { false };
+ final boolean moreOperations[] = { false };
request(new IOHandler()
{
public void handleOut(ExtendedDataOutputStream out) throws IOException
{
- out.writeByte(INet4jTransportServlet.OPCODE_BUFFERS);
+ out.writeByte(Net4jTransportServlet.OPCODE_OPERATIONS);
out.writeString(getConnectorID());
- moreBuffers[0] = writeOutputBuffers(out);
+ moreOperations[0] = writeOutputOperations(out);
}
public void handleIn(ExtendedDataInputStream in) throws IOException
{
- readInputBuffers(in);
+ readInputOperations(in);
}
});
lastRequest = System.currentTimeMillis();
- return moreBuffers[0];
+ return moreOperations[0];
}
catch (IOException ex)
{
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
index c8d7d3a..f7ad662 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/HTTPConnector.java
@@ -12,15 +12,18 @@ package org.eclipse.net4j.internal.http;
import org.eclipse.net4j.buffer.IBuffer;
import org.eclipse.net4j.channel.IChannel;
+import org.eclipse.net4j.connector.ConnectorException;
import org.eclipse.net4j.http.IHTTPConnector;
import org.eclipse.net4j.internal.http.bundle.OM;
import org.eclipse.net4j.internal.util.om.trace.ContextTracer;
+import org.eclipse.net4j.protocol.IProtocol;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
import org.eclipse.net4j.util.security.INegotiationContext;
import org.eclipse.internal.net4j.buffer.Buffer;
import org.eclipse.internal.net4j.channel.Channel;
+import org.eclipse.internal.net4j.channel.InternalChannel;
import org.eclipse.internal.net4j.connector.Connector;
import java.io.IOException;
@@ -35,9 +38,19 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG, HTTPConnector.class);
+ private static final byte OPERATION_NONE = 0;
+
+ private static final byte OPERATION_OPEN = 1;
+
+ private static final byte OPERATION_OPEN_ACK = 2;
+
+ private static final byte OPERATION_CLOSE = 3;
+
+ private static final byte OPERATION_BUFFER = 4;
+
private String connectorID;
- private transient Queue<QueuedBuffer> outputQueue = new ConcurrentLinkedQueue<QueuedBuffer>();
+ private transient Queue<ChannelOperation> outputOperations = new ConcurrentLinkedQueue<ChannelOperation>();
private transient long lastTraffic;
@@ -56,9 +69,9 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
this.connectorID = connectorID;
}
- public Queue<QueuedBuffer> getOutputQueue()
+ public Queue<ChannelOperation> getOutputQueue()
{
- return outputQueue;
+ return outputOperations;
}
public long getLastTraffic()
@@ -74,134 +87,81 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
public void multiplexChannel(IChannel channel)
{
IBuffer buffer;
- long outputBufferCount;
+ long outputOperationCount;
HTTPChannel httpChannel = (HTTPChannel)channel;
synchronized (httpChannel)
{
Queue<IBuffer> channelQueue = httpChannel.getSendQueue();
buffer = channelQueue.poll();
- outputBufferCount = httpChannel.getOutputBufferCount();
- httpChannel.increaseOutputBufferCount();
+ outputOperationCount = httpChannel.getOutputOperationCount();
+ httpChannel.increaseOutputOperationCount();
}
if (TRACER.isEnabled())
{
- TRACER.format("Multiplexing {0} (count={1})", ((Buffer)buffer).formatContent(true), outputBufferCount);
+ TRACER.format("Multiplexing {0} (count={1})", ((Buffer)buffer).formatContent(true), outputOperationCount);
}
- outputQueue.add(new QueuedBuffer(buffer, outputBufferCount));
+ outputOperations.add(new BufferChannelOperation(httpChannel.getChannelIndex(), outputOperationCount, buffer));
}
/**
- * Writes buffers from the {@link #outputQueue} to the passed stream. After each written buffer
- * {@link #writeMoreBuffers()} is asked whether to send more buffers.
+ * Writes operations from the {@link #outputOperations} to the passed stream. After each written operation
+ * {@link #writeMoreOperations()} is asked whether to send more operations.
*
- * @return <code>true</code> if more buffers are in the {@link #outputQueue}, <code>false</code> otherwise.
+ * @return <code>true</code> if more operations are in the {@link #outputOperations}, <code>false</code> otherwise.
*/
- public boolean writeOutputBuffers(ExtendedDataOutputStream out) throws IOException
+ public boolean writeOutputOperations(ExtendedDataOutputStream out) throws IOException
{
do
{
- QueuedBuffer queuedBuffer = outputQueue.poll();
- if (queuedBuffer == null && pollAgain())
+ ChannelOperation operation = outputOperations.poll();
+ if (operation == null && pollAgain())
{
- queuedBuffer = outputQueue.poll();
+ operation = outputOperations.poll();
}
- if (queuedBuffer == null)
+ if (operation == null)
{
break;
}
- out.writeBoolean(true);
- IBuffer buffer = queuedBuffer.getBuffer();
- short channelIndex = buffer.getChannelIndex();
- out.writeShort(channelIndex);
-
- long channelCount = queuedBuffer.getChannelCount();
- out.writeLong(channelCount);
-
- buffer.flip();
- ByteBuffer byteBuffer = buffer.getByteBuffer();
- byteBuffer.position(IBuffer.HEADER_SIZE);
- int length = byteBuffer.limit() - byteBuffer.position();
- out.writeShort(length);
- for (int i = 0; i < length; i++)
- {
- byte b = byteBuffer.get();
- out.writeByte(b);
- }
-
- buffer.release();
+ operation.write(out);
markLastTraffic();
- } while (writeMoreBuffers());
+ } while (writeMoreOperations());
- out.writeBoolean(false);
- return !outputQueue.isEmpty();
+ out.writeByte(OPERATION_NONE);
+ return !outputOperations.isEmpty();
}
- public void readInputBuffers(ExtendedDataInputStream in) throws IOException
+ public void readInputOperations(ExtendedDataInputStream in) throws IOException
{
for (;;)
{
- boolean moreBuffers = in.readBoolean();
- if (!moreBuffers)
+ ChannelOperation operation;
+ byte code = in.readByte();
+ switch (code)
{
- break;
- }
-
- short channelIndex = in.readShort();
- HTTPChannel channel = (HTTPChannel)getChannel(channelIndex);
- if (channel == null)
- {
- throw new IllegalArgumentException("Invalid channelIndex: " + channelIndex);
- }
+ case OPERATION_OPEN:
+ operation = new OpenChannelOperation(in);
- long bufferCount = in.readLong();
- int length = in.readShort();
+ case OPERATION_CLOSE:
+ operation = new CloseChannelOperation(in);
- IBuffer buffer = getBufferProvider().provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(channelIndex);
- for (int i = 0; i < length; i++)
- {
- byte b = in.readByte();
- byteBuffer.put(b);
- }
+ case OPERATION_BUFFER:
+ operation = new BufferChannelOperation(in);
+ break;
- buffer.flip();
- handleInputBuffer(channel, bufferCount, buffer);
- markLastTraffic();
- }
- }
+ case OPERATION_NONE:
+ return;
- private void handleInputBuffer(HTTPChannel channel, long bufferCount, IBuffer buffer)
- {
- synchronized (channel)
- {
- while (bufferCount < channel.getInputBufferCount())
- {
- IBuffer quarantinedBuffer = channel.getQuarantinedInputBuffer(channel.getInputBufferCount());
- if (quarantinedBuffer != null)
- {
- channel.handleBufferFromMultiplexer(buffer);
- channel.increaseInputBufferCount();
- }
- else
- {
- break;
- }
+ default:
+ throw new IOException("Invalid operation code: " + code);
}
- if (bufferCount == channel.getInputBufferCount())
- {
- channel.handleBufferFromMultiplexer(buffer);
- channel.increaseInputBufferCount();
- }
- else
- {
- channel.quarantineInputBuffer(bufferCount, buffer);
- }
+ operation.execute();
+ markLastTraffic();
}
}
@@ -218,9 +178,27 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
}
@Override
- public void inverseRemoveChannel(int channelID, short channelIndex)
+ protected void registerChannelWithPeer(final int channelID, final short channelIndex, final IProtocol protocol)
+ throws ConnectorException
{
- super.inverseRemoveChannel(channelID, channelIndex);
+ ChannelOperation operation = new OpenChannelOperation(channelIndex, channelID, protocol.getType());
+ outputOperations.add(operation);
+
+ HTTPChannel channel = (HTTPChannel)getChannel(channelIndex);
+ channel.waitForOpenAck();
+ }
+
+ @Override
+ public boolean removeChannel(IChannel channel)
+ {
+ if (super.removeChannel(channel))
+ {
+ ChannelOperation operation = new CloseChannelOperation((HTTPChannel)channel);
+ outputOperations.add(operation);
+ return true;
+ }
+
+ return false;
}
protected boolean pollAgain()
@@ -228,7 +206,7 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
return false;
}
- protected boolean writeMoreBuffers()
+ protected boolean writeMoreOperations()
{
return true;
}
@@ -236,16 +214,240 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
/**
* @author Eike Stepper
*/
- private static final class QueuedBuffer
+ public abstract class ChannelOperation
{
- private IBuffer buffer;
+ private short channelIndex;
+
+ private long operationCount;
+
+ public ChannelOperation(short channelIndex, long operationCount)
+ {
+ this.channelIndex = channelIndex;
+ this.operationCount = operationCount;
+ }
+
+ public ChannelOperation(ExtendedDataInputStream in) throws IOException
+ {
+ System.out.println("READING " + getClass().getName());
+ channelIndex = in.readShort();
+ operationCount = in.readLong();
+ }
+
+ public void write(ExtendedDataOutputStream out) throws IOException
+ {
+ System.out.println("WRITING " + getClass().getName());
+ out.writeByte(getOperation());
+ out.writeShort(channelIndex);
+ out.writeLong(operationCount);
+ }
+
+ public abstract byte getOperation();
+
+ public short getChannelIndex()
+ {
+ return channelIndex;
+ }
+
+ public long getOperationCount()
+ {
+ return operationCount;
+ }
+
+ public abstract void execute();
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class OpenChannelOperation extends ChannelOperation
+ {
+ private int channelID;
+
+ private String protocolID;
+
+ public OpenChannelOperation(short channelIndex, int channelID, String protocolID)
+ {
+ super(channelIndex, 0);
+ this.channelID = channelID;
+ this.protocolID = protocolID;
+ }
+
+ public OpenChannelOperation(ExtendedDataInputStream in) throws IOException
+ {
+ super(in);
+ channelID = in.readInt();
+ protocolID = in.readString();
+ }
+
+ @Override
+ public void write(ExtendedDataOutputStream out) throws IOException
+ {
+ super.write(out);
+ out.writeInt(channelID);
+ out.writeString(protocolID);
+ }
+
+ @Override
+ public byte getOperation()
+ {
+ return OPERATION_OPEN;
+ }
+
+ public int getChannelID()
+ {
+ return channelID;
+ }
- private long channelCount;
+ public String getProtocolID()
+ {
+ return protocolID;
+ }
+
+ @Override
+ public void execute()
+ {
+ boolean success = false;
+ try
+ {
+ InternalChannel channel = createChannel(channelID, getChannelIndex(), protocolID);
+ if (channel == null)
+ {
+ throw new IllegalStateException("Could not open channel");
+ }
+
+ channel.activate();
+ success = true;
+ }
+ finally
+ {
+ ChannelOperation operation = new OpenAckChannelOperation(getChannelIndex(), success);
+ outputOperations.add(operation);
+ }
+ }
+ }
- public QueuedBuffer(IBuffer buffer, long channelCount)
+ /**
+ * @author Eike Stepper
+ */
+ private final class OpenAckChannelOperation extends ChannelOperation
+ {
+ private boolean success;
+
+ public OpenAckChannelOperation(short channelIndex, boolean success)
{
+ super(channelIndex, 0);
+ this.success = success;
+ }
+
+ public OpenAckChannelOperation(ExtendedDataInputStream in) throws IOException
+ {
+ super(in);
+ success = in.readBoolean();
+ }
+
+ @Override
+ public byte getOperation()
+ {
+ return OPERATION_OPEN_ACK;
+ }
+
+ @Override
+ public void execute()
+ {
+ HTTPChannel channel = (HTTPChannel)getChannel(getChannelIndex());
+ channel.openAck();
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class CloseChannelOperation extends ChannelOperation
+ {
+ public CloseChannelOperation(HTTPChannel channel)
+ {
+ super(channel.getChannelIndex(), channel.getOutputOperationCount());
+ channel.increaseOutputOperationCount();
+ }
+
+ public CloseChannelOperation(ExtendedDataInputStream in) throws IOException
+ {
+ super(in);
+ }
+
+ @Override
+ public void write(ExtendedDataOutputStream out) throws IOException
+ {
+ super.write(out);
+ }
+
+ @Override
+ public byte getOperation()
+ {
+ return OPERATION_CLOSE;
+ }
+
+ @Override
+ public void execute()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * @author Eike Stepper
+ */
+ private final class BufferChannelOperation extends ChannelOperation
+ {
+ private IBuffer buffer;
+
+ public BufferChannelOperation(short channelIndex, long operationCount, IBuffer buffer)
+ {
+ super(channelIndex, operationCount);
this.buffer = buffer;
- this.channelCount = channelCount;
+ }
+
+ public BufferChannelOperation(ExtendedDataInputStream in) throws IOException
+ {
+ super(in);
+ int length = in.readShort();
+
+ buffer = getBufferProvider().provideBuffer();
+ ByteBuffer byteBuffer = buffer.startPutting(getChannelIndex());
+ for (int i = 0; i < length; i++)
+ {
+ byte b = in.readByte();
+ byteBuffer.put(b);
+ }
+
+ buffer.flip();
+ }
+
+ @Override
+ public void write(ExtendedDataOutputStream out) throws IOException
+ {
+ super.write(out);
+
+ buffer.flip();
+ ByteBuffer byteBuffer = buffer.getByteBuffer();
+ byteBuffer.position(IBuffer.HEADER_SIZE);
+
+ int length = byteBuffer.limit() - byteBuffer.position();
+ out.writeShort(length);
+
+ for (int i = 0; i < length; i++)
+ {
+ byte b = byteBuffer.get();
+ out.writeByte(b);
+ }
+
+ buffer.release();
+ }
+
+ @Override
+ public byte getOperation()
+ {
+ return OPERATION_BUFFER;
}
public IBuffer getBuffer()
@@ -253,9 +455,37 @@ public abstract class HTTPConnector extends Connector implements IHTTPConnector
return buffer;
}
- public long getChannelCount()
+ @Override
+ public void execute()
{
- return channelCount;
+ HTTPChannel channel = (HTTPChannel)getChannel(getChannelIndex());
+ long operationCount = getOperationCount();
+ synchronized (channel)
+ {
+ while (operationCount < channel.getInputOperationCount())
+ {
+ IBuffer quarantinedBuffer = channel.getQuarantinedInputOperation(channel.getInputOperationCount());
+ if (quarantinedBuffer != null)
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputOperationCount();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (operationCount == channel.getInputOperationCount())
+ {
+ channel.handleBufferFromMultiplexer(buffer);
+ channel.increaseInputOperationCount();
+ }
+ else
+ {
+ channel.quarantineInputOperation(operationCount, buffer);
+ }
+ }
}
}
}
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/Net4jTransportServlet.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/Net4jTransportServlet.java
index 278eb44..d503aea 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/Net4jTransportServlet.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/Net4jTransportServlet.java
@@ -37,6 +37,12 @@ import java.io.PrintWriter;
*/
public class Net4jTransportServlet extends HttpServlet implements INet4jTransportServlet
{
+ public static final int OPCODE_CONNECT = 1;
+
+ public static final int OPCODE_DISCONNECT = 2;
+
+ public static final int OPCODE_OPERATIONS = 3;
+
private static final long serialVersionUID = 1L;
private RequestHandler requestHandler;
@@ -90,20 +96,16 @@ public class Net4jTransportServlet extends HttpServlet implements INet4jTranspor
int opcode = servletInputStream.read();
switch (opcode)
{
- case OPCODE_CONNECT:
+ case Net4jTransportServlet.OPCODE_CONNECT:
doConnect(in, out);
break;
- case OPCODE_DISCONNECT:
+ case Net4jTransportServlet.OPCODE_DISCONNECT:
doDisconnect(in, out);
break;
- case OPCODE_OPEN_CHANNEL:
- doOpenChannel(in, out);
- break;
-
- case OPCODE_BUFFERS:
- doBuffers(in, out);
+ case Net4jTransportServlet.OPCODE_OPERATIONS:
+ doOperations(in, out);
break;
default:
@@ -196,30 +198,11 @@ public class Net4jTransportServlet extends HttpServlet implements INet4jTranspor
}
}
- protected void doOpenChannel(ExtendedDataInputStream in, ExtendedDataOutputStream out) throws ServletException,
- IOException
- {
- try
- {
- String connectorID = in.readString();
- int channelID = in.readInt();
- short channelIndex = in.readShort();
- String protocolType = in.readString();
- requestHandler.handleOpenChannel(connectorID, channelIndex, channelID, protocolType);
- out.writeBoolean(true);
- }
- catch (Exception ex)
- {
- OM.LOG.error(ex);
- out.writeBoolean(false);
- }
- }
-
- protected void doBuffers(ExtendedDataInputStream in, ExtendedDataOutputStream out) throws ServletException,
+ protected void doOperations(ExtendedDataInputStream in, ExtendedDataOutputStream out) throws ServletException,
IOException
{
String connectorID = in.readString();
- requestHandler.handleBuffers(connectorID, in, out);
+ requestHandler.handleOperations(connectorID, in, out);
}
/**
diff --git a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/tests/HTTPTest.java b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/tests/HTTPTest.java
index 68f4fe0..3f8ec48 100644
--- a/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/tests/HTTPTest.java
+++ b/plugins/org.eclipse.net4j.http/src/org/eclipse/net4j/internal/http/tests/HTTPTest.java
@@ -120,7 +120,8 @@ public class HTTPTest extends AbstractTransportTest
IntRequest request = new IntRequest(channel, 305419896);
int result = request.send();
assertEquals(305419896, result);
- channel.close();
+
+ sleep(500);
connector.deactivate();
}